ExecutorCompletionService
ExecutorCompletionService
傅小水water 发表于10个月前
ExecutorCompletionService
  • 发表于 10个月前
  • 阅读 5
  • 收藏 0
  • 点赞 0
  • 评论 0

【腾讯云】如何购买服务器最划算?>>>   

#描述
当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果。

#实现方式

##方式一
通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 。

1.	public class CompletionServiceTest {  
2.	  
3.	    static class Task implements Callable<String>{  
4.	        private int i;  
5.	          
6.	        public Task(int i){  
7.	            this.i = i;  
8.	        }  
9.	  
10.	        @Override  
11.	        public String call() throws Exception {  
12.	            Thread.sleep(10000);  
13.	            return Thread.currentThread().getName() + "执行完任务:" + i;  
14.	        }     
15.	    }  
16.	      
17.	    public static void main(String[] args){  
18.	        testUseFuture();  
19.	    }  
20.	      
21.	    private static void testUseFuture(){  
22.	        int numThread = 5;  
23.	        ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24.	        List<Future<String>> futureList = new ArrayList<Future<String>>();  
25.	        for(int i = 0;i<numThread;i++ ){  
26.	            Future<String> future = executor.submit(new CompletionServiceTest.Task(i));  
27.	            futureList.add(future);  
28.	        }  
29.	                  
30.	        while(numThread > 0){  
31.	            for(Future<String> future : futureList){  
32.	                String result = null;  
33.	                try {  
34.	                    result = future.get(0, TimeUnit.SECONDS);  
35.	                } catch (InterruptedException e) {  
36.	                    e.printStackTrace();  
37.	                } catch (ExecutionException e) {  
38.	                    e.printStackTrace();  
39.	                } catch (TimeoutException e) {  
40.	                    //超时异常直接忽略  
41.	                }  
42.	                if(null != result){  
43.	                    futureList.remove(future);  
44.	                    numThread--;  
45.	                    System.out.println(result);  
46.	                    //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)  
47.	                    break;  
48.	                }  
49.	            }  
50.	        }  
51.	    }  
52.	}

##方式二 第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。

1.	public class CompletionServiceTest {  
2.	  
3.	    static class Task implements Callable<String>{  
4.	        private int i;  
5.	          
6.	        public Task(int i){  
7.	            this.i = i;  
8.	        }  
9.	  
10.	        @Override  
11.	        public String call() throws Exception {  
12.	            Thread.sleep(10000);  
13.	            return Thread.currentThread().getName() + "执行完任务:" + i;  
14.	        }     
15.	    }  
16.	      
17.	    public static void main(String[] args) throws InterruptedException, ExecutionException{  
18.	        testExecutorCompletionService();  
19.	    }  
20.	      
21.	    private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{  
22.	        int numThread = 5;  
23.	        ExecutorService executor = Executors.newFixedThreadPool(numThread);  
24.	        CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);  
25.	        for(int i = 0;i<numThread;i++ ){  
26.	            completionService.submit(new CompletionServiceTest.Task(i));  
27.	        }  
28.	}  
29.	          
30.	        for(int i = 0;i<numThread;i++ ){       
31.	            System.out.println(completionService.take().get());  
32.	        }  
33.	          
34.	    }  

#ExecutorCompletionService分析
CompletionService是Executor和BlockingQueue的结合体。

1.	public ExecutorCompletionService(Executor executor) {  
2.	        if (executor == null)  
3.	            throw new NullPointerException();  
4.	        this.executor = executor;  
5.	        this.aes = (executor instanceof AbstractExecutorService) ?  
6.	            (AbstractExecutorService) executor : null;  
7.	        this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
8.	    }

任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture。

1.	public Future<V> submit(Callable<V> task) {  
2.	        if (task == null) throw new NullPointerException();  
3.	        RunnableFuture<V> f = newTaskFor(task);  
4.	        executor.execute(new QueueingFuture(f));  
5.	        return f;  
6.	    }

QueueingFuture是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

1.	private class QueueingFuture extends FutureTask<Void> {  
2.	        QueueingFuture(RunnableFuture<V> task) {  
3.	            super(task, null);  
4.	            this.task = task;  
5.	        }  
6.	        protected void done() { completionQueue.add(task); }  
7.	        private final Future<V> task;  
8.	    }  

而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

1.	public Future<V> take() throws InterruptedException {  
2.	    return completionQueue.take();  
3.	}  
4.	  
5.	public Future<V> poll() {  
6.	    return completionQueue.poll();  
7.	}
标签: 多线程
共有 人打赏支持
粉丝 1
博文 15
码字总数 9369
×
傅小水water
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: