class ThreadPoolExecutor
extends AbtractExecutorService
能提升大量异步任务的执行效率,提供了线程等资源的管理,并且维护了一些基本的统计信息。
两个重要的参数:corePoolSize和maximumPoolSize。当一个任务被提交时,如果正在运行的线程数少于corePoolSize,不管线程是否空闲,则创建新的线程处理该任务请求;如果线程数大于corePoolSize,且小于maximumPoolSize,只有在任务队列已满时才创建新的线程。设置corePoolSize==maximumPoolSize,可以创建一个固定大小的线程池;也可以设置maximumPoolSize为Integer.MAX_VALUE。
默认只有新任务来的时候才会创建和启动线程,也可以通过重写方法prestartCoreThread或prestartAllCoreThreads,可以用一个非空的队列来在线程池创建的时候预启动线程。
线程的创建默认是通过defaultThreadFactory,被创建的线程有相同的优先级(NORM_PRIORITY),线程工厂指定的线程名,相同的线程组,非守护线程等。用户可以实现自己的ThreadFactory来指定上述属性。线程必须有“modifyThread”运行时权限,如果没有该权限,一些服务将无法使用,如运行时的配置改变无法生效,线程池关闭不一定成功等。
如果线程池当前的线程数大于corePoolSize,多余的线程在空闲时间超过keepAliveTime后会被终止;如果设置allowCoreThreadTimeOut(true),则该限制也同样应用于core threads。
使用BlockingQueue来存放任务队列。当新任务来时:
1. 如果小于corePoolSize的线程数执行,则创建新线程;
2. 如果大于等于corePoolSize的线程数执行,则新任务添加到队列中;
3. 如果任务请求无法添加到队列中,创建新线程,当超过maximumPoolSize时任务会被拒绝。
通常有三种任务队列策略:
1. SynchronousQueue:不保存任务,直接将任务提交线程处理。可以避免一组相互依赖的请求内部可能出现的锁。通常要求设置maximumPoolSize为最大;
2. LinkedBlockingQueue:队列无最大上限。创建的线程数不会超过corePoolSize,maximumPoolSize在这种情况下不起作用。适合相互独立的线程,可以用来缓冲突发的大量请求;
3. ArrayBlockingQueue:使用有限的maximumPoolSize,避免资源的耗尽;但协调和控制的难度增大,需要在队列大小和线程池大小之间做平衡。使用大的队列和小的线程池,可以最小化cpu使用和系统资源,以及上下文切换消耗,但导致低吞吐量。如果任务频繁的阻塞(如I/O),系统可以分配比用户允许的更多的线程时间。使用小的队列通常要求大的线程池,可以保持CPU忙碌,但可能带来更大的调度开销,以致于较低吞吐量。
当线程池已经关闭,或者使用了有限的maximumPoolSize和队列并且已饱和,则新的任务将会被拒绝。这时将调用rejectedExecution。有四种预定义的线程拒绝策略:
1. ThreadPoolExecutor.AbortPolicy,抛出RejectedExecutionException;
2. ThreadPoolExecutor.CallerRunsPolicy,(the thread that invokes execute itself runs the task)提供了一种简单的反馈控制机制来降低任务提交的频率;
3. ThreadPoolExecutor.DiscardPolicy,任务被抛弃;
4. ThreadPoolExecutor.DiscardOldestPolicy,队列顶端的任务被抛弃,并且重试任务执行;
除了预定义,也可以定义和使用其它类型的RejectedExecutionHandler。
同时提供了可被重写的方法beforeExecute和afterExecute,用于控制运行环境。此外terminated可以被重写,用于线程池停止时需要处理的过程。如果hook或回调方法执行过程中抛出异常,则内部的worker线程可能运行失败并且意外终止。
abstract class AbstractExecutorService
implements ExecutorService
ExecutorService的默认实现。
通过newTaskFor实现了submit,invokeAll,invokeAny;
interface ExecutorService
implements Executor
shutdown允许之前提交的任务在中止之前执行完成,不再接受新任务,但不等待任务执行完成;
shutdownNow并且尝试停止当前执行的任务,阻止等待的任务执行,返回等待执行的任务列表,不保证一定能够关闭执行中的任务(例如调用interrupt中断线程有可能失败);
isShutdown:
isTerminated: 调用shutdown/shutdownNow之后,如果所有任务都完成了则返回true。
awaitTermination(timeout,unit): 在shutdown请求后,阻塞到所有任务都执行完成或者超时,或者当前线程被中断。如果是超时,返回false,如果该executor中止返回true;
<T> Future<T> submit(Callable<T> task): Future模式,提交后返回一个Future,通过调用Future的get方法获取实际任务成功执行的返回值;
<T> Future<T> submit(Runnable task, T result):
Future<?> submit(Runnable task);
invokeAll(tasks): 执行给定的一组任务,返回Future;
interface Executor
将任务提交与任务执行的机制解耦。不显式的创建线程,而是用例如executor.execute(new RunnableTask1());的形式。
不严格要求异步执行,可以直接在调用者线程中执行:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
但通常是为每个任务开启一个新的线程。
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
Executor只有一个execute(Runnable command);方法,要求如果任务不能被接受执行,则抛出RejectedExecutionException,如果command为null,抛出NullPointerException异常。