ExecutorCompletionService源码分析
ExecutorCompletionService源码分析
AbeJeffrey 发表于8个月前
ExecutorCompletionService源码分析
  • 发表于 8个月前
  • 阅读 34
  • 收藏 2
  • 点赞 0
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   

摘要: 本文将结合ExecutorCompletionService源码分析其实现原理和应用场景。

ExecutorCompletionService实现了接口CompletionService(关于CompletionService的说明可参见https://my.oschina.net/7001/blog/873089),其内部组合了Executor,AbstractExecutorService,BlockingQueue,其中Executor用于执行任务,AbstractExecutorService负责适配返回的FutureTask对象(不太明白组合AbstractExecutorService的好处),BlockingQueue则用于存放已完成任务的Future。看源码:

    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

构造方法

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

根据指定executor创建ExecutorCompletionService对象,内部默认使用LinkedBlockingQueue作为完成队列。

    public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

根据指定executor和completionQueue创建ExecutorCompletionService对象,completionQueue作为完成队列,用户可定制自己的完成队列。

下面看看ExecutorCompletionService如何包装自己的Future。

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

内部类QueueingFuture用于包装Future,当任务完成后自动调用done()将任务返回的Future添加到队列中,done()在FutureTask的finishCompletion()中调用,可自行查看。

同时ExecutorCompletionService定义自己的实现包装任务Runnable和Callable的结果。

    //包装Callable的返回结果
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else //查看AbstractExecutorService中的newTaskFor实现可知,实际就是new FutureTask<V>(task)
             //为什么还要组合一个AbstractExecutorService呢,感觉多此一举
            return aes.newTaskFor(task);
    }
    //包装Runnable
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);//内部也是使用适配器模式将Runnable包装为Callable
        else
            return aes.newTaskFor(task, result);
    }

submit

下面看看提交任务的实现:

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

代码非常简单,具体执行都由Executor代劳。

获取结果

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

提供以上三种获取任务结果的Future,take和poll都会在取得结果后从队列移除元素,不同的是,当队列为空的表现不同,take会阻塞,poll将返回null,poll(long timeout, TimeUnit unit)则会等待指定时间。

以上则是ExecutorCompletionService的全部源码,非常简单。下面看一下ExecutorCompletionService的应用场景:

现在有一组需要并发执行的任务,各任务执行周期不相同,客户端希望完成一个任务则立即取得执行结果。该场景下,使用ExecutorCompletionService即可优雅的实现。这里选用Runnable作为任务,上文已经说明,对ExecutorCompletionService来说,Runnable和Callable都一样,Runnable最终也会封装为Callable,代码如下:

public class CompletionServiceTest {
    private   static ExecutorService            executor;
    private   static int                        numTask=50;
    private   static CompletionService<String>  completionService;
    private static class MyTask implements Runnable{
        private String taskName;
        private MyTask(String taskName){
            this.taskName=taskName;
        }
        @Override
        public void run() {
            long s=System.currentTimeMillis();
            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.taskName+" finished! The total time is "+(System.currentTimeMillis()-s)+"ms");
        }
    }
    public static void main(String[] args) {
        startAll();
        try {
            get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();//切记需要关闭线程池,否则,进程不会结束
    }
    private static void startAll(){
        if(null==executor){
            executor=Executors.newFixedThreadPool(numTask);
        }
        if(null==completionService){
            completionService=new ExecutorCompletionService<String>(executor);//使用默认队列
        }
        for(int i=0;i<numTask;i++){
            completionService.submit(new MyTask("Task"+i),"task"+i+" return");
        }
    }
    private static void get() throws InterruptedException,ExecutionException{
        if(null!=completionService){
            for(int i = 0;i<numTask;i++ ){ 
                System.out.println(completionService.take().get());
            }  
        }
    }
}

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/874638

共有 人打赏支持
粉丝 22
博文 38
码字总数 98171
×
AbeJeffrey
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: