文档章节

ExecutorCompletionService

傅小水water
 傅小水water
发布于 2017/04/06 17:16
字数 747
阅读 5
收藏 0
点赞 0
评论 0

#描述
当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果。

#实现方式

##方式一
通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 。

1.	public class CompletionServiceTest {  
2.	  
3.	    static class Task implements Callable<String>{  
4.	        private int i;  
5.	          
6.	        public Task(int i){  
7.	            this.i = i;  
8.	        }  
9.	  
10.	        @Override  
11.	        public String call() throws Exception {  
12.	            Thread.sleep(10000);  
13.	            return Thread.currentThread().getName() + "执行完任务:" + i;  
14.	        }     
15.	    }  
16.	      
17.	    public static void main(String[] args){  
18.	        testUseFuture();  
19.	    }  
20.	      
21.	    private static void testUseFuture(){  
22.	        int numThread = 5;  
23.	        ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24.	        List<Future<String>> futureList = new ArrayList<Future<String>>();  
25.	        for(int i = 0;i<numThread;i++ ){  
26.	            Future<String> future = executor.submit(new CompletionServiceTest.Task(i));  
27.	            futureList.add(future);  
28.	        }  
29.	                  
30.	        while(numThread > 0){  
31.	            for(Future<String> future : futureList){  
32.	                String result = null;  
33.	                try {  
34.	                    result = future.get(0, TimeUnit.SECONDS);  
35.	                } catch (InterruptedException e) {  
36.	                    e.printStackTrace();  
37.	                } catch (ExecutionException e) {  
38.	                    e.printStackTrace();  
39.	                } catch (TimeoutException e) {  
40.	                    //超时异常直接忽略  
41.	                }  
42.	                if(null != result){  
43.	                    futureList.remove(future);  
44.	                    numThread--;  
45.	                    System.out.println(result);  
46.	                    //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)  
47.	                    break;  
48.	                }  
49.	            }  
50.	        }  
51.	    }  
52.	}

##方式二 第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。

1.	public class CompletionServiceTest {  
2.	  
3.	    static class Task implements Callable<String>{  
4.	        private int i;  
5.	          
6.	        public Task(int i){  
7.	            this.i = i;  
8.	        }  
9.	  
10.	        @Override  
11.	        public String call() throws Exception {  
12.	            Thread.sleep(10000);  
13.	            return Thread.currentThread().getName() + "执行完任务:" + i;  
14.	        }     
15.	    }  
16.	      
17.	    public static void main(String[] args) throws InterruptedException, ExecutionException{  
18.	        testExecutorCompletionService();  
19.	    }  
20.	      
21.	    private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{  
22.	        int numThread = 5;  
23.	        ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24.	        CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);  
25.	        for(int i = 0;i<numThread;i++ ){  
26.	            completionService.submit(new CompletionServiceTest.Task(i));  
27.	        }  
28.	}  
29.	          
30.	        for(int i = 0;i<numThread;i++ ){       
31.	            System.out.println(completionService.take().get());  
32.	        }  
33.	          
34.	    }  

#ExecutorCompletionService分析
CompletionService是Executor和BlockingQueue的结合体。

1.	public ExecutorCompletionService(Executor executor) {  
2.	        if (executor == null)  
3.	            throw new NullPointerException();  
4.	        this.executor = executor;  
5.	        this.aes = (executor instanceof AbstractExecutorService) ?  
6.	            (AbstractExecutorService) executor : null;  
7.	        this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
8.	    }

任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture。

1.	public Future<V> submit(Callable<V> task) {  
2.	        if (task == null) throw new NullPointerException();  
3.	        RunnableFuture<V> f = newTaskFor(task);  
4.	        executor.execute(new QueueingFuture(f));  
5.	        return f;  
6.	    }

QueueingFuture是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

1.	private class QueueingFuture extends FutureTask<Void> {  
2.	        QueueingFuture(RunnableFuture<V> task) {  
3.	            super(task, null);  
4.	            this.task = task;  
5.	        }  
6.	        protected void done() { completionQueue.add(task); }  
7.	        private final Future<V> task;  
8.	    }  

而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

1.	public Future<V> take() throws InterruptedException {  
2.	    return completionQueue.take();  
3.	}  
4.	  
5.	public Future<V> poll() {  
6.	    return completionQueue.poll();  
7.	}

© 著作权归作者所有

共有 人打赏支持
傅小水water
粉丝 1
博文 15
码字总数 9373
作品 0
杭州
java - concurrent 之 CompletionService

CompletionService /** A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers {@code submit} tasks fo......

yuzn ⋅ 2016/08/17 ⋅ 0

Java Executor 框架

Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(图片引用自http://www.javacl...

leesama ⋅ 2014/12/01 ⋅ 0

concurrent包学习--ThreadPoolExecutor实现

    ThreadPoolExecutor是jdk自带的线程池实现。看到了"池"一定会想到对象池模式,它是单例模式的一个变种,主要思想是通过共享复用已有的空闲对象,达到限制开销和提高性能的目的。这里...

积淀 ⋅ 03/05 ⋅ 0

java 利用Future异步获取多线程任务结果

简述 Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实现,可以来进行异步计算。 有了Future就可以进行三段式的编程了,1.启动多线程任务...

qq948939246 ⋅ 01/11 ⋅ 0

在Executor中一步一步提高并发

要把程序变为并发程序,首先要理清各个任务之间的边界。在大多数服务器应用程序中都存在一个明显的任务边界:即单个客户请求。但有时候任务边界也并非是显而易见的,比如在单个客户请求中仍有...

摆渡者 ⋅ 2016/11/10 ⋅ 0

源码 | 批量执行invokeAll() && 多选一invokeAny()

原文出处:猴子007 ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/...

猴子007 ⋅ 2017/12/02 ⋅ 0

Java线程池使用时需要注意的几点

线程池作用 CPU资源隔离 减少上下文切换 减少线程创建/关闭的资源开销 更好并发控制 更好生命周期控制 设计时注意事项 设计时,需注意: 任务混杂 任务依赖 饥饿死锁 慢操作 使用时注意事项 ...

秋雨霏霏 ⋅ 02/11 ⋅ 0

源码|批量执行invokeAll()&&多选一invokeAny()

ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/超时)后才会返回,i...

猴子007 ⋅ 2017/12/04 ⋅ 0

CompletionService 简介

当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,但是如果这个task没有完成,你就得阻塞在这里,这个实效...

古月楼 ⋅ 2013/09/03 ⋅ 0

java.util.concurrent包(4)——Callable和Future

Callable和Future,一个产生结果,一个拿到结果。 Callable接口类似于Runnable,从名字就可以看出来了,但是Runnable不会返回结果,并且无法抛出返回结果的异常,而Callable功能更强大一些,...

woshixuye111 ⋅ 2014/06/22 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Sqoop

1.Sqoop: 《=》 SQL to Hadoop 背景 1)场景:数据在RDBMS中,我们如何使用Hive或者Hadoop来进行数据分析呢? 1) RDBMS ==> Hadoop(广义) 2) Hadoop ==> RDBMS 2)原来可以通过MapReduce I...

GordonNemo ⋅ 32分钟前 ⋅ 0

全量构建和增量构建的区别

1.全量构建每次更新时都需要更新整个数据集,增量构建只对需要更新的时间范围进行更新,所以计算量会较小。 2.全量构建查询时不需要合并不同Segment,增量构建查询时需要合并不同Segment的结...

无精疯 ⋅ 42分钟前 ⋅ 0

如何将S/4HANA系统存储的图片文件用Java程序保存到本地

我在S/4HANA的事务码MM02里为Material维护图片文件作为附件: 通过如下简单的ABAP代码即可将图片文件的二进制内容读取出来: REPORT zgos_api.DATA ls_appl_object TYPE gos_s_obj.DA...

JerryWang_SAP ⋅ 今天 ⋅ 0

云计算的选择悖论如何对待?

导读 人们都希望在工作和生活中有所选择。但心理学家的调查研究表明,在多种选项中进行选择并不一定会使人们更快乐,甚至不会产生更好的决策。心理学家Barry Schwartz称之为“选择悖论”。云...

问题终结者 ⋅ 今天 ⋅ 0

637. Average of Levels in Binary Tree - LeetCode

Question 637. Average of Levels in Binary Tree Solution 思路:定义一个map,层数作为key,value保存每层的元素个数和所有元素的和,遍历这个树,把map里面填值,遍历结束后,再遍历这个map,把每...

yysue ⋅ 今天 ⋅ 0

IDEA配置和使用

版本控制 svn IDEA版本控制工具不能使用 VCS-->Enable Version Control Integration File-->Settings-->Plugins 搜索Subversion,勾选SVN和Git插件 删除.idea文件夹重新生成项目 安装SVN客户......

bithup ⋅ 今天 ⋅ 0

PE格式第三讲扩展,VA,RVA,FA的概念

作者:IBinary 出处:http://www.cnblogs.com/iBinary/ 版权所有,欢迎保留原文链接进行转载:) 一丶VA概念 VA (virtual Address) 虚拟地址的意思 ,比如随便打开一个PE,找下它的虚拟地址 这边...

simpower ⋅ 今天 ⋅ 0

180623-SpringBoot之logback配置文件

SpringBoot配置logback 项目的日志配置属于比较常见的case了,之前接触和使用的都是Spring结合xml的方式,引入几个依赖,然后写个 logback.xml 配置文件即可,那么在SpringBoot中可以怎么做?...

小灰灰Blog ⋅ 今天 ⋅ 0

冒泡排序

原理:比较两个相邻的元素,将值大的元素交换至右端。 思路:依次比较相邻的两个数,将小数放在前面,大数放在后面。即在第一趟:首先比较第1个和第2个数,将小数放前,大数放后。然后比较第...

人觉非常君 ⋅ 今天 ⋅ 0

Vagrant setup

安装软件 brew cask install virtualboxbrew cask install vagrant 创建project mkdir -p mst/vmcd mst/vmvagrant init hashicorp/precise64vagrant up hashicorp/precise64是一个box......

遥借东风 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部