文档章节

ExecutorCompletionService源码分析

AbeJeffrey
 AbeJeffrey
发布于 2017/04/07 11:18
字数 888
阅读 59
收藏 2

ExecutorCompletionService实现了接口CompletionService(关于CompletionService的说明可参见Executor框架详解),其内部组合了Executor,AbstractExecutorService,BlockingQueue,其中Executor用于执行任务,AbstractExecutorService负责适配返回的FutureTask对象(不太明白组合AbstractExecutorService的好处),BlockingQueue则用于存放已完成任务的Future。看源码:

    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

构造方法

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

根据指定executor创建ExecutorCompletionService对象,内部默认使用LinkedBlockingQueue作为完成队列。

    public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

根据指定executor和completionQueue创建ExecutorCompletionService对象,支持定制完成队列。

下面看看ExecutorCompletionService如何包装自己的Future。

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

内部类QueueingFuture用于包装Future,当任务完成后自动调用done()将任务返回的Future添加到队列中,done()在FutureTask的finishCompletion()中调用,可自行查看。

同时ExecutorCompletionService定义自己的实现包装任务Runnable和Callable的结果。

    //包装Callable的返回结果
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else //查看AbstractExecutorService中的newTaskFor实现可知,实际就是new FutureTask<V>(task)
             //为什么还要组合一个AbstractExecutorService呢,感觉多此一举
            return aes.newTaskFor(task);
    }
    //包装Runnable
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);//内部也是使用适配器模式将Runnable包装为Callable
        else
            return aes.newTaskFor(task, result);
    }

submit

下面看看提交任务的实现:

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

代码非常简单,具体执行都由Executor代劳。

获取结果

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

提供以上三种获取任务结果的Future,take和poll都会在取得结果后从队列移除元素,不同的是,当队列为空的表现不同,take会阻塞,poll将返回null,poll(long timeout, TimeUnit unit)则会等待指定时间。

以上则是ExecutorCompletionService的全部源码,非常简单。下面看一下ExecutorCompletionService的应用场景:

现在有一组需要并发执行的任务,各任务执行周期不相同,客户端希望完成一个任务则立即取得执行结果。该场景下,使用ExecutorCompletionService即可优雅的实现。这里选用Runnable作为任务,对ExecutorCompletionService来说,Runnable和Callable都一样,Runnable最终也会封装为Callable,代码如下:

public class CompletionServiceTest {
    private   static ExecutorService            executor;
    private   static int                        numTask=50;
    private   static CompletionService<String>  completionService;
    private static class MyTask implements Runnable{
        private String taskName;
        private MyTask(String taskName){
            this.taskName=taskName;
        }
        @Override
        public void run() {
            long s=System.currentTimeMillis();
            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.taskName+" finished! The total time is "+(System.currentTimeMillis()-s)+"ms");
        }
    }
    public static void main(String[] args) {
        startAll();
        try {
            get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();//切记需要关闭线程池,否则,进程不会结束
    }
    private static void startAll(){
        if(null==executor){
            executor=Executors.newFixedThreadPool(numTask);
        }
        if(null==completionService){
            completionService=new ExecutorCompletionService<String>(executor);//使用默认队列
        }
        for(int i=0;i<numTask;i++){
            completionService.submit(new MyTask("Task"+i),"task"+i+" return");
        }
    }
    private static void get() throws InterruptedException,ExecutionException{
        if(null!=completionService){
            for(int i = 0;i<numTask;i++ ){ 
                System.out.println(completionService.take().get());
            }  
        }
    }
}

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/874638

© 著作权归作者所有

共有 人打赏支持
AbeJeffrey
粉丝 35
博文 43
码字总数 116095
作品 0
杭州
高级程序员
私信 提问
Java多线程6 CompletionService

CompletionService 1 CompletionService介绍 CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。 如果你向Executor提交了一个批处理任务...

香沙小熊
2018/11/27
0
0
Java使用Executor执行Callable任务时的几种方法

多线程在需要返回值时,我们知道需要用到Callable和Future。Callable的cell方法可以返回一个值并且可抛出异常,是对Runnable的很好的补充;Future表示了一个任务的周期,它提供了判断任务状态...

东都大狼狗
2018/09/09
0
0
java.util.concurrent.ExecutorCompletionService 源码

线程池相关 源码: 类 ExecutorCompletionService<V> 所有已实现的接口: CompletionService<V> 使用提供的 来执行任务的 。 此类将安排那些完成时提交的任务,把它们放置在可使用 take 访问...

狼王黄师傅
2018/11/29
0
0
源码 | 批量执行invokeAll() && 多选一invokeAny()

原文出处:猴子007 ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/...

猴子007
2017/12/02
0
0
源码|批量执行invokeAll()&&多选一invokeAny()

ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/超时)后才会返回,i...

猴子007
2017/12/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

cnetos7+docker+rancher构建基于DevOps的全自动CI【01】

来自DevOps实践分享,分享从开发代码到生产环境部署的一条龙操作的实践及经验, 包含工具技术的选型及考量、私有代码库与私有镜像库的应用等。 1、环境选择 安装Rancher环境,一定要在干净的...

Elson
1分钟前
0
0
21分钟教会你分析MaxCompute账单

背景 阿里云大计算服务MaxCompute是一款商业化的大数据分析平台,其计算资源有预付费和后付费两种计费方式。并且产品每天按照project为维度进行计量计费(账单基本情况下会第二天6点前产出)...

zhaowei121
4分钟前
0
0
CTO职场解惑指南系列(一)

基于科技能够改变世界的事实,几乎每个公司的程序员都自带闪光灯。程序员的手和普通人的手自然是有区别的,“我们可是用双手改变了世界” 。(码农真的是靠双手吃饭,呵呵) 这个世界上但凡靠...

阿里云云栖社区
9分钟前
0
0
css实现图片自适应容器宽高

css实现图片自适应容器宽高的做法一般如下所示 <style>div{width: 200px; height: 200px}div img{width: 100%; height: 100%}</style><div><img src="xxxx.png" /></div> 当外层容......

小草先森
9分钟前
0
0
PlatON在CentOS上编译部署

本文作者为万向区块链CTO罗荣阁。 目录 PlatON在CentOS上编译部署 1. CentOS 环境准备 1.1. 使用rpm 安装devtoolset-7 1.2. 使用rpm 安装dos2unix 1.3. 准备PlatON代码 1.4. 确保build脚本正...

万向区块链
17分钟前
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部