文档章节

ExecutorCompletionService

傅小水water
 傅小水water
发布于 2017/04/06 17:16
字数 747
阅读 5
收藏 0

#描述
当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果。

#实现方式

##方式一
通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 。

1.	public class CompletionServiceTest {  
2.	  
3.	    static class Task implements Callable<String>{  
4.	        private int i;  
5.	          
6.	        public Task(int i){  
7.	            this.i = i;  
8.	        }  
9.	  
10.	        @Override  
11.	        public String call() throws Exception {  
12.	            Thread.sleep(10000);  
13.	            return Thread.currentThread().getName() + "执行完任务:" + i;  
14.	        }     
15.	    }  
16.	      
17.	    public static void main(String[] args){  
18.	        testUseFuture();  
19.	    }  
20.	      
21.	    private static void testUseFuture(){  
22.	        int numThread = 5;  
23.	        ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24.	        List<Future<String>> futureList = new ArrayList<Future<String>>();  
25.	        for(int i = 0;i<numThread;i++ ){  
26.	            Future<String> future = executor.submit(new CompletionServiceTest.Task(i));  
27.	            futureList.add(future);  
28.	        }  
29.	                  
30.	        while(numThread > 0){  
31.	            for(Future<String> future : futureList){  
32.	                String result = null;  
33.	                try {  
34.	                    result = future.get(0, TimeUnit.SECONDS);  
35.	                } catch (InterruptedException e) {  
36.	                    e.printStackTrace();  
37.	                } catch (ExecutionException e) {  
38.	                    e.printStackTrace();  
39.	                } catch (TimeoutException e) {  
40.	                    //超时异常直接忽略  
41.	                }  
42.	                if(null != result){  
43.	                    futureList.remove(future);  
44.	                    numThread--;  
45.	                    System.out.println(result);  
46.	                    //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)  
47.	                    break;  
48.	                }  
49.	            }  
50.	        }  
51.	    }  
52.	}

##方式二 第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。

1.	public class CompletionServiceTest {  
2.	  
3.	    static class Task implements Callable<String>{  
4.	        private int i;  
5.	          
6.	        public Task(int i){  
7.	            this.i = i;  
8.	        }  
9.	  
10.	        @Override  
11.	        public String call() throws Exception {  
12.	            Thread.sleep(10000);  
13.	            return Thread.currentThread().getName() + "执行完任务:" + i;  
14.	        }     
15.	    }  
16.	      
17.	    public static void main(String[] args) throws InterruptedException, ExecutionException{  
18.	        testExecutorCompletionService();  
19.	    }  
20.	      
21.	    private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{  
22.	        int numThread = 5;  
23.	        ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24.	        CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);  
25.	        for(int i = 0;i<numThread;i++ ){  
26.	            completionService.submit(new CompletionServiceTest.Task(i));  
27.	        }  
28.	}  
29.	          
30.	        for(int i = 0;i<numThread;i++ ){       
31.	            System.out.println(completionService.take().get());  
32.	        }  
33.	          
34.	    }  

#ExecutorCompletionService分析
CompletionService是Executor和BlockingQueue的结合体。

1.	public ExecutorCompletionService(Executor executor) {  
2.	        if (executor == null)  
3.	            throw new NullPointerException();  
4.	        this.executor = executor;  
5.	        this.aes = (executor instanceof AbstractExecutorService) ?  
6.	            (AbstractExecutorService) executor : null;  
7.	        this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
8.	    }

任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture。

1.	public Future<V> submit(Callable<V> task) {  
2.	        if (task == null) throw new NullPointerException();  
3.	        RunnableFuture<V> f = newTaskFor(task);  
4.	        executor.execute(new QueueingFuture(f));  
5.	        return f;  
6.	    }

QueueingFuture是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

1.	private class QueueingFuture extends FutureTask<Void> {  
2.	        QueueingFuture(RunnableFuture<V> task) {  
3.	            super(task, null);  
4.	            this.task = task;  
5.	        }  
6.	        protected void done() { completionQueue.add(task); }  
7.	        private final Future<V> task;  
8.	    }  

而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

1.	public Future<V> take() throws InterruptedException {  
2.	    return completionQueue.take();  
3.	}  
4.	  
5.	public Future<V> poll() {  
6.	    return completionQueue.poll();  
7.	}

© 著作权归作者所有

共有 人打赏支持
傅小水water
粉丝 1
博文 23
码字总数 9371
作品 0
杭州
Java使用Executor执行Callable任务时的几种方法

多线程在需要返回值时,我们知道需要用到Callable和Future。Callable的cell方法可以返回一个值并且可抛出异常,是对Runnable的很好的补充;Future表示了一个任务的周期,它提供了判断任务状态...

东都大狼狗
09/09
0
0
java - concurrent 之 CompletionService

CompletionService /** A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers {@code submit} tasks fo......

yuzn
2016/08/17
9
0
Java Executor 框架

Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(图片引用自http://www.javacl...

leesama
2014/12/01
0
0
concurrent包学习--ThreadPoolExecutor实现

    ThreadPoolExecutor是jdk自带的线程池实现。看到了"池"一定会想到对象池模式,它是单例模式的一个变种,主要思想是通过共享复用已有的空闲对象,达到限制开销和提高性能的目的。这里...

积淀
03/05
0
0
从静态代码扫描引擎PMD源码学习-多线程任务模型和File过滤设计

不知不觉在工作中研究PMD并定制规则已经4个月左右了。其实PMD有许多值得我学习的源码,不过出于时间并不曾动笔。今天简单记录总结一下PMD的多线程和File过滤设计的源码。 1 public class Mul...

phinehasz
07/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Delphi 常用API 函数(好多都没见过)

AdjustWindowRect 给定一种窗口样式,计算获得目标客户区矩形所需的窗口大小 AnyPopup 判断屏幕上是否存在任何弹出式窗口 ArrangeIconicWindows 排列一个父窗口的最小化子窗口 AttachThread...

dillonxiao
12分钟前
1
0
阿里云ubuntu配置Android开发环境编译Apk

1.命令行下载Android SDK $ wget https://dl.google.com/android/android-sdk_r24.4.1-linux.tgz $ tar zxvf android-sdk_r24.4.1-linux.tgz 2.列出可以现在的SDK ./android list sdk  -a 3......

SuShine
12分钟前
1
0
maven导出项目依赖的jar包

一、导出到默认目录 targed/dependency 从Maven项目中导出项目依赖的jar包:进入工程pom.xml 所在的目录下,执行如下命令: mvn dependency:copy-dependencies 二、导出到自定义目录中 在mav...

来来来来来
13分钟前
1
0
Win10下React Native环境安装教程及错误处理办法(实测)

https://blog.csdn.net/zhangatle/article/details/53289471 准备工作 注意:小米手机MIUI有坑,文末有解决方法 1 首先,你需要先安装Node.js并进行环境变量的配置,具体可以参考我的另一篇文...

james_laughing
13分钟前
0
0
IDEA2018 Mybatis plugin破解

IDEA2018 Mybatis plugin破解 Mybatis Plugin 一、Mybatis Plugin插件是什么 提供Mapper接口与配置文件中对应SQL的导航 编辑XML文件时自动补全 根据Mapper接口, 使用快捷键生成xml文件及SQL...

DemonsI
14分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部