文档章节

java - concurrent 之 CompletionService

yuzn
 yuzn
发布于 2016/08/17 16:40
字数 704
阅读 10
收藏 0
  • CompletionService
/**
 * A service that decouples the production of new asynchronous tasks
 * from the consumption of the results of completed tasks.  Producers
 * {@code submit} tasks for execution. Consumers {@code take}
 * completed tasks and process their results in the order they
 * complete.  A {@code CompletionService} can for example be used to
 * manage asynchronous I/O, in which tasks that perform reads are
 * submitted in one part of a program or system, and then acted upon
 * in a different part of the program when the reads complete,
 * possibly in a different order than they were requested.**/

参与java doc可以看到如上描述。简单来说就是CompletionService使(批)任务异步执行与任务结果处理分离:即生产者执行任务,消费者处理任务结果;并且在后面描述一个在异步I/O上的一个使用场景。

  • API
Future<V> submit(Callable<V> task);
@param result the result to return upon successful completion
Future<V> submit(Runnable task, V result);
/**
 * 此方法阻塞获取已完成的任务Future,并从任务列表中移除
 * @return the Future representing the next completed task
 * @throws InterruptedException if interrupted while waiting
 */
Future<V> take() throws InterruptedException;
/**
 * 比较take,此方法是非阻塞的,如果没有完成的任务,返回null
 */
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
  • 实现 ExecutorCompletionService

ExecutorCompletionService是CompletionService的唯一实现

private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
public ExecutorCompletionService(Executor executor) {
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)

每个任务的提交都会构造一个QueueingFutrue

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

而QueueingFutrue有个回调方法,在任务执行完成后,放到completionQueue阻塞队列中

protected void done() { completionQueue.add(task); }

由此实现方式很明显了。

  • Demo

查询文档可以看到官方提供的案例

/** Suppose you have a set of solvers for a certain problem, each
* returning a value of some type {@code Result}, and would like to
* run them concurrently, processing the results of each of them that
* return a non-null value, in some method {@code use(Result r)}. You
* could write this as:
* 大意就是:假设你有一批需要回执的任务要并发处理,就可以使用如下方式(ps: 也可以利用Futrue方式的实现
* ,这里不再说明)
*/
 void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     for (Callable<Result> s : solvers)
         ecs.submit(s);
     int n = solvers.size();
     for (int i = 0; i < n; ++i) {
         Result r = ecs.take().get();
         if (r != null)
             use(r);
     }
}

/** Suppose instead that you would like to use the first non-null result
* of the set of tasks, ignoring any that encounter exceptions,
* and cancelling all other tasks when the first one is ready:
* 大意是:如果只想得到率先执行完任务的返回值,忽略其他的任务执行情况,并且在第一个任务执行结束后取消其他任务
*/

 void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     int n = solvers.size();
     List<Future<Result>> futures
         = new ArrayList<Future<Result>>(n);
     Result result = null;
     try {
         for (Callable<Result> s : solvers)
             futures.add(ecs.submit(s));
         for (int i = 0; i < n; ++i) {
             try {
                 Result r = ecs.take().get();
                 if (r != null) {
                     result = r;
                     break;
                 }
             } catch (ExecutionException ignore) {}
         }
     }
    finally {
         for (Future<Result> f : futures)
             f.cancel(true);
     }

     if (result != null)
         use(result);
}
  •  

© 著作权归作者所有

共有 人打赏支持
yuzn
粉丝 13
博文 32
码字总数 14772
作品 0
项目经理
私信 提问
Java 并发:Executors 和线程池

本文译自:Java Concurrency – Part 7 : Executors and thread pools 让我们开始来从入门了解一下 Java 的并发编程。 本文主要介绍如何开始创建线程以及管理线程池,在 Java 语言中,一个最...

红薯
2010/09/15
32.9K
18
JDK5多线程框架java.util.concurrent

JDK5中的一个亮点就是将Doug Lea的并发库引入到Java标准库中。Doug Lea确实是一个牛人,能教书,能出书,能编码,不过这在国外还是比较普遍的,而国内的教授们就相差太远了。 一般的服务器都...

johnnyhg
2009/05/08
1K
2
Java里的nio和concurrent是在什么时候使用的?是在高性能web应用中使用吗?

Java里的nio和concurrent是在什么应用中使用的? 是在高并发的网站中使用的吗? 我发现只有少数的工作强调core Java的multi-thread,比如concurrent,这是为什么?...

文心雕码
2013/12/20
926
4
Java 并发 – 第七部分:Executors 与线程池

现在让我们开始 Java 并发系列的新篇章。这壹次我们会学习如何干净的启动壹個新线程,以及如何在线程池中管理它。在 Java 中,假设你有壹個像下面这样的 Runnable 线程: Runnable runnable ...

苗哥
2013/10/07
0
5
ThreadLocal in Java - Example Program and Tutorial

ThreadLocal in Java is another way to achieve thread-safety apart from writing immutable classes. If you have been writing multi-threaded or concurrent code in Java then you mus......

perfectspr
2014/12/10
0
0

没有更多内容

加载失败,请刷新页面

加载更多

border实现等高布局

效果图 实现上图效果的全部html+css代码 <div class="box"> <nav> <h3 class="nav">导航1</h3> <h3 class="nav">导航2</h3> </nav> <section> <div cla......

呵呵闯
13分钟前
0
0
MaxCompute 表(Table)设计规范

表的限制项 表(Table)设计规范 表设计主要目标 表设计的影响 表设计步骤 表数据存储规范 按数据分层规范数据生命周期 按数据的变更和历史规范数据的保存 数据导入通道与表设计 分区设计与逻辑...

阿里云云栖社区
20分钟前
0
0
局域网共享文件读写的实现方式

代码片段 首先是设置共享目录,支持用户和密码等权限控制 然后我们可以使用Windows资源管理器操作共享目录下的文件 这中间隐藏了资源管理器帮我们建立目录映射和连接的过程,如果设置了用户名...

夏至如沫
29分钟前
2
0
Elasticsearch安装与配置

一、Docker安装ES 开发模式 可以使用以下命令快速启动Elasticsearch以进行开发或测试: $ docker run -p 9200:9200 -p 9300:9300 -d --name es -e "discovery.type=single-node" docker.ela...

吴伟祥
35分钟前
1
0
移动页面滚动穿透解决方案(荐)

移动页面滚动穿透解决方法目前有多种解决方案,我介绍下几种方案: 解决方案1:阻止冒泡。 //关键代码$(".sliders,.modals").on("touchmove",function(event){    event.preventDefau...

壹峰
36分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部