java.util.concurrent.ExecutorCompletionService<V>

原创
2014/06/21 10:35
阅读数 137

具体来看一下ExecutorCompletionService是如何实现CompletionService(点击查看源代码),并且实现上篇中所说优雅的实现两种路径。

上源代码

package java.util.concurrent;

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}

ExecutorCompletionService是CompletionService(点击查看源码)的实现。ExecutorCompletionService是通过构造函数注入Executor和阻塞队列来实现任务的执行和返回的。

首先先看成员变量

private final Executor executor; 这是一个最基础的执行器(点击查看源码

private final AbstractExecutorService aes; 这是一个带生命周期的执行器(点击查看源码

private final BlockingQueue<Future<V>> completionQueue; 这是一个阻塞队列,用于保存任务执行结果


private class QueueingFuture extends FutureTask<Void>

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

这是一个内部类,主要继承了FutureTask,构造方法,将RunnableFuture提交给父类FutureTask的构造方法,同时他实现了done这个方法,意思很简单,在任务执行完成的时候,将RunnableFuture放入阻塞队列中。

可能看到这里有点晕,RunnableFuture(点击查看源码),FutureTask究竟是什么关系?

简单介绍下

RunnableFuture点击查看源码是一个接口,他继承了Runnable和Future(点击查看源码) 而 FutureTask则是RunnableFuture的实现。 是一个可同步或异步执行,可取消,可获得计算结果。

看一下他的方法


private RunnableFuture<V> newTaskFor(Callable<V> task)

private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

参数:Callable(点击查看源码) 一个执行任务

返回:RunnableFuture 上面刚介绍过

实现:

  • 如果AbstractExecutorService(点击查看源码)是空的化,那么直接实例化一个FutureTask并返回,否则调用AbstractExecutorService的newTaskFor方法

解读:实现逻辑很简单,关键在于这个逻辑判断,还记得AbstractExecutorService的newTaskFor方法吗?和上面null的实现一模一样。为什么还要这样做呢?很简单,AbstractExecutorService是可以被继承和重写的。如果传入的AbstractExecutorService是被重写过的。那么就要用重写后的AbstractExecutorService。

还记得职责单一的设计原则吗?一个类只做一件事情,如果AbstractExecutorService是负责生命周期,那么我们这里就不能去做和生命周期相关的实现。这样才是正确的。当然,如果是和Executor合作,因为Executor只负责执行策略,那么AbstractExecutorService是可以做返回结果的封装的。


private RunnableFuture<V> newTaskFor(Runnable task, V result)

同上


public ExecutorCompletionService(Executor executor)

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

参数:Executor 一个执行器

实现:

  1. 非空判断

  2. 将Executor赋值给成员变量

  3. 如果Executor是一个AbstractExecutorService,那么同时也给AbstractExecutorService赋值

  4. 新建一个链式的阻塞队列

解读:AbstractExecutorService是Executor的实现,所以当然可以赋值给Executor, 那么执行的任务就交给了Executor来完成。如果你像取消,观察执行的状态等,那么在构造的时候就应该是用AbstractExecutorService的实现。


public ExecutorCompletionService(Executor executor,  BlockingQueue<Future<V>> completionQueue)

同上, 实现阻塞队列的方式会更灵活一点。


public Future<V> submit(Callable<V> task)

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

解读:提交一个任务到Executor来执行,提交的对象会被先封装成QueueingFuture. QueueingFuture会把执行结果放入阻塞队列


public Future<V> submit(Runnable task, V result)

同上


take() poll() poll(long timeout, TimeUnit unit)

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

这三个方法放在一起讲,主要还是调用了阻塞队列的三个方法, 具体请见阻塞队列这一篇。


展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部