文档章节

ExecutorCompletionService源码分析

AbeJeffrey
 AbeJeffrey
发布于 2017/04/07 11:18
字数 888
阅读 48
收藏 2

ExecutorCompletionService实现了接口CompletionService(关于CompletionService的说明可参见Executor框架详解),其内部组合了Executor,AbstractExecutorService,BlockingQueue,其中Executor用于执行任务,AbstractExecutorService负责适配返回的FutureTask对象(不太明白组合AbstractExecutorService的好处),BlockingQueue则用于存放已完成任务的Future。看源码:

    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

构造方法

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

根据指定executor创建ExecutorCompletionService对象,内部默认使用LinkedBlockingQueue作为完成队列。

    public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

根据指定executor和completionQueue创建ExecutorCompletionService对象,支持定制完成队列。

下面看看ExecutorCompletionService如何包装自己的Future。

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

内部类QueueingFuture用于包装Future,当任务完成后自动调用done()将任务返回的Future添加到队列中,done()在FutureTask的finishCompletion()中调用,可自行查看。

同时ExecutorCompletionService定义自己的实现包装任务Runnable和Callable的结果。

    //包装Callable的返回结果
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else //查看AbstractExecutorService中的newTaskFor实现可知,实际就是new FutureTask<V>(task)
             //为什么还要组合一个AbstractExecutorService呢,感觉多此一举
            return aes.newTaskFor(task);
    }
    //包装Runnable
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);//内部也是使用适配器模式将Runnable包装为Callable
        else
            return aes.newTaskFor(task, result);
    }

submit

下面看看提交任务的实现:

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

代码非常简单,具体执行都由Executor代劳。

获取结果

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

提供以上三种获取任务结果的Future,take和poll都会在取得结果后从队列移除元素,不同的是,当队列为空的表现不同,take会阻塞,poll将返回null,poll(long timeout, TimeUnit unit)则会等待指定时间。

以上则是ExecutorCompletionService的全部源码,非常简单。下面看一下ExecutorCompletionService的应用场景:

现在有一组需要并发执行的任务,各任务执行周期不相同,客户端希望完成一个任务则立即取得执行结果。该场景下,使用ExecutorCompletionService即可优雅的实现。这里选用Runnable作为任务,对ExecutorCompletionService来说,Runnable和Callable都一样,Runnable最终也会封装为Callable,代码如下:

public class CompletionServiceTest {
    private   static ExecutorService            executor;
    private   static int                        numTask=50;
    private   static CompletionService<String>  completionService;
    private static class MyTask implements Runnable{
        private String taskName;
        private MyTask(String taskName){
            this.taskName=taskName;
        }
        @Override
        public void run() {
            long s=System.currentTimeMillis();
            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.taskName+" finished! The total time is "+(System.currentTimeMillis()-s)+"ms");
        }
    }
    public static void main(String[] args) {
        startAll();
        try {
            get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();//切记需要关闭线程池,否则,进程不会结束
    }
    private static void startAll(){
        if(null==executor){
            executor=Executors.newFixedThreadPool(numTask);
        }
        if(null==completionService){
            completionService=new ExecutorCompletionService<String>(executor);//使用默认队列
        }
        for(int i=0;i<numTask;i++){
            completionService.submit(new MyTask("Task"+i),"task"+i+" return");
        }
    }
    private static void get() throws InterruptedException,ExecutionException{
        if(null!=completionService){
            for(int i = 0;i<numTask;i++ ){ 
                System.out.println(completionService.take().get());
            }  
        }
    }
}

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/874638

© 著作权归作者所有

共有 人打赏支持
AbeJeffrey
粉丝 31
博文 43
码字总数 116062
作品 0
杭州
高级程序员
Java使用Executor执行Callable任务时的几种方法

多线程在需要返回值时,我们知道需要用到Callable和Future。Callable的cell方法可以返回一个值并且可抛出异常,是对Runnable的很好的补充;Future表示了一个任务的周期,它提供了判断任务状态...

东都大狼狗
09/09
0
0
源码 | 批量执行invokeAll() && 多选一invokeAny()

原文出处:猴子007 ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/...

猴子007
2017/12/02
0
0
源码|批量执行invokeAll()&&多选一invokeAny()

ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/超时)后才会返回,i...

猴子007
2017/12/04
0
0
从静态代码扫描引擎PMD源码学习-多线程任务模型和File过滤设计

不知不觉在工作中研究PMD并定制规则已经4个月左右了。其实PMD有许多值得我学习的源码,不过出于时间并不曾动笔。今天简单记录总结一下PMD的多线程和File过滤设计的源码。 1 public class Mul...

phinehasz
07/21
0
0
java - concurrent 之 CompletionService

CompletionService /** A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers {@code submit} tasks fo......

yuzn
2016/08/17
9
0

没有更多内容

加载失败,请刷新页面

加载更多

如何通过 J2Cache 实现分布式 session 存储

做 Java Web 开发的人多数都会需要使用到 session (会话),我们使用 session 来保存一些需要在两个不同的请求之间共享数据。一般 Java 的 Web 容器像 Tomcat、Resin、Jetty 等等,它们会在...

红薯
今天
1
0
C++ std::thread

C++11提供了std::thread类来表示一个多线程对象。 1,首先介绍一下std::this_thread命名空间: (1)std::this_thread::get_id():返回当前线程id (2)std::this_thread::yield():用户接口...

yepanl
今天
2
0
Nignx缓存文件与动态文件自动均衡的配置

下面这段nginx的配置脚本的作用是,自动判断是否存在缓存文件,如果有优先输出缓存文件,不经过php,如果没有,则回到php去处理,同时生成缓存文件。 PHP框架是ThinkPHP,最后一个rewrite有关...

swingcoder
今天
1
0
20180920 usermod命令与用户密码管理

命令 usermod usermod 命令的选项和 useradd 差不多。 一个用户可以属于多个组,但是gid只有一个;除了gid,其他的组(groups)叫做扩展组。 usermod -u 1010 username # 更改用户idusermod ...

野雪球
今天
1
0
Java网络编程基础

1. 简单了解网络通信协议TCP/IP网络模型相关名词 应用层(HTTP,FTP,DNS等) 传输层(TCP,UDP) 网络层(IP,ICMP等) 链路层(驱动程序,接口等) 链路层:用于定义物理传输通道,通常是对...

江左煤郎
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部