文档章节

ExecutorCompletionService源码分析

AbeJeffrey
 AbeJeffrey
发布于 2017/04/07 11:18
字数 888
阅读 318
收藏 2

ExecutorCompletionService实现了接口CompletionService(关于CompletionService的说明可参见Executor框架详解),其内部组合了Executor,AbstractExecutorService,BlockingQueue,其中Executor用于执行任务,AbstractExecutorService负责适配返回的FutureTask对象(不太明白组合AbstractExecutorService的好处),BlockingQueue则用于存放已完成任务的Future。看源码:

    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

构造方法

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

根据指定executor创建ExecutorCompletionService对象,内部默认使用LinkedBlockingQueue作为完成队列。

    public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

根据指定executor和completionQueue创建ExecutorCompletionService对象,支持定制完成队列。

下面看看ExecutorCompletionService如何包装自己的Future。

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

内部类QueueingFuture用于包装Future,当任务完成后自动调用done()将任务返回的Future添加到队列中,done()在FutureTask的finishCompletion()中调用,可自行查看。

同时ExecutorCompletionService定义自己的实现包装任务Runnable和Callable的结果。

    //包装Callable的返回结果
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else //查看AbstractExecutorService中的newTaskFor实现可知,实际就是new FutureTask<V>(task)
             //为什么还要组合一个AbstractExecutorService呢,感觉多此一举
            return aes.newTaskFor(task);
    }
    //包装Runnable
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);//内部也是使用适配器模式将Runnable包装为Callable
        else
            return aes.newTaskFor(task, result);
    }

submit

下面看看提交任务的实现:

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

代码非常简单,具体执行都由Executor代劳。

获取结果

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

提供以上三种获取任务结果的Future,take和poll都会在取得结果后从队列移除元素,不同的是,当队列为空的表现不同,take会阻塞,poll将返回null,poll(long timeout, TimeUnit unit)则会等待指定时间。

以上则是ExecutorCompletionService的全部源码,非常简单。下面看一下ExecutorCompletionService的应用场景:

现在有一组需要并发执行的任务,各任务执行周期不相同,客户端希望完成一个任务则立即取得执行结果。该场景下,使用ExecutorCompletionService即可优雅的实现。这里选用Runnable作为任务,对ExecutorCompletionService来说,Runnable和Callable都一样,Runnable最终也会封装为Callable,代码如下:

public class CompletionServiceTest {
    private   static ExecutorService            executor;
    private   static int                        numTask=50;
    private   static CompletionService<String>  completionService;
    private static class MyTask implements Runnable{
        private String taskName;
        private MyTask(String taskName){
            this.taskName=taskName;
        }
        @Override
        public void run() {
            long s=System.currentTimeMillis();
            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.taskName+" finished! The total time is "+(System.currentTimeMillis()-s)+"ms");
        }
    }
    public static void main(String[] args) {
        startAll();
        try {
            get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();//切记需要关闭线程池,否则,进程不会结束
    }
    private static void startAll(){
        if(null==executor){
            executor=Executors.newFixedThreadPool(numTask);
        }
        if(null==completionService){
            completionService=new ExecutorCompletionService<String>(executor);//使用默认队列
        }
        for(int i=0;i<numTask;i++){
            completionService.submit(new MyTask("Task"+i),"task"+i+" return");
        }
    }
    private static void get() throws InterruptedException,ExecutionException{
        if(null!=completionService){
            for(int i = 0;i<numTask;i++ ){ 
                System.out.println(completionService.take().get());
            }  
        }
    }
}

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/874638

© 著作权归作者所有

AbeJeffrey
粉丝 51
博文 43
码字总数 116095
作品 0
杭州
高级程序员
私信 提问
加载中

评论(0)

Java多线程6 CompletionService

CompletionService 1 CompletionService介绍 CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。 如果你向Executor提交了一个批处理任务...

香沙小熊
2018/11/27
0
0
Java使用Executor执行Callable任务时的几种方法

多线程在需要返回值时,我们知道需要用到Callable和Future。Callable的cell方法可以返回一个值并且可抛出异常,是对Runnable的很好的补充;Future表示了一个任务的周期,它提供了判断任务状态...

东都大狼狗
2018/09/09
37
0
java.util.concurrent.ExecutorCompletionService 源码

线程池相关 源码: 类 ExecutorCompletionService<V> 所有已实现的接口: CompletionService<V> 使用提供的 来执行任务的 。 此类将安排那些完成时提交的任务,把它们放置在可使用 take 访问...

潦草的犀牛
2018/11/29
38
0
源码 | 批量执行invokeAll() && 多选一invokeAny()

原文出处:猴子007 ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/...

猴子007
2017/12/02
0
0
源码|批量执行invokeAll()&&多选一invokeAny()

ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/超时)后才会返回,i...

猴子007
2017/12/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

网站变灰

平时到哀悼日的时候,我们看到很多知名网站都变成灰色了 只需要加下面的样式即可 html { filter: url(“data:image/svg+xml;utf8,<svg xmlns=\’http://www.w3.org/2000/svg\’><filter id=\...

三蹦子
27分钟前
36
0
网站首页蒙灰CSS样式

每次全国哀悼日,各大网站首页都变成了灰色,添加以下全局CSS样式,可以实现此效果: html,html *{  filter:gray!important;  filter:progid:DXImageTransform.Microsoft.BasicImage(...

达时索
33分钟前
74
0
1.NET Core 概述

.NET Core 概述 .NET Core是一个免费的、开源的、跨平台的、广泛使用的Web框架;它是由微软维护的。社区广泛参与支持的一个框架。.NET Core可以运行在:Windows、MacOS以及Linux操作系统上。...

osc_8j0twt2u
41分钟前
41
0
2.ASP.NET Core概述

ASP.NETCore概述 ASP.NET Core是最新的ASP.NET Web开发框架,它主要定向于运行在.NET Core平台上。 ASP.NET Core是免费的、开源的、跨平台的框架,可以用于编写基于云的应用程序,例如:Web...

osc_oa6qrgun
42分钟前
61
0
计算与软件工程作业三

作业要求 https://edu.cnblogs.com/campus/jssf/infor_computation17-31/homework/10454 我在这个课程的目标是 掌握软件开发知识,自己设计简单的程序,发布并维护 此作业在哪个具体方面帮我...

osc_kl6fknqf
42分钟前
84
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部