workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current RejectedExecutionHandler.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* RejectedExecutionHandler, if task cannot be accepted
* for execution
* @throws NullPointerException if command is null
*/
public void execute(Runnable command) {
    if (command == null) {
        throw new NullPointerException();
    }
    while (true) {
        // Once the pool has left the RUNNING state, hand the task
        // straight to the rejection path.
        if (runState != RUNNING) {
            reject(command);
            return;
        }
        // Below the core size: try addIfUnderCorePoolSize first.
        if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)) {
            return;
        }
        // Otherwise try to enqueue the task.
        if (workQueue.offer(command)) {
            return;
        }
        // Queue refused the task: try addIfUnderMaximumPoolSize and
        // act on what it hands back.
        Runnable outcome = addIfUnderMaximumPoolSize(command);
        if (outcome == command) {
            return;
        }
        if (outcome == null) {
            reject(command);
            return;
        }
        // Any other result: go around the loop and try again.
    }
}
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be
* accepted. Invocation has no additional effect if already shut
* down.
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate threads that
* the caller is not permitted to modify because it does not hold
* {@link java.lang.RuntimePermission}("modifyThread"),
* or the security manager's checkAccess method denies access.
*/
public void shutdown() {
// Fail if caller doesn't have modifyThread permission. We
// explicitly check permissions directly because we can't trust
// implementations of SecurityManager to correctly override
// the "check access" methods such that our documented
// security policy is implemented.
SecurityManager security = System.getSecurityManager();
if (security != null)
java.security.AccessController.checkPermission(shutdownPerm);
// Set only on the no-workers path below; the terminated() hook is
// then invoked after mainLock has been released.
boolean fullyTerminated = false;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (workers.size() > 0) {
// Check if caller can modify worker threads. This
// might not be true even if passed above check, if
// the SecurityManager treats some threads specially.
if (security != null) {
for (Worker w: workers)
security.checkAccess(w.thread);
}
// Remember the state on entry so it can be restored if
// interrupting the workers fails below.
int state = runState;
if (state == RUNNING) // don't override shutdownNow
runState = SHUTDOWN;
try {
// Invoke interruptIfIdle on every worker currently in the set.
for (Worker w: workers)
w.interruptIfIdle();
} catch(SecurityException se) {
// If SecurityManager allows above checks, but
// then unexpectedly throws exception when
// interrupting threads (which it ought not do),
// back out as cleanly as we can. Some threads may
// have been killed but we remain in non-shutdown
// state.
runState = state;
throw se;
}
}
else { // If no workers, trigger full termination now
fullyTerminated = true;
runState = TERMINATED;
// Wake every thread waiting on the termination condition.
termination.signalAll();
}
} finally {
mainLock.unlock();
}
// Run the subclass hook outside the lock.
if (fullyTerminated)
terminated();
}
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks that were
* awaiting execution.
*
* This implementation cancels tasks via {@link
* Thread#interrupt}, so if any tasks mask or fail to respond to
* interrupts, they may never terminate.
*
* @return list of tasks that never commenced execution
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate threads that
* the caller is not permitted to modify because it does not hold
* {@link java.lang.RuntimePermission}("modifyThread"),
* or the security manager's checkAccess method denies access.
*/
public List shutdownNow() {
// Almost the same code as shutdown()
// Differences visible here: the state is moved to STOP (unless it
// is already TERMINATED), workers get interruptNow() rather than
// interruptIfIdle(), and the queue contents are returned.
SecurityManager security = System.getSecurityManager();
if (security != null)
java.security.AccessController.checkPermission(shutdownPerm);
// Set only on the no-workers path below; the terminated() hook is
// then invoked after mainLock has been released.
boolean fullyTerminated = false;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (workers.size() > 0) {
// Verify the caller may modify each worker thread before
// changing any state.
if (security != null) {
for (Worker w: workers)
security.checkAccess(w.thread);
}
// Remember the state on entry so it can be restored if
// interrupting the workers fails below.
int state = runState;
if (state != TERMINATED)
runState = STOP;
try {
for (Worker w : workers)
w.interruptNow();
} catch(SecurityException se) {
runState = state; // back out;
throw se;
}
}
else { // If no workers, trigger full termination now
fullyTerminated = true;
runState = TERMINATED;
// Wake every thread waiting on the termination condition.
termination.signalAll();
}
} finally {
mainLock.unlock();
}
// Run the subclass hook outside the lock.
if (fullyTerminated)
terminated();
// Snapshot of the queue contents taken after the lock is released;
// toArray copies the elements and does not remove them from workQueue.
return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
}
public boolean isShutdown() {
    // Every state other than RUNNING is reported as shut down.
    final int state = runState;
    return state != RUNNING;
}
/**
* Returns true if this executor is in the process of terminating
* after shutdown or shutdownNow but has not
* completely terminated. This method may be useful for
* debugging. A return of true reported a sufficient
* period after shutdown may indicate that submitted tasks have
* ignored or suppressed interruption, causing this executor not
* to properly terminate.
* @return true if terminating but not yet terminated.
*/
public boolean isTerminating() {
    // The documented contract covers termination started by either
    // shutdown() or shutdownNow(). shutdown() leaves the pool in
    // SHUTDOWN while shutdownNow() moves it to STOP, so both states
    // must be reported; checking STOP alone returned false for a pool
    // that was winding down after shutdown().
    // Read runState once so both comparisons see the same value.
    final int state = runState;
    return state == SHUTDOWN || state == STOP;
}
public boolean isTerminated() {
    // True only once the run state has reached TERMINATED.
    return TERMINATED == runState;
}
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
    // Convert the timeout before taking the lock, then wait on the
    // termination condition until the state is TERMINATED or the
    // remaining time is used up.
    long remaining = unit.toNanos(timeout);
    final ReentrantLock lock = this.mainLock;
    lock.lock();
    try {
        while (runState != TERMINATED) {
            if (remaining <= 0) {
                return false;
            }
            // awaitNanos reports how much of the wait budget is left.
            remaining = termination.awaitNanos(remaining);
        }
        return true;
    } finally {
        lock.unlock();
    }
}
/**
* Invokes shutdown when this executor is no longer
* referenced.
*/
protected void finalize() {
    // Delegate to the regular orderly shutdown path.
    this.shutdown();
}
/**
* Sets the thread factory used to create new threads.
*
* @param threadFactory the new thread factory
* @throws NullPointerException if threadFactory is null
* @see #getThreadFactory
*/
public void setThreadFactory(ThreadFactory threadFactory) {
    // A null factory is rejected before the field is touched.
    if (null == threadFactory) {
        throw new NullPointerException();
    }
    this.threadFactory = threadFactory;
}
/**
* Returns the thread factory used to create new threads.
*
* @return the current thread factory
* @see #setThreadFactory
*/
public ThreadFactory getThreadFactory() {
    // Plain read of the currently configured factory.
    return this.threadFactory;
}
/**
* Sets a new handler for unexecutable tasks.
*
* @param handler the new handler
* @throws NullPointerException if handler is null
* @see #getRejectedExecutionHandler
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
    // A null handler is rejected before the field is touched.
    if (null == handler) {
        throw new NullPointerException();
    }
    this.handler = handler;
}
/**
* Returns the current handler for unexecutable tasks.
*
* @return the current handler
* @see #setRejectedExecutionHandler
*/
public RejectedExecutionHandler getRejectedExecutionHandler() {
    // Plain read of the currently configured handler.
    return this.handler;
}
/**
* Returns the task queue used by this executor. Access to the
* task queue is intended primarily for debugging and monitoring.
* This queue may be in active use. Retrieving the task queue
* does not prevent queued tasks from executing.
*
* @return the task queue
*/
public BlockingQueue getQueue() {
    // Hands out the live queue itself, not a copy.
    return this.workQueue;
}
/**
* Removes this task from the executor's internal queue if it is
* present, thus causing it not to be run if it has not already
* started.
*
* This method may be useful as one part of a cancellation
* scheme. It may fail to remove tasks that have been converted
* into other forms before being placed on the internal queue. For
* example, a task entered using submit might be
* converted into a form that maintains Future status.
* However, in such cases, method {@link ThreadPoolExecutor#purge}
* may be used to remove those Futures that have been cancelled.
*
*
* @param task the task to remove
* @return true if the task was removed
*/
public boolean remove(Runnable task) {
    // Removal is delegated to the work queue obtained via getQueue().
    final boolean removed = getQueue().remove(task);
    return removed;
}
/**
* Tries to remove from the work queue all {@link Future}
* tasks that have been cancelled. This method can be useful as a
* storage reclamation operation, that has no other impact on
* functionality. Cancelled tasks are never executed, but may
* accumulate in work queues until worker threads can actively
* remove them. Invoking this method instead tries to remove them now.
* However, this method may fail to remove tasks in
* the presence of interference by other threads.
*/
public void purge() {
// Fail if we encounter interference during traversal
try {
Iterator it = getQueue().iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future>) {
Future> c = (Future>)r;
if (c.isCancelled())
it.remove();
}
}
}
catch(ConcurrentModificationException ex) {
return;
}
}
/**
* Sets the core number of threads. This overrides any value set
* in the constructor. If the new value is smaller than the
* current value, excess existing threads will be terminated when
* they next become idle. If larger, new threads will, if needed,
* be started to execute any queued tasks.
*
* @param corePoolSize the new core size
* @throws IllegalArgumentException if corePoolSize is
* less than zero
* @see #getCorePoolSize
*/
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Positive when the core size shrinks, negative when it grows.
        int extra = this.corePoolSize - corePoolSize;
        this.corePoolSize = corePoolSize;
        if (extra < 0) {
            // Growing: add at most one thread per unit of growth, and
            // no more than the number of tasks queued on entry.
            int n = workQueue.size();
            // We have to create initially-idle threads here
            // because we otherwise have no recourse about
            // what to do with a dequeued task if addThread fails.
            while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {
                Thread t = addThread(null);
                if (t != null)
                    t.start();
                else
                    break;
            }
        }
        else if (extra > 0 && poolSize > corePoolSize) {
            // Shrinking: call interruptIfIdle on up to `extra` workers,
            // stopping as soon as any loop condition fails. The
            // iterator is typed so that next() yields a Worker; a raw
            // Iterator yields Object and the call does not compile.
            Iterator<Worker> it = workers.iterator();
            while (it.hasNext() &&
                   extra-- > 0 &&
                   poolSize > corePoolSize &&
                   workQueue.remainingCapacity() == 0)
                it.next().interruptIfIdle();
        }
    } finally {
        mainLock.unlock();
    }
}
/**
* Returns the core number of threads.
*
* @return the core number of threads
* @see #setCorePoolSize
*/
public int getCorePoolSize() {
    // Plain read of the configured core size; no lock is taken.
    return this.corePoolSize;
}
/**
* Starts a core thread, causing it to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed. This method will return false
* if all core threads have already been started.
* @return true if a thread was started
*/
public boolean prestartCoreThread() {
    // Passing null asks for a thread with no initial task.
    final boolean started = addIfUnderCorePoolSize(null);
    return started;
}
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
* @return the number of threads started.
*/
public int prestartAllCoreThreads() {
    // Keep requesting task-less threads until addIfUnderCorePoolSize
    // declines, counting each success.
    int started = 0;
    for (; addIfUnderCorePoolSize(null); started++) {
        // counting happens in the loop header
    }
    return started;
}
/**
* Sets the maximum allowed number of threads. This overrides any
* value set in the constructor. If the new value is smaller than
* the current value, excess existing threads will be
* terminated when they next become idle.
*
* @param maximumPoolSize the new maximum
* @throws IllegalArgumentException if maximumPoolSize is less than or
* equal to zero, or less than the {@link #getCorePoolSize core pool size}
* @see #getMaximumPoolSize
*/
public void setMaximumPoolSize(int maximumPoolSize) {
    // Zero is rejected as well as negatives, and the maximum may not
    // drop below the current core size.
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Positive when the maximum shrinks.
        int extra = this.maximumPoolSize - maximumPoolSize;
        this.maximumPoolSize = maximumPoolSize;
        if (extra > 0 && poolSize > maximumPoolSize) {
            // Call interruptIfIdle on up to `extra` workers while the
            // pool is still above the new maximum. The iterator is
            // typed so that next() yields a Worker; a raw Iterator
            // yields Object and the call does not compile.
            Iterator<Worker> it = workers.iterator();
            while (it.hasNext() &&
                   extra > 0 &&
                   poolSize > maximumPoolSize) {
                it.next().interruptIfIdle();
                --extra;
            }
        }
    } finally {
        mainLock.unlock();
    }
}
/**
* Returns the maximum allowed number of threads.
*
* @return the maximum allowed number of threads
* @see #setMaximumPoolSize
*/
public int getMaximumPoolSize() {
    // Plain read of the configured maximum; no lock is taken.
    return this.maximumPoolSize;
}
/**
* Sets the time limit for which threads may remain idle before
* being terminated. If there are more than the core number of
* threads currently in the pool, after waiting this amount of
* time without processing a task, excess threads will be
* terminated. This overrides any value set in the constructor.
* @param time the time to wait. A time value of zero will cause
* excess threads to terminate immediately after executing tasks.
* @param unit the time unit of the time argument
* @throws IllegalArgumentException if time is less than zero
* @see #getKeepAliveTime
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
    if (time < 0) {
        throw new IllegalArgumentException();
    }
    // The limit is stored in nanoseconds regardless of the unit given.
    final long nanos = unit.toNanos(time);
    this.keepAliveTime = nanos;
}
/**
* Returns the thread keep-alive time, which is the amount of time
* which threads in excess of the core pool size may remain
* idle before being terminated.
*
* @param unit the desired time unit of the result
* @return the time limit
* @see #setKeepAliveTime
*/
public long getKeepAliveTime(TimeUnit unit) {
    // The stored value is in nanoseconds; convert to the caller's unit.
    final long converted = unit.convert(this.keepAliveTime, TimeUnit.NANOSECONDS);
    return converted;
}
/* Statistics */
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
    // Plain read of the current pool size; no lock is taken.
    return this.poolSize;
}
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
public int getActiveCount() {
    // Count, under mainLock, the workers that report themselves active.
    final ReentrantLock lock = this.mainLock;
    lock.lock();
    try {
        int active = 0;
        for (Worker worker : workers) {
            if (worker.isActive()) {
                active++;
            }
        }
        return active;
    } finally {
        lock.unlock();
    }
}
/**
* Returns the largest number of threads that have ever
* simultaneously been in the pool.
*
* @return the number of threads
*/
public int getLargestPoolSize() {
    // The high-water mark is read while holding mainLock.
    final ReentrantLock lock = this.mainLock;
    lock.lock();
    try {
        return this.largestPoolSize;
    } finally {
        lock.unlock();
    }
}
/**
* Returns the approximate total number of tasks that have been
* scheduled for execution. Because the states of tasks and
* threads may change dynamically during computation, the returned
* value is only an approximation, but one that does not ever
* decrease across successive calls.
*
* @return the number of tasks
*/
public long getTaskCount() {
    // Sum, under mainLock: the pool-level completed count, each
    // worker's completed count, one for every active worker, and the
    // number of tasks still queued.
    final ReentrantLock lock = this.mainLock;
    lock.lock();
    try {
        long total = completedTaskCount;
        for (Worker worker : workers) {
            total += worker.completedTasks;
            if (worker.isActive()) {
                total++;
            }
        }
        total += workQueue.size();
        return total;
    } finally {
        lock.unlock();
    }
}
/**
* Returns the approximate total number of tasks that have
* completed execution. Because the states of tasks and threads
* may change dynamically during computation, the returned value
* is only an approximation, but one that does not ever decrease
* across successive calls.
*
* @return the number of tasks
*/
public long getCompletedTaskCount() {
    // Sum, under mainLock, the pool-level completed count and each
    // current worker's completed count.
    final ReentrantLock lock = this.mainLock;
    lock.lock();
    try {
        long total = completedTaskCount;
        for (Worker worker : workers) {
            total += worker.completedTasks;
        }
        return total;
    } finally {
        lock.unlock();
    }
}
/**
* Method invoked prior to executing the given Runnable in the
* given thread. This method is invoked by thread t that
* will execute task r, and may be used to re-initialize
* ThreadLocals, or to perform logging. Note: To properly nest
* multiple overridings, subclasses should generally invoke
* super.beforeExecute at the end of this method.
*
* @param t the thread that will run task r.
* @param r the task that will be executed.
*/
protected void beforeExecute(Thread t, Runnable r) { /* intentionally empty: hook for subclasses to override */ }
/**
* Method invoked upon completion of execution of the given
* Runnable. This method is invoked by the thread that executed
* the task. If non-null, the Throwable is the uncaught exception
* that caused execution to terminate abruptly. Note: To properly
* nest multiple overridings, subclasses should generally invoke
* super.afterExecute at the beginning of this method.
*
* @param r the runnable that has completed.
* @param t the exception that caused termination, or null if
* execution completed normally.
*/
protected void afterExecute(Runnable r, Throwable t) { /* intentionally empty: hook for subclasses to override */ }
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* super.terminated within this method.
*/
protected void terminated() { /* intentionally empty: hook for subclasses to override */ }
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the execute method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Constructs a CallerRunsPolicy. The policy holds no state.
     */
    public CallerRunsPolicy() { }

    /**
     * Runs task r synchronously on the thread that invoked this
     * handler, unless the executor reports that it has been shut
     * down, in which case the task is dropped.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            return;
        }
        r.run();
    }
}
/**
* A handler for rejected tasks that throws a
* RejectedExecutionException.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Constructs an AbortPolicy. The policy holds no state.
     */
    public AbortPolicy() { }

    /**
     * Unconditionally signals rejection by throwing; neither argument
     * is inspected.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always.
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        final RejectedExecutionException rejection = new RejectedExecutionException();
        throw rejection;
    }
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Constructs a DiscardPolicy. The policy holds no state.
     */
    public DiscardPolicy() { }

    /**
     * Takes no action, so task r is simply dropped.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Deliberately empty: discarding means doing nothing.
    }
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries execute, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Constructs a DiscardOldestPolicy. The policy holds no state;
     * the executor is supplied on each call to rejectedExecution.
     */
    public DiscardOldestPolicy() { }

    /**
     * Polls the executor's queue once, ignoring whatever element (if
     * any) comes off its head, and then resubmits task r through
     * execute. If the executor reports that it has been shut down,
     * nothing is polled and task r is dropped instead.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            return;
        }
        // Make room by dropping the head of the queue, then retry.
        e.getQueue().poll();
        e.execute(r);
    }
}
}