/* This file is part of the source code for 3D-XplorMath-J, Version 1.0 (January 2008). * Copyright (c) 2008 The 3D-XplorMath Consortium (http://3d-xplormath.org). * This source code is released under a BSD License, which allows redistribution * in source and binary form, with or without modification, provided copyright * and license information are included, and with no warranty or guarantee of * any kind. For details, see http://3d-xplormath.org/j/source/BSDLicense.txt */ package vmm.core; import java.util.ArrayList; import java.util.Collection; import vmm.functions.EvalStack; /** * Provides parallization for a collection of tasks, where each task is an * object of type Runnable. The tasks can be run by a pool of threads, * where the size of the pool can be specified in the TaskManager constructor. * The default is for the number ofthreads to be equal to the number of processors. *

There are three ways to run a collection of tasks: {@link #executeAndWait(Collection)} * will run all the task in a collection of Runnable objects and will return only * when all the tasks have finished. {@link #executeAsync(Collection)} will run * the tasks asynchronously. The return value of this method is an object of * type {@link TaskManager.Job}; this "job" object can be used to get status infomation * about the job, to cancel the job, and to wait for the job to finish. * {@link #createJob()} also returns an object of type TaskManager.Job, but in this * case the job does not initially have any tasks to do. Tasks can be added by * calling {@link TaskManager.Job#add(Runnable)}. After all tasks that are part of * the job have been added, {@link TaskManager.Job#close()} must be called to * indicate that the job is complete. (Note that it is not possible to * add additional tasks to a job that has been created using {@link #executeAsync(Collection)}.) */ public class TaskManager { /** * Represents a job that consists of the execution of a number of tasks. * It is not possible to create an object of this class directly. Objects * of type TaskManager.Job are returned by {@link TaskManager#executeAsync(Collection)} * and {@link TaskManager#createJob()}. The tasks that are part of a job will * be exectued by the TaskManager that created the job, using the pool of * threads in that TaskManager. A job can start executing as soon as it has * been created and the first task has been added. * @see TaskManager */ public static class Job { private final TaskManager owner; private final ArrayList tasks; private volatile boolean closed; private volatile boolean finished; private volatile boolean canceled; private int nextTask; private volatile int finishedTaskCount; private volatile int failedTaskCount; private final ArrayList waitingFinishedTasks; private final ArrayList waitingFailedTasks; private Job(TaskManager owner, Collection tasks) { this.owner = owner; if (tasks == null) this.tasks = new ArrayList(); else this.tasks = new ArrayList(tasks); for (int i = this.tasks.size() - 1; i >= 0; i--) if (this.tasks.get(i) == null) this.tasks.remove(i); waitingFinishedTasks = new ArrayList(); waitingFailedTasks = new ArrayList(); } private void finish(Runnable task) { synchronized(owner) { if (finished) return; finishedTaskCount++; waitingFinishedTasks.add(task); if (closed && finishedTaskCount + failedTaskCount == tasks.size()) { finished = true; owner.finish(this); } } } private void fail(Runnable task) { synchronized(owner) { if (finished) return; failedTaskCount++; waitingFailedTasks.add(task); if (closed && finishedTaskCount + failedTaskCount == tasks.size()) { finished = true; owner.finish(this); } } } private Runnable nextTask() { synchronized(owner) { if (finished || nextTask >= tasks.size()) return null; else return tasks.get(nextTask++); } } /** * Add a task to this job. The job will not finish until all the tasks that have * been added to the job have finished (successfully or because of an exception). Also, * the job must be closed before it can finish. * Note that tasks cannot be added to a job that has is "closed". See {@link #close()}. * @param task the task that is to be added to this job. A null value is ignored. * @throws IllegalStateException if the job has already been closed. */ public void add(Runnable task) { if (task == null) return; if (closed) throw new IllegalStateException("Can't add a new task to a job after the job has been closed."); synchronized(owner) { tasks.add(task); owner.notifyAll(); } } /** * "Close" this job, making it possible for the job to complete. Closing a job also makes * it impossible to add new tasks to the job. A job that was created using * {@link TaskManager#executeAsync(Collection)} is already closed when it is returned by * that method. A job that was created using {@link TaskManager#createJob()} must * be closed, or it will be impossible for that job to finish; the job should be closed * by calling this method after all the tasks that are part of the job have been added. */ synchronized public void close() { if (closed) return; closed = true; if (finishedTaskCount + failedTaskCount == tasks.size()) { finished = true; owner.finish(this); } } /** * Tells the fraction of tasks that have been added to this job that have been completed. * @return a number between 0 and 1 obtained by dividing the number of completed tasks by * the number of tasks that have been added. If no tasks have been added, the return value * is 1. Note that the fractionDone can go down, if more tasks are added to the job. */ public double fractionDone() { if (tasks.size() == 0) return 1; else return (double)(finishedTaskCount + failedTaskCount)/tasks.size(); } /** * Returns the number of tasks in this job that have been completed successfully. */ public int finishedTaskCount() { return finishedTaskCount; } /** * Returns the number of tasks in this job that have been terminated because of an exception. */ public int failedTaskCount() { return failedTaskCount; } /** * Returns the number of tasks that have been added to this job. */ public int totalTaskCount() { return tasks.size(); } /** * Cancel the job. Tasks that have not yet been started will not be discarded; however, * tasks that are in progress can run to completion and might finish after this method * returns. This method can be called to cancel a job even if that job has not yet been * closed. */ public void cancel() { synchronized(owner) { finished = true; canceled = true; closed = true; owner.finish(this); owner.notifyAll(); } } /** * Tells whether the job is finished. A job is finished either when all the tasks * that are part of the job are done or when the job has been canceled. */ public boolean isFinished() { return finished; } /** * Tells whether hte job has been canceled. A job can be canceled by calling * {@link #cancel()}. */ public boolean isCanceled() { return canceled; } /** * Returns an array that contains tasks from this job that have completed successfully. * If this method has been called previously, only the newly completed tasks, since the * last call, are returned. The return value can be an empty array, if there are no * newly completed tasks, but the return value is never null. This method can be * used to retreive completed tasks for further processing. */ public Runnable[] finishedTasks() { synchronized(owner) { if (waitingFinishedTasks.size() == 0) return new Runnable[0]; Runnable[] tasks = new Runnable[waitingFinishedTasks.size()]; waitingFinishedTasks.toArray(tasks); waitingFinishedTasks.clear(); return tasks; } } /** * Returns an array that contains tasks from this job that have terminated because of * an Error or Exception. If this method has been called previously, only the newly * completed tasks, since the last call, are returned. The return value is non-null * but can be empty. */ public Runnable[] failedTasks() { synchronized(owner) { if (waitingFailedTasks.size() == 0) return new Runnable[0]; Runnable[] tasks = new Runnable[waitingFailedTasks.size()]; waitingFailedTasks.toArray(tasks); waitingFailedTasks.clear(); return tasks; } } /** * Waits either a specified amount of time or indefinitely for this job to finish. * The method will return only after the job completes or after the specified timeout * if the job does not complete within that time. * The return value tells whether or not the job has completed. If the job is * already complete when this method is called, it returns immedialtely. * @param timeoutMilliseconds the maximum time to wait for the job to finish. A value * of 0 (or less) means to wait as long as it takes for the job to finish. * @return true if the job has finished, false if not. Note that if the argument is * less than or equal to 0, then the return value has to be true. */ public boolean await(int timeoutMilliseconds) { synchronized(owner) { if (finished) return true; try { if (timeoutMilliseconds <= 0) owner.wait(); else owner.wait(timeoutMilliseconds); } catch (InterruptedException e) { } return finished; } } } private final ThreadPool threadPool; private boolean shutDown; private ArrayList activeJobs; private int nextJobForTask; /** * Create a TaskManager that will use a pool of threads with one thread per available processor. * @see #TaskManager(int) */ public TaskManager() { this(0); } /** * Create a TaskManager that will use a pool of threads with a specified number of threads. * The threads are used to execute "jobs", where a job consists of a collection of * Runnable objects. Note that even a thread pool with just one thread can be useful * for asynchronous execution. * @param threadPoolSize the number of thread to be used. If the value is 0 (or less), * then the number of threads will be equal to the number of available processors. */ public TaskManager(int threadPoolSize) { if (threadPoolSize <= 0) threadPoolSize = Runtime.getRuntime().availableProcessors(); activeJobs = new ArrayList(); threadPool = new ThreadPool(this, threadPoolSize); } /** * This method should be called before discarding the TaskManager. Any jobs that * have not been completed are cancled (using {@link TaskManager.Job#cancel()}). * Then the threads in the thread pool are allowed to die. It is not possible * to add new jobs to a TaskManager after the TaskManager has been shut down. */ synchronized public void shutDown() { shutDown = true; for (Job job : activeJobs) job.cancel(); notifyAll(); } /** * Returns the number of threads that will be used in the thread pool. * The value was set in the constructor and does not change. * The return value is a positive integer. */ public int getThreadPoolSize() { return threadPool.getSize(); } /** * Executes all the tasks in a collection of tasks. This method does not * return until all the tasks have finished, either by terminating * normally or by throwing an exception. If the thread pool size is * 1, all the tasks are executed in the calling thread, without the * use of any additional threads. If the thread pool size is greater * than 1, all the threads in the pool are used to execute the tasks, * resulting in some parallelization. * @see #getThreadPoolSize() * @param tasks the tasks to be performed. Must be non-null. Each task in the collection * is an object of type Runnable. Null entries in the collection are ignored. (Note that the * actual parameter can be (for example) an ArrayList declared as of type * ArrayList<Runnable> or ArrayList<Type> where Type is a class * that implements the Runnable interface. * @throws IllegalStateException if this method is called after {@link #shutDown()} has been called. * @throws NullPointerException if the argument is null */ public void executeAndWait(Collection tasks) { if (shutDown) throw new IllegalStateException("Can't execute tasks after shutdown."); if (tasks == null) throw new NullPointerException("The collection of tasks can't be null."); if (getThreadPoolSize() == 1) { for (Runnable task : tasks) { try { if (task != null) task.run(); } finally { // if one task fails, continue with the next task } } } else { Job job = executeAsync(tasks); job.await(0); } } /** * Creates a job to execute a specified collection of tasks, and starts working * on the job. This method returns immedialtely, and the execution of the tasks * procedes asynchronously. The tasks are executed using the TaskManager's thread pool. * @param tasks a non-null collection of tasks to be executed. Each task is an object * of that implements the Runnable interface. Null tasks are ignored. * @return a "job" object that can be used to get status information about the job and * to wait for the job to be completed. It is not possible to add additional tasks * to this job. * @throws IllegalStateException if this method is called after {@link #shutDown()} has been called. * @throws NullPointerException if the argument is null */ synchronized public Job executeAsync(Collection tasks) { if (shutDown) throw new IllegalStateException("Can't execute tasks after shutdown."); if (tasks == null) throw new NullPointerException("The collection of tasks can't be null."); Job job = new Job(this,tasks); job.close(); activeJobs.add(job); notifyAll(); return job; } /** * Creates a "job" to which a collection of tasks can be added. See {@link TaskManager.Job#add(Runnable)}. * The job must be "closed," using {@link TaskManager.Job#close()} after all the tasks have been added, * or the job will never complete. * @return the job object. This can be used to add tasks to the job, to get status information about * the job, and to wait for the job to complete. * @throws IllegalStateException if this method is called after {@link #shutDown()} has been called. */ synchronized public Job createJob() { if (shutDown) throw new IllegalStateException("Can't execute tasks after shutdown."); Job job = new Job(this,null); activeJobs.add(job); return job; } /** * Tells whether this TaskManager has at least one job that has not yet finished. Note that the * TaskManager might not really be doing anyting, if none of the jobs have tasks that * need to be performed. */ synchronized public boolean busy() { return activeJobs.size() > 0; } synchronized private void finish(Job job) { // called by a job when it is finished, so the job can be removed from this task manager. if (!shutDown) { int jobnum = activeJobs.indexOf(job); if (jobnum == -1) return; activeJobs.remove(jobnum); if (jobnum < nextJobForTask) nextJobForTask--; } notifyAll(); } synchronized private Object[] nextTask() { // goes through the list of active jobs looking for a task to do; can block if no task is available Runnable task = null; Job job = null; while (task == null && !shutDown) { if (activeJobs.size() > 0) { if (nextJobForTask >= activeJobs.size()) nextJobForTask = 0; int jobnum = nextJobForTask; do { job = activeJobs.get(jobnum); task = job.nextTask(); jobnum++; if (jobnum >= activeJobs.size()) jobnum = 0; } while (task == null && jobnum != nextJobForTask); } if (task == null) { try { wait(); } catch (InterruptedException e) { } } } if (shutDown) return null; nextJobForTask++; return new Object[] { task, job }; } private static class ThreadPool { final Worker[] pool; final TaskManager owner; boolean shutDown; ThreadPool(TaskManager owner, int poolSize) { this.owner = owner; pool = new Worker[poolSize]; int priority = Thread.currentThread().getPriority(); for (int i = 0; i < poolSize; i++) { pool[i] = new Worker(); pool[i].setDaemon(true); try { pool[i].setPriority(priority-1); } catch (Exception e) { } pool[i].start(); } } int getSize() { return pool.length; } class Worker extends Thread { public void run() { Job job; try { while (true) { Runnable task; Object[] taskinfo = owner.nextTask(); if (taskinfo == null) break; task = (Runnable)taskinfo[0]; job = (Job)taskinfo[1]; boolean ok = true; try { task.run(); } catch (Throwable e) { ok = false; } finally { if (ok) job.finish(task); else job.fail(task); } } } finally { EvalStack.perThreadRelease(this); } } } } }