ThreadPoolExecutor笔记

原创
2014/04/27 20:43
阅读数 279

class ThreadPoolExecutor

    extends AbtractExecutorService

    能提升大量异步任务的执行效率,提供了线程等资源的管理,并且维护了一些基本的统计信息。

    两个重要的参数:corePoolSizemaximumPoolSize。当一个任务被提交时,如果正在运行的线程数少于corePoolSize,不管线程是否空闲,则创建新的线程处理该任务请求;如果线程数大于corePoolSize,且小于maximumPoolSize,只有在任务队列已满时才创建新的线程。设置corePoolSize==maximumPoolSize,可以创建一个固定大小的线程池;也可以设置maximumPoolSizeInteger.MAX_VALUE

    默认只有新任务来的时候才会创建和启动线程,也可以通过重写方法prestartCoreThreadprestartAllCoreThreads,可以用一个非空的队列来在线程池创建的时候预启动线程。

    线程的创建默认是通过defaultThreadFactory,被创建的线程有相同的优先级(NORM_PRIORITY),线程工厂指定的线程名,相同的线程组,非守护线程等。用户可以实现自己的ThreadFactory来指定上述属性。线程必须有“modifyThread”运行时权限,如果没有该权限,一些服务将无法使用,如运行时的配置改变无法生效,线程池关闭不一定成功等。

    如果线程池当前的线程数大于corePoolSize,多余的线程在空闲时间超过keepAliveTime后会被终止;如果设置allowCoreThreadTimeOut(true),则该限制也同样应用于core threads

    使用BlockingQueue来存放任务队列。当新任务来时:

    1. 如果小于corePoolSize的线程数执行,则创建新线程;

    2. 如果大于等于corePoolSize的线程数执行,则新任务添加到队列中;

    3. 如果任务请求无法添加到队列中,创建新线程,当超过maximumPoolSize时任务会被拒绝。

通常有三种任务队列策略:

    1. SynchronousQueue:不保存任务,直接将任务提交线程处理。可以避免一组相互依赖的请求内部可能出现的锁。通常要求设置maximumPoolSize为最大;

    2. LinkedBlockingQueue:队列无最大上限。创建的线程数不会超过corePoolSizemaximumPoolSize在这种情况下不起作用。适合相互独立的线程,可以用来缓冲突发的大量请求;

    3. ArrayBlockingQueue:使用有限的maximumPoolSize,避免资源的耗尽;但协调和控制的难度增大,需要在队列大小和线程池大小之间做平衡。使用大的队列和小的线程池,可以最小化cpu使用和系统资源,以及上下文切换消耗,但导致低吞吐量。如果任务频繁的阻塞(如I/O),系统可以分配比用户允许的更多的线程时间。使用小的队列通常要求大的线程池,可以保持CPU忙碌,但可能带来更大的调度开销,以致于较低吞吐量。

    当线程池已经关闭,或者使用了有限的maximumPoolSize和队列并且已饱和,则新的任务将会被拒绝。这时将调用rejectedExecution。有四种预定义的线程拒绝策略:

    1. ThreadPoolExecutor.AbortPolicy,抛出RejectedExecutionException

    2. ThreadPoolExecutor.CallerRunsPolicy,(the thread that invokes execute itself runs the task)提供了一种简单的反馈控制机制来降低任务提交的频率;

    3. ThreadPoolExecutor.DiscardPolicy,任务被抛弃;

    4. ThreadPoolExecutor.DiscardOldestPolicy,队列顶端的任务被抛弃,并且重试任务执行;

    除了预定义,也可以定义和使用其它类型的RejectedExecutionHandler

    同时提供了可被重写的方法beforeExecuteafterExecute,用于控制运行环境。此外terminated可以被重写,用于线程池停止时需要处理的过程。如果hook或回调方法执行过程中抛出异常,则内部的worker线程可能运行失败并且意外终止。


abstract class AbstractExecutorService

        implements ExecutorService

     ExecutorService的默认实现。

    通过newTaskFor实现了submitinvokeAllinvokeAny


interface ExecutorService

        implements Executor

    shutdown允许之前提交的任务在中止之前执行完成,不再接受新任务,但不等待任务执行完成;

    shutdownNow并且尝试停止当前执行的任务,阻止等待的任务执行,返回等待执行的任务列表,不保证一定能够关闭执行中的任务(例如调用interrupt中断线程有可能失败);

    isShutdown:

    isTerminated: 调用shutdown/shutdownNow之后,如果所有任务都完成了则返回true

    awaitTermination(timeout,unit): shutdown请求后,阻塞到所有任务都执行完成或者超时,或者当前线程被中断。如果是超时,返回false,如果该executor中止返回true

    <T> Future<T> submit(Callable<T> task): Future模式,提交后返回一个Future,通过调用Futureget方法获取实际任务成功执行的返回值;

    <T> Future<T> submit(Runnable taskT result):

    Future<?> submit(Runnable task);

    invokeAll(tasks): 执行给定的一组任务,返回Future


interface Executor

    将任务提交与任务执行的机制解耦。不显式的创建线程,而是用例如executor.execute(new RunnableTask1());的形式。

    不严格要求异步执行,可以直接在调用者线程中执行:

class DirectExecutor implements Executor {

    public void execute(Runnable r) {

        r.run();

    }

}

但通常是为每个任务开启一个新的线程。

class ThreadPerTaskExecutor implements Executor {

    public void execute(Runnable r) {

        new Thread(r).start();

    }

}

    Executor只有一个execute(Runnable command);方法,要求如果任务不能被接受执行,则抛出RejectedExecutionException,如果commandnull,抛出NullPointerException异常。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部