文档章节

ExecutorCompletionService

傅小水water
 傅小水water
发布于 2017/04/06 17:16
字数 747
阅读 6
收藏 0

#描述
当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果。

#实现方式

##方式一
通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 。

1.	public class CompletionServiceTest {  
2.	  
3.	    static class Task implements Callable<String>{  
4.	        private int i;  
5.	          
6.	        public Task(int i){  
7.	            this.i = i;  
8.	        }  
9.	  
10.	        @Override  
11.	        public String call() throws Exception {  
12.	            Thread.sleep(10000);  
13.	            return Thread.currentThread().getName() + "执行完任务:" + i;  
14.	        }     
15.	    }  
16.	      
17.	    public static void main(String[] args){  
18.	        testUseFuture();  
19.	    }  
20.	      
21.	    private static void testUseFuture(){  
22.	        int numThread = 5;  
23.	        ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24.	        List<Future<String>> futureList = new ArrayList<Future<String>>();  
25.	        for(int i = 0;i<numThread;i++ ){  
26.	            Future<String> future = executor.submit(new CompletionServiceTest.Task(i));  
27.	            futureList.add(future);  
28.	        }  
29.	                  
30.	        while(numThread > 0){  
31.	            for(Future<String> future : futureList){  
32.	                String result = null;  
33.	                try {  
34.	                    result = future.get(0, TimeUnit.SECONDS);  
35.	                } catch (InterruptedException e) {  
36.	                    e.printStackTrace();  
37.	                } catch (ExecutionException e) {  
38.	                    e.printStackTrace();  
39.	                } catch (TimeoutException e) {  
40.	                    //超时异常直接忽略  
41.	                }  
42.	                if(null != result){  
43.	                    futureList.remove(future);  
44.	                    numThread--;  
45.	                    System.out.println(result);  
46.	                    //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)  
47.	                    break;  
48.	                }  
49.	            }  
50.	        }  
51.	    }  
52.	}

##方式二 第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。

1.	public class CompletionServiceTest {  
2.	  
3.	    static class Task implements Callable<String>{  
4.	        private int i;  
5.	          
6.	        public Task(int i){  
7.	            this.i = i;  
8.	        }  
9.	  
10.	        @Override  
11.	        public String call() throws Exception {  
12.	            Thread.sleep(10000);  
13.	            return Thread.currentThread().getName() + "执行完任务:" + i;  
14.	        }     
15.	    }  
16.	      
17.	    public static void main(String[] args) throws InterruptedException, ExecutionException{  
18.	        testExecutorCompletionService();  
19.	    }  
20.	      
21.	    private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{  
22.	        int numThread = 5;  
23.	        ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24.	        CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);  
25.	        for(int i = 0;i<numThread;i++ ){  
26.	            completionService.submit(new CompletionServiceTest.Task(i));  
27.	        }  
28.	}  
29.	          
30.	        for(int i = 0;i<numThread;i++ ){       
31.	            System.out.println(completionService.take().get());  
32.	        }  
33.	          
34.	    }  

#ExecutorCompletionService分析
CompletionService是Executor和BlockingQueue的结合体。

1.	public ExecutorCompletionService(Executor executor) {  
2.	        if (executor == null)  
3.	            throw new NullPointerException();  
4.	        this.executor = executor;  
5.	        this.aes = (executor instanceof AbstractExecutorService) ?  
6.	            (AbstractExecutorService) executor : null;  
7.	        this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
8.	    }

任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture。

1.	public Future<V> submit(Callable<V> task) {  
2.	        if (task == null) throw new NullPointerException();  
3.	        RunnableFuture<V> f = newTaskFor(task);  
4.	        executor.execute(new QueueingFuture(f));  
5.	        return f;  
6.	    }

QueueingFuture是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

1.	private class QueueingFuture extends FutureTask<Void> {  
2.	        QueueingFuture(RunnableFuture<V> task) {  
3.	            super(task, null);  
4.	            this.task = task;  
5.	        }  
6.	        protected void done() { completionQueue.add(task); }  
7.	        private final Future<V> task;  
8.	    }  

而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

1.	public Future<V> take() throws InterruptedException {  
2.	    return completionQueue.take();  
3.	}  
4.	  
5.	public Future<V> poll() {  
6.	    return completionQueue.poll();  
7.	}

© 著作权归作者所有

共有 人打赏支持
上一篇: IDEA常用快捷键
下一篇: Velocity学习记录
傅小水water
粉丝 1
博文 29
码字总数 18093
作品 0
杭州
私信 提问
Java多线程6 CompletionService

CompletionService 1 CompletionService介绍 CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。 如果你向Executor提交了一个批处理任务...

香沙小熊
2018/11/27
0
0
java.util.concurrent.ExecutorCompletionService 源码

线程池相关 源码: 类 ExecutorCompletionService<V> 所有已实现的接口: CompletionService<V> 使用提供的 来执行任务的 。 此类将安排那些完成时提交的任务,把它们放置在可使用 take 访问...

狼王黄师傅
2018/11/29
0
0
Java使用Executor执行Callable任务时的几种方法

多线程在需要返回值时,我们知道需要用到Callable和Future。Callable的cell方法可以返回一个值并且可抛出异常,是对Runnable的很好的补充;Future表示了一个任务的周期,它提供了判断任务状态...

东都大狼狗
2018/09/09
0
0
java - concurrent 之 CompletionService

CompletionService /** A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers {@code submit} tasks fo......

yuzn
2016/08/17
9
0
Java Executor 框架

Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(图片引用自http://www.javacl...

leesama
2014/12/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周一乱弹 —— 白掌柜说了卖货不卖身

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @爱漫爱 :这是一场修行分享羽肿的单曲《Moony》 手机党少年们想听歌,请使劲儿戳(这里) @clouddyy :开不开心? 开心呀, 我又不爱睡懒觉…...

小小编辑
今天
7
0
大数据教程(11.7)hadoop2.9.1平台上仓库工具hive1.2.2搭建

上一篇文章介绍了hive2.3.4的搭建,然而这个版本已经不能稳定的支持mapreduce程序。本篇博主将分享hive1.2.2工具搭建全过程。先说明:本节就直接在上一节的hadoop环境中搭建了! 一、下载apa...

em_aaron
今天
2
0
开始看《JSP&Servlet学习笔记》

1:WEB应用简介。其中1.2.1对Web容器的工作流程写得不错 2:编写Servlet。搞清楚了Java的Web目录结构,以及Web.xml的一些配置作用。特别是讲了@WebServlet标签 3:请求与响应。更细致的讲了从...

max佩恩
今天
4
0
mysql分区功能详细介绍,以及实例

一,什么是数据库分区 前段时间写过一篇关于mysql分表的的文章,下面来说一下什么是数据库分区,以mysql为例。mysql数据库中的数据是以文件的形势存在磁盘上的,默认放在/mysql/data下面(可...

吴伟祥
今天
3
0
SQL语句查询

1.1 排序 通过order by语句,可以将查询出的结果进行排序。放置在select语句的最后。 格式: SELECT * FROM 表名 ORDER BY 排序字段ASC|DESC; ASC 升序 (默认) DESC 降序 1.查询所有商品信息,...

stars永恒
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部