Java Executor分析

原创
2016/10/14 16:58
阅读数 53

Executor框架可以快速创建Java线程池。
Executors是Executor框架的基础。所有的Executor和ExecutorService都实现Executor接口,其中ExecutorService是比较重要的接口,Executors提供了很多静态方法可以实现很多ThreadPool。通过ExecutorService的submit可以分别将Callable和Runnable任务提交到线程中处理;通过shutdown()关闭所有线程,Callable返回的Future有结果,Runnable则没有。

Executors类:

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory);
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory);
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
}


主要方法:
1. newFixedThreadPool返回ThreadPoolExecutor,ThreadPoolExecutor创建固定数目的线程,当有任务submit时,如果没有线程可执行时,则加入任务队列。ThreadPoolExecutor刚开始不创建线程,当有任务提交时,如果poolSize<corePoolSize则创建线程,当poolSize==corePoolSize时,新增任务加入任务队列,如果此时任务队列满了并且poolSize<maximumPoolSize,则创建线程处理,否则就拒绝任务,拒绝策略取决于RejectedExecutionHandler,Worker线程则一直在等待队列getTask和run间循环;

2. newCachedThreadPool返回的也是ThreadPoolExecutor,区别是cache线程数大小没有限制,有60秒的IDLE,如果超过60秒没有任务则会被关闭;

3. newSingleThreadExecutor返回Executors.FinalizableDelegatedExecutorService,底层也是ThreadPoolExecutor,区别是线程是1~1,IDLE为0秒;

4. newScheduledThreadPool返回ScheduledExecutorService,ScheduledExecutorService继承ThreadPoolExecutor并实现了ScheduledExecutorService,ScheduledExecutorService封装一个DelayedWorkQueue,Worker执行一次任务后add进去,另一个Worker触发take,take需要等待一个delay,重复循环下去。

5. newWorkStealingPool返回ForkJoinPool,ForkJoinPool分而治之,ForkJoinPool默认创建CPU数目创建并行等级,会动态的增加或关闭线程,最大线程数限制为32767,空闲线程会主动为别的线程分担任务,从而使CPU处于饱满状态,我们要使用ForkJoin框架,必须首先创建一个ForkJoinTask。它提供在任务中执行fork( )和join( )操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:RecursiveAction用于没有返回结果的任务,RecursiveTask 用于有返回结果的任务。ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部