文档章节

ExecutorCompletionService源码分析

AbeJeffrey
 AbeJeffrey
发布于 2017/04/07 11:18
字数 888
阅读 42
收藏 2
点赞 0
评论 0

ExecutorCompletionService实现了接口CompletionService(关于CompletionService的说明可参见Executor框架详解),其内部组合了Executor,AbstractExecutorService,BlockingQueue,其中Executor用于执行任务,AbstractExecutorService负责适配返回的FutureTask对象(不太明白组合AbstractExecutorService的好处),BlockingQueue则用于存放已完成任务的Future。看源码:

    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

构造方法

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

根据指定executor创建ExecutorCompletionService对象,内部默认使用LinkedBlockingQueue作为完成队列。

    public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

根据指定executor和completionQueue创建ExecutorCompletionService对象,支持定制完成队列。

下面看看ExecutorCompletionService如何包装自己的Future。

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

内部类QueueingFuture用于包装Future,当任务完成后自动调用done()将任务返回的Future添加到队列中,done()在FutureTask的finishCompletion()中调用,可自行查看。

同时ExecutorCompletionService定义自己的实现包装任务Runnable和Callable的结果。

    //包装Callable的返回结果
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else //查看AbstractExecutorService中的newTaskFor实现可知,实际就是new FutureTask<V>(task)
             //为什么还要组合一个AbstractExecutorService呢,感觉多此一举
            return aes.newTaskFor(task);
    }
    //包装Runnable
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);//内部也是使用适配器模式将Runnable包装为Callable
        else
            return aes.newTaskFor(task, result);
    }

submit

下面看看提交任务的实现:

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

代码非常简单,具体执行都由Executor代劳。

获取结果

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

提供以上三种获取任务结果的Future,take和poll都会在取得结果后从队列移除元素,不同的是,当队列为空的表现不同,take会阻塞,poll将返回null,poll(long timeout, TimeUnit unit)则会等待指定时间。

以上则是ExecutorCompletionService的全部源码,非常简单。下面看一下ExecutorCompletionService的应用场景:

现在有一组需要并发执行的任务,各任务执行周期不相同,客户端希望完成一个任务则立即取得执行结果。该场景下,使用ExecutorCompletionService即可优雅的实现。这里选用Runnable作为任务,对ExecutorCompletionService来说,Runnable和Callable都一样,Runnable最终也会封装为Callable,代码如下:

public class CompletionServiceTest {
    private   static ExecutorService            executor;
    private   static int                        numTask=50;
    private   static CompletionService<String>  completionService;
    private static class MyTask implements Runnable{
        private String taskName;
        private MyTask(String taskName){
            this.taskName=taskName;
        }
        @Override
        public void run() {
            long s=System.currentTimeMillis();
            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.taskName+" finished! The total time is "+(System.currentTimeMillis()-s)+"ms");
        }
    }
    public static void main(String[] args) {
        startAll();
        try {
            get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();//切记需要关闭线程池,否则,进程不会结束
    }
    private static void startAll(){
        if(null==executor){
            executor=Executors.newFixedThreadPool(numTask);
        }
        if(null==completionService){
            completionService=new ExecutorCompletionService<String>(executor);//使用默认队列
        }
        for(int i=0;i<numTask;i++){
            completionService.submit(new MyTask("Task"+i),"task"+i+" return");
        }
    }
    private static void get() throws InterruptedException,ExecutionException{
        if(null!=completionService){
            for(int i = 0;i<numTask;i++ ){ 
                System.out.println(completionService.take().get());
            }  
        }
    }
}

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/874638

© 著作权归作者所有

共有 人打赏支持
AbeJeffrey
粉丝 29
博文 43
码字总数 116062
作品 0
杭州
高级程序员
源码 | 批量执行invokeAll() && 多选一invokeAny()

原文出处:猴子007 ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/...

猴子007
2017/12/02
0
0
源码|批量执行invokeAll()&&多选一invokeAny()

ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/超时)后才会返回,i...

猴子007
2017/12/04
0
0
java - concurrent 之 CompletionService

CompletionService /** A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers {@code submit} tasks fo......

yuzn
2016/08/17
9
0
Java Executor 框架

Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(图片引用自http://www.javacl...

leesama
2014/12/01
0
0
ThreadPoolExecutor源码分析及阻塞提交任务方法

ThreadPoolExecutor源码 ThreadPoolExecutor 基本使用参考:ThreadPoolExecutor执行过程分析 线程池状态标志 ctl 保存了线程池的运行状态(runState)和线程池内有效线程数量(workerCount)...

4rnold
06/28
0
0
concurrent包学习--ThreadPoolExecutor实现

    ThreadPoolExecutor是jdk自带的线程池实现。看到了"池"一定会想到对象池模式,它是单例模式的一个变种,主要思想是通过共享复用已有的空闲对象,达到限制开销和提高性能的目的。这里...

积淀
03/05
0
0
ThreadPoolExecutor执行过程分析

ThreadPoolExecutor corePoolSize:线程池核心线程数(平时保留的线程数) maximumPoolSize:线程池最大线程数(当workQueue都放不下时,启动新线程,最大线程数) keepAliveTime:超出coreP...

4rnold
06/23
0
0
java 利用Future异步获取多线程任务结果

简述 Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实现,可以来进行异步计算。 有了Future就可以进行三段式的编程了,1.启动多线程任务...

qq948939246
01/11
0
0
在Executor中一步一步提高并发

要把程序变为并发程序,首先要理清各个任务之间的边界。在大多数服务器应用程序中都存在一个明显的任务边界:即单个客户请求。但有时候任务边界也并非是显而易见的,比如在单个客户请求中仍有...

摆渡者
2016/11/10
88
0
源码之下无秘密 ── 做最好的 Netty 源码分析教程

背景 在工作中, 虽然我经常使用到 Netty 库, 但是很多时候对 Netty 的一些概念还是处于知其然, 不知其所以然的状态, 因此就萌生了学习 Netty 源码的想法. 刚开始看源码的时候, 自然是比较痛苦...

永顺
2017/11/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

前端基础

1. get请求传参长度的误区 误区:我们经常说get请求参数的大小存在限制,而post请求的参数大小是无限制的。 实际上HTTP 协议从未规定 GET/POST 的请求长度限制是多少。对get请求参数的限制是...

wenxingjun
今天
0
0
Android 复制和粘贴功能

做了一回搬运工,原文地址:https://blog.csdn.net/kennethyo/article/details/76602765 Android 复制和粘贴功能,需要调用系统服务ClipboardManager来实现。 ClipboardManager mClipboardM...

她叫我小渝
今天
0
0
拦截SQLSERVER的SSL加密通道替换传输过程中的用户名密码实现运维审计(一)

工作准备 •一台SQLSERVER 2005/SQLSERVER 2008服务 •SQLSERVER jdbc驱动程序 •Java开发环境eclipse + jdk1.8 •java反编译工具JD-Core 反编译JDBC分析SQLSERVER客户端与服务器通信原理 SQ...

紅顏為君笑
今天
6
0
jQuery零基础入门——(六)修改DOM结构

《jQuery零基础入门》系列博文是在廖雪峰老师的博文基础上,可能补充了个人的理解和日常遇到的点,用我的理解表述出来,主干出处来自廖雪峰老师的技术分享。 在《零基础入门JavaScript》的时...

JandenMa
今天
0
0
linux mint 1.9 qq 安装

转: https://www.jianshu.com/p/cdc3d03c144d 1. 下载 qq 轻聊版,可在百度搜索后下载 QQ7.9Light.exe 2. 去wine的官网(https://wiki.winehq.org/Ubuntu) 安装 wine . 提醒网页可以切换成中...

Canaan_
今天
0
0
PHP后台运行命令并管理运行程序

php后台运行命令并管理后台运行程序 class ProcessModel{ private $pid; private $command; private $resultToFile = ''; public function __construct($cl=false){......

colin_86
今天
1
0
数据结构与算法4

在此程序中,HighArray类中的find()方法用数据项的值作为参数传递,它的返回值决定是否找到此数据项。 insert()方法向数组下一个空位置放置一个新的数据项。一个名为nElems的字段跟踪记录着...

沉迷于编程的小菜菜
今天
1
1
fiddler安装和基本使用以及代理设置

项目需求 由于开发过程中客户端和服务器数据交互非常频繁,有时候服务端需要知道客户端调用接口传了哪些参数过来,这个时候就需要一个工具可以监听这些接口请求参数,已经接口的响应的数据,这种...

银装素裹
今天
0
0
Python分析《我不是药神》豆瓣评论

读取 Mongo 中的短评数据,进行中文分词 对分词结果取 Top50 生成词云 生成词云效果 看来网上关于 我不是药神 vs 达拉斯 的争论很热啊。关于词频统计就这些,代码中也会完成一些其它的分析任...

猫咪编程
今天
0
0
虚拟机怎么安装vmware tools

https://blog.csdn.net/tjcwt2011/article/details/72638977

AndyZhouX
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部