文档章节

ExecutorCompletionService

傅小水water
 傅小水water
发布于 2017/04/06 17:16
字数 747
阅读 5
收藏 0

#描述
当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果。

#实现方式

##方式一
通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 。

1.	public class CompletionServiceTest {  
2.	  
3.	    static class Task implements Callable<String>{  
4.	        private int i;  
5.	          
6.	        public Task(int i){  
7.	            this.i = i;  
8.	        }  
9.	  
10.	        @Override  
11.	        public String call() throws Exception {  
12.	            Thread.sleep(10000);  
13.	            return Thread.currentThread().getName() + "执行完任务:" + i;  
14.	        }     
15.	    }  
16.	      
17.	    public static void main(String[] args){  
18.	        testUseFuture();  
19.	    }  
20.	      
21.	    private static void testUseFuture(){  
22.	        int numThread = 5;  
23.	        ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24.	        List<Future<String>> futureList = new ArrayList<Future<String>>();  
25.	        for(int i = 0;i<numThread;i++ ){  
26.	            Future<String> future = executor.submit(new CompletionServiceTest.Task(i));  
27.	            futureList.add(future);  
28.	        }  
29.	                  
30.	        while(numThread > 0){  
31.	            for(Future<String> future : futureList){  
32.	                String result = null;  
33.	                try {  
34.	                    result = future.get(0, TimeUnit.SECONDS);  
35.	                } catch (InterruptedException e) {  
36.	                    e.printStackTrace();  
37.	                } catch (ExecutionException e) {  
38.	                    e.printStackTrace();  
39.	                } catch (TimeoutException e) {  
40.	                    //超时异常直接忽略  
41.	                }  
42.	                if(null != result){  
43.	                    futureList.remove(future);  
44.	                    numThread--;  
45.	                    System.out.println(result);  
46.	                    //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)  
47.	                    break;  
48.	                }  
49.	            }  
50.	        }  
51.	    }  
52.	}

##方式二 第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。

1.	public class CompletionServiceTest {  
2.	  
3.	    static class Task implements Callable<String>{  
4.	        private int i;  
5.	          
6.	        public Task(int i){  
7.	            this.i = i;  
8.	        }  
9.	  
10.	        @Override  
11.	        public String call() throws Exception {  
12.	            Thread.sleep(10000);  
13.	            return Thread.currentThread().getName() + "执行完任务:" + i;  
14.	        }     
15.	    }  
16.	      
17.	    public static void main(String[] args) throws InterruptedException, ExecutionException{  
18.	        testExecutorCompletionService();  
19.	    }  
20.	      
21.	    private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{  
22.	        int numThread = 5;  
23.	        ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24.	        CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);  
25.	        for(int i = 0;i<numThread;i++ ){  
26.	            completionService.submit(new CompletionServiceTest.Task(i));  
27.	        }  
28.	}  
29.	          
30.	        for(int i = 0;i<numThread;i++ ){       
31.	            System.out.println(completionService.take().get());  
32.	        }  
33.	          
34.	    }  

#ExecutorCompletionService分析
CompletionService是Executor和BlockingQueue的结合体。

1.	public ExecutorCompletionService(Executor executor) {  
2.	        if (executor == null)  
3.	            throw new NullPointerException();  
4.	        this.executor = executor;  
5.	        this.aes = (executor instanceof AbstractExecutorService) ?  
6.	            (AbstractExecutorService) executor : null;  
7.	        this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
8.	    }

任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture。

1.	public Future<V> submit(Callable<V> task) {  
2.	        if (task == null) throw new NullPointerException();  
3.	        RunnableFuture<V> f = newTaskFor(task);  
4.	        executor.execute(new QueueingFuture(f));  
5.	        return f;  
6.	    }

QueueingFuture是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

1.	private class QueueingFuture extends FutureTask<Void> {  
2.	        QueueingFuture(RunnableFuture<V> task) {  
3.	            super(task, null);  
4.	            this.task = task;  
5.	        }  
6.	        protected void done() { completionQueue.add(task); }  
7.	        private final Future<V> task;  
8.	    }  

而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

1.	public Future<V> take() throws InterruptedException {  
2.	    return completionQueue.take();  
3.	}  
4.	  
5.	public Future<V> poll() {  
6.	    return completionQueue.poll();  
7.	}

© 著作权归作者所有

共有 人打赏支持
上一篇: IDEA常用快捷键
下一篇: Velocity学习记录
傅小水water
粉丝 1
博文 29
码字总数 18093
作品 0
杭州
私信 提问
Java使用Executor执行Callable任务时的几种方法

多线程在需要返回值时,我们知道需要用到Callable和Future。Callable的cell方法可以返回一个值并且可抛出异常,是对Runnable的很好的补充;Future表示了一个任务的周期,它提供了判断任务状态...

东都大狼狗
09/09
0
0
java - concurrent 之 CompletionService

CompletionService /** A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers {@code submit} tasks fo......

yuzn
2016/08/17
9
0
Java Executor 框架

Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(图片引用自http://www.javacl...

leesama
2014/12/01
0
0
concurrent包学习--ThreadPoolExecutor实现

    ThreadPoolExecutor是jdk自带的线程池实现。看到了"池"一定会想到对象池模式,它是单例模式的一个变种,主要思想是通过共享复用已有的空闲对象,达到限制开销和提高性能的目的。这里...

积淀
03/05
0
0
从静态代码扫描引擎PMD源码学习-多线程任务模型和File过滤设计

不知不觉在工作中研究PMD并定制规则已经4个月左右了。其实PMD有许多值得我学习的源码,不过出于时间并不曾动笔。今天简单记录总结一下PMD的多线程和File过滤设计的源码。 1 public class Mul...

phinehasz
07/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Win10:默认的图片打开应用,打开图片时速度明显很慢的解决办法

首先,我们随便地打开一张图片。然后,点击右上角的三个小点,最后点击弹出菜单最下面的“设置”。如下图: 在“设置”中找到下面的“人物”,把它关掉就好了。 原来,默认情况下,Win 10的图...

LivingInFHL
57分钟前
2
0
js代码激发onchange事件,兼容谷歌火狐IE

var el = document.getElementsByName('role')[0]; el.value = '3'; var evt = document.createEvent("HTMLEvents"); evt.initEvent("change", false, true); el.dispatchEvent(evt);......

我退而结网
今天
3
0
mysql客户端报错:libmysqlclient_16 not defined in file libmysqlclient.so.16

报错情况: 安装完mydumper之后(上一篇文章),登陆Mysql客户端报错:version libmysqlclient_16 not defined in file libmysqlclient.so.16 with link time reference 同样:mysql的其他客...

machogyb
今天
1
0
MySQL 数据库中间件 安装部署测试全过程

1、环境准备 1.1、操作系统环境 [root@MyCat conf]# uname -aLinux MyCat 2.6.32-431.el6.x86_64 #1 SMP Sun Nov 10 22:19:54 EST 2013 x86_64 x86_64 x86_64 GNU/Linux 1.2、关闭SELIN......

PeakFang-BOK
今天
6
0
Linux Mysql 安装

https://www.cnblogs.com/xinjing-jingxin/p/8025805.html

流氓兔-
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部