文档章节

java - concurrent 之 CompletionService

yuzn
 yuzn
发布于 2016/08/17 16:40
字数 704
阅读 9
收藏 0
点赞 0
评论 0
  • CompletionService
/**
 * A service that decouples the production of new asynchronous tasks
 * from the consumption of the results of completed tasks.  Producers
 * {@code submit} tasks for execution. Consumers {@code take}
 * completed tasks and process their results in the order they
 * complete.  A {@code CompletionService} can for example be used to
 * manage asynchronous I/O, in which tasks that perform reads are
 * submitted in one part of a program or system, and then acted upon
 * in a different part of the program when the reads complete,
 * possibly in a different order than they were requested.**/

参与java doc可以看到如上描述。简单来说就是CompletionService使(批)任务异步执行与任务结果处理分离:即生产者执行任务,消费者处理任务结果;并且在后面描述一个在异步I/O上的一个使用场景。

  • API
Future<V> submit(Callable<V> task);
@param result the result to return upon successful completion
Future<V> submit(Runnable task, V result);
/**
 * 此方法阻塞获取已完成的任务Future,并从任务列表中移除
 * @return the Future representing the next completed task
 * @throws InterruptedException if interrupted while waiting
 */
Future<V> take() throws InterruptedException;
/**
 * 比较take,此方法是非阻塞的,如果没有完成的任务,返回null
 */
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
  • 实现 ExecutorCompletionService

ExecutorCompletionService是CompletionService的唯一实现

private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
public ExecutorCompletionService(Executor executor) {
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)

每个任务的提交都会构造一个QueueingFutrue

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

而QueueingFutrue有个回调方法,在任务执行完成后,放到completionQueue阻塞队列中

protected void done() { completionQueue.add(task); }

由此实现方式很明显了。

  • Demo

查询文档可以看到官方提供的案例

/** Suppose you have a set of solvers for a certain problem, each
* returning a value of some type {@code Result}, and would like to
* run them concurrently, processing the results of each of them that
* return a non-null value, in some method {@code use(Result r)}. You
* could write this as:
* 大意就是:假设你有一批需要回执的任务要并发处理,就可以使用如下方式(ps: 也可以利用Futrue方式的实现
* ,这里不再说明)
*/
 void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     for (Callable<Result> s : solvers)
         ecs.submit(s);
     int n = solvers.size();
     for (int i = 0; i < n; ++i) {
         Result r = ecs.take().get();
         if (r != null)
             use(r);
     }
}

/** Suppose instead that you would like to use the first non-null result
* of the set of tasks, ignoring any that encounter exceptions,
* and cancelling all other tasks when the first one is ready:
* 大意是:如果只想得到率先执行完任务的返回值,忽略其他的任务执行情况,并且在第一个任务执行结束后取消其他任务
*/

 void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     int n = solvers.size();
     List<Future<Result>> futures
         = new ArrayList<Future<Result>>(n);
     Result result = null;
     try {
         for (Callable<Result> s : solvers)
             futures.add(ecs.submit(s));
         for (int i = 0; i < n; ++i) {
             try {
                 Result r = ecs.take().get();
                 if (r != null) {
                     result = r;
                     break;
                 }
             } catch (ExecutionException ignore) {}
         }
     }
    finally {
         for (Future<Result> f : futures)
             f.cancel(true);
     }

     if (result != null)
         use(result);
}
  •  

© 著作权归作者所有

共有 人打赏支持
yuzn
粉丝 13
博文 25
码字总数 14730
作品 0
项目经理
javacv转流,jvm异常停止

下面是hserrpid.log的异常信息 # SIGSEGV (0xb) at pc=0x00007f399ca531ff, pid=4843, tid=0x00007f399dfa3700 JRE version: Java(TM) SE Runtime Environment (8.0161-b12) (build 1.8.0161......

菩提树下的猫 ⋅ 05/02 ⋅ 0

JVM自动内存管理机制—读这篇就够了

之前看过JVM的相关知识,当时没有留下任何学习成果物,有些遗憾。这次重新复习了下,并通过博客来做下笔记(只能记录一部分,因为写博客真的很花时间),也给其他同行一些知识分享。 Java自动内...

java高级架构牛人 ⋅ 06/13 ⋅ 0

Java编程基础知识点和技术点归纳

Java是一种可以撰写跨平台应用软件的面向对象的程序设计语言。Java 技术具有卓越的通用性、高效性、平台移植性和安全性,广泛应用于PC、数据中心、游戏控制台、科学超级计算机、移动电话和互...

Java小辰 ⋅ 05/23 ⋅ 0

Java 面试知识点解析(三)——JVM篇

前言: 在遨游了一番 Java Web 的世界之后,发现了自己的一些缺失,所以就着一篇深度好文:知名互联网公司校招 Java 开发岗面试知识点解析 ,来好好的对 Java 知识点进行复习和学习一番,大部...

我没有三颗心脏 ⋅ 05/16 ⋅ 0

CentOS 6.5 安装JDK(包含卸载原有默认JDK)

卸载原有1.7 JDK 查看是否安装了JDK 若有内容就进一步查看JDK信息 卸载 安装jdk ===================================== 安装wget 新建目录 进入目录 下载JDK 安装JDK 配置环境变量 往文件内...

阿白 ⋅ 05/23 ⋅ 0

Java就业变难了?你需要对自己有点信心

伴随着IT的火热,越来越多的人进入了IT领域,这在进一步推动着IT发展的同时也极大增加了就业压力。伴随着激烈的岗位竞争,越来越多的人开始感叹工作难找,越火的行业越是如此,Java自是首当其...

糖宝_d864 ⋅ 06/08 ⋅ 0

sharding-jdbc源码分析—准备工作

原文作者:阿飞Javaer 原文链接:https://www.jianshu.com/p/7831817c1da8 接下来对sharding-jdbc源码的分析基于tag为源码,根据sharding-jdbc Features深入学习sharding-jdbc的几个主要特性...

飞哥-Javaer ⋅ 05/03 ⋅ 0

MVN package install error javac: invalid target release: 1.8

现象: --------------------------------- [ERROR] Failure executing javac, but could not parse the error: javac: invalid target release: 1.8 Usage: javac <options> <source files>......

孟飞阳 ⋅ 05/04 ⋅ 0

Oracle Java Mission Control 帮助

缩写 含义 JDK Java 开发工具包 JDP Java Discovery Protocol JFR Java 飞行记录器 JMC Java Mission Control JMX Java Management Extensions JVM Java 虚拟机 MBean 托管 Bean (Java) RCP ......

光斑 ⋅ 04/27 ⋅ 0

培训云计算学校,虚拟机基本结构讲解

我们要对JVM虚拟机的结构有一个感性的认知。毕竟我们不是编程人员,认知程度达不到那么深入。一个运行时的Java虚拟机实例的天职是:负责运行一个java程序。当启动一个Java程序时,一个虚拟机...

长沙千锋 ⋅ 05/17 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

1.01-StringUtils的使用

import org.apache.commons.lang.StringUtils; 一、StringUtils 常用的方法: 1. 判断某一字符串是否为空 , 为空的标准是 str==null 或 str.length()==0 StringUtils.isEmpty(null) ==>tr......

静以修身2025 ⋅ 20分钟前 ⋅ 0

几道Spring 面试题

1、BeanFactory 接口和 ApplicationContext 接口有什么区别? ApplicationContext 接口继承BeanFactory接口 Spring核心工厂是BeanFactory BeanFactory采取延迟加载,第一次getBean时才会初始...

职业搬砖20年 ⋅ 21分钟前 ⋅ 0

包饺子

http://storage.slide.news.sina.com.cn/slidenews/77_ori/2018_24/74766_826131_625489.gif

霜叶情 ⋅ 23分钟前 ⋅ 0

兑吧:从自建HBase迁移到阿里云HBase实战经验

摘要: 业务介绍 兑吧集团包含兑吧网络和推啊网络,兑吧网络是一家致力于帮助互联网企业提升运营效率的用户运营服务平台,提供积分商城和媒体运营服务。推啊网络是一家互动式广告平台,经过多...

猫耳m ⋅ 34分钟前 ⋅ 0

xml解析

方法一: String s_xml1 = "<xml>" + "<head>lalalalal</head>" + "<body>1234</body>" + "</xml>"; try { DocumentBuilderFactory documentBuilderFactory......

GithubXD ⋅ 35分钟前 ⋅ 0

reuse stream

Although Java streams were designed to be operated only once, programmers still ask how to reuse a stream. From a simple web search, we can find many posts with this same issue ......

idoz ⋅ 35分钟前 ⋅ 0

兑吧:从自建HBase迁移到阿里云HBase实战经验

摘要: 业务介绍 兑吧集团包含兑吧网络和推啊网络,兑吧网络是一家致力于帮助互联网企业提升运营效率的用户运营服务平台,提供积分商城和媒体运营服务。推啊网络是一家互动式广告平台,经过多...

阿里云云栖社区 ⋅ 37分钟前 ⋅ 0

从世界杯看国内运动体育社交新能量

2018年世界杯已正式拉开帷幕,一场全世界球迷的狂欢也正式开始。 世界杯影响力:30亿+球迷的狂欢+社交话题 世界杯这个超级IP和对社交网络的引爆让更多的人目光聚焦到国内运动体育社交这个层...

ThinkSNS账号 ⋅ 37分钟前 ⋅ 0

不固定值替换

<?php$arr = 20;$data = str_replace(array(10,20,30,40),array("blue","red","green","yellow"),$arr);print_r($data);...

nsns ⋅ 39分钟前 ⋅ 0

Job for nginx.service failed 错误解决方案

今天刚在centos7上安装了nginx-1.2.11,/etc/init.d/nginx start启动时,出现 Job for nginx.service failed because the control process exited with error code. See "systemctl status n......

河图再现 ⋅ 40分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部