文档章节

Java并发编程:Callable、Future和FutureTask原理解析

刺猬一号
 刺猬一号
发布于 2017/05/25 11:53
字数 2831
阅读 14
收藏 0

返回结果的任务Callable与Future

Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,它不能返回一个值或抛出一个受检查的异常。Runnable接口:

public interface Runnable {
    public abstract void run();
}
  • 由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

许多任务实际上都是存在延迟的计算,对于这些任务,Callable是一种更好的抽象:它会返回一个值,并可能抛出一个异常。Callable接口:

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
  • 可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

Runnable和Callable描述的都是抽象的计算任务。这些任务通常是有生命周期的。Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。由于有些任务可能要执行很长时间,因此通常希望可以取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,对于已经开始执行的任务,只有当它们响应中断时才能取消。

Future表示一个任务的生命周期,并提供了方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。Future接口:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • 在Future接口中声明了5个方法,下面依次解释每个方法的作用:
  1. cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

  2. isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

  3. isDone方法表示任务是否已经完成,若任务完成,则返回true;

  4. get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

  5. get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

也就是说实际上Future提供了三种功能:

  1. 判断任务是否完成;
  2. 中断任务;
  3. 获取任务执行结果。

Future与Callable的关系与ExecutorService与Executor的关系对应。

FutureTask

Future只是一个接口,无法直接创建对象,因此有了FutureTask。 
我们先来看下FutureTask的实现:

public class FutureTask<V> implements RunnableFuture<V>
  • FutureTask类实现了RunnableFuture接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
  • RunnableFuture继承了Runnable和Future接口,而FutureTask实现了RunnableFuture接口。

FutureTask的继承关系和方法如图所示: 
FutureTask

FutureTask是一个可取消的异步计算,FutureTask 实现了Future的基本方法,提供start cancel 操作,可以查询计算是否已经完成,并且可以获取计算的结果。结果只可以在计算完成之后获取,get方法会阻塞当计算没有完成的时候,一旦计算已经完成, 那么计算就不能再次启动或是取消。

一个FutureTask 可以用来包装一个 Callable 或是一个Runnable对象。因为FurtureTask实现了Runnable方法,所以一个 FutureTask可以提交(submit)给一个Excutor执行(excution). 它同时实现了Callable, 所以也可以作为Future得到Callable的返回值。

FutureTask有两个很重要的属性分别是state和runner, FutureTask之所以支持canacel操作,也是因为这两个属性。 
其中state为枚举值:

    private volatile int state; // 注意volatile关键字
    /**
     * 在构建FutureTask时设置,同时也表示内部成员callable已成功赋值,
     * 一直到worker thread完成FutureTask中的run();
     */
    private static final int NEW = 0;

    /**
     * woker thread在处理task时设定的中间状态,处于该状态时,
     * 说明worker thread正准备设置result.
     */
    private static final int COMPLETING = 1;

    /**
     * 当设置result结果完成后,FutureTask处于该状态,代表过程结果,
     * 该状态为最终状态final state,(正确完成的最终状态)
     */
    private static final int NORMAL = 2;

    /**
     * 同上,只不过task执行过程出现异常,此时结果设值为exception,
     * 也是final state
     */
    private static final int EXCEPTIONAL = 3;

    /**
     * final state, 表明task被cancel(task还没有执行就被cancel的状态).
     */
    private static final int CANCELLED = 4;

    /**
     * 中间状态,task运行过程中被interrupt时,设置的中间状态
     */
    private static final int INTERRUPTING = 5;

    /**
     * final state, 中断完毕的最终状态,几种情况,下面具体分析
     */
    private static final int INTERRUPTED = 6;
  • state初始化为NEW。只有在set, setException和cancel方法中state才可以转变为终态。在任务完成期间,state的值可能为COMPLETING或INTERRUPTING。 

state有四种可能的状态转换:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

其他成员变量:

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;   // 具体run运行时会调用其方法call(),并获得结果,结果时置为null.

    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes   没必要为votaile,因为其是伴随state 进行读写,而state是FutureTask的主导因素。

    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;   //具体的worker thread.

    /** Treiber stack of waiting threads */  
    private volatile WaitNode waiters;     //Treiber stack 并发stack数据结构,用于存放阻塞在该futuretask#get方法的线程。

下面分析下Task的状态变化,也就一个任务的生命周期: 
创建一个FutureTask首先调用构造方法:

 public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
 }
  • 此时将state设置为初始态NEW。这里注意Runnable是怎样转换为Callable的,看下this.callable = Executors.callable(runnable, result); 调用Executors.callable:
public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
}
  •  
static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
}
  • 其实就是通过Callable的call方法调用Runnable的run方法,把传入的 T result 作为callable的返回结果;

当创建完一个Task通常会提交给Executors来执行,当然也可以使用Thread来执行,Thread的start()方法会调用Task的run()方法。看下FutureTask的run()方法的实现:

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
  • 首先判断任务的状态,如果任务状态不是new,说明任务状态已经改变(说明他已经走了上面4种可能变化的一种,比如caller调用了cancel,此时状态为Interrupting, 也说明了上面的cancel方法,task没运行时,就interrupt, task得不到运行,总是返回);

如果状态是new, 判断runner是否为null, 如果为null, 则把当前执行任务的线程赋值给runner,如果runner不为null, 说明已经有线程在执行,返回。此处使用cas来赋值worker thread是保证多个线程同时提交同一个FutureTask时,确保该FutureTask的run只被调用一次, 如果想运行多次,使用runAndReset()方法。

这里

!UNSAFE.compareAndSwapObject(this, runnerOffset,                                   null, Thread.currentThread())
  • 语义相当于
if (this.runner == null ){
    this.runner = Thread.currentThread();
}

接着开始执行任务,如果要执行的任务不为空,并且state为New就执行,可以看到这里调用了Callable的call方法。如果执行成功则set结果,如果出现异常则setException。最后把runner设为null。

接着看下set方法:

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
  • 如果现在的状态是NEW就把状态设置成COMPLETING,然后设置成NORMAL。这个执行流程的状态变化就是: NEW->COMPLETING->NORMAL。

最后执行finishCompletion()方法:

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
 }

接下来分析FutureTask非常重要的get方法:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
  • 首先判断FutureTask的状态是否为完成状态,如果是完成状态,说明已经执行过set或setException方法,返回report(s):
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
  • 可以看到,如果FutureTask的状态是NORMAL, 即正确执行了set方法,get方法直接返回处理的结果, 如果是取消状态,即执行了setException,则抛出CancellationException异常。

如果get时,FutureTask的状态为未完成状态,则调用awaitDone方法进行阻塞。awaitDone():

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
  • awaitDone方法可以看成是不断轮询查看FutureTask的状态。在get阻塞期间:
  • 如果执行get的线程被中断,则移除FutureTask的所有阻塞队列中的线程(waiters),并抛出中断异常;

  • 如果FutureTask的状态转换为完成状态(正常完成或取消),则返回完成状态;

  • 如果FutureTask的状态变为COMPLETING, 则说明正在set结果,此时让线程等一等;

  • 如果FutureTask的状态为初始态NEW,则将当前线程加入到FutureTask的阻塞线程中去;

  • 如果get方法没有设置超时时间,则阻塞当前调用get线程;如果设置了超时时间,则判断是否达到超时时间,如果到达,则移除FutureTask的所有阻塞列队中的线程,并返回此时FutureTask的状态,如果未到达时间,则在剩下的时间内继续阻塞当前线程。

注:本文源码基于JDK1.7。(1.6通过AQS实现)

© 著作权归作者所有

刺猬一号
粉丝 12
博文 373
码字总数 616361
作品 0
深圳
私信 提问
java并发编程——FutureTask源码分析

FutureTask的简单示例: FutureTask的应用场景,如果在当前线程中需要执行比较耗时的操作,但又不想阻塞当前线程时,可以把这些作业交给FutureTask,另开一个线程在后台完成,当当前线程将来...

长头发-dawn
2018/09/07
0
0
java Future 接口介绍

在Java中,如果需要设定代码执行的最长时间,即超时,可以用Java线程池ExecutorService类配合Future接口来实现。 Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口...

程序袁_绪龙
2014/11/25
0
0
读书笔记之《Java并发编程的艺术》-线程池和Executor的子孙们

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
CountDownLatch源码解析

CountDownLatch 相比ReentranceLock,CountDownLatch的流程还是相对比较简单的,CountDownLatch也是基于AQS,它是AQS的共享功能的一个实现。 下面从源代码的实现上详解CountDownLatch。 1、C...

maxam0128
2018/01/23
0
0
Java中高级面试必问之多线程TOP50(含答案)

以下为大家整理了今年一线大厂面试被问频率较高的多线程面试题,由于本人的见识局限性,所以可能不是很全面,也欢迎大家在后面留言补充,谢谢。 1、什么是线程? 2、什么是线程安全和线程不安...

老道士
2018/08/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

任务调度-单体应用定时任务解决方案

1. 应用场景: 单体应用(并发少、就公司内部使用)、业务比较简单、单一、稳定,传统行业首选,项目初期。 2. 主要方式: Spring XML配置方式,timer。 <bean id="cycleBonusTimer" class="...

秋日芒草
30分钟前
3
0
EditText中singleLine过期替代方法

android:lines="1" android:inputType="text"

球球
44分钟前
1
0
删除 Tomcat-webapps 目录自带项目

本文将 %CATALINA_HOME% 目录称为“tomcat”目录。 1.webapps目录中的项目 在 Tomcat 8.0 的 tomcat/webapps 目录中,含有 5 个 Tomcat 自带的 Web 项目,如下所示: docs 有关于 Tomcat 的介...

Airship
48分钟前
3
0
好文:华杉:我等用功,不求日增,但求日减。减一分人欲,则增一分天理,这是何等简易!何等洒脱!

#写在前面1.怎么理解“减一分人欲,则增一分天理,这是何等简易!”?1)华杉提倡 “一劳永逸” 排除浪费,少干活,多赚钱,一战而定,降低作业成本。2)华杉提倡学海无涯,回头是岸...

阿锋zxf
58分钟前
3
0
vue 的bus总线

bus声明 global.bus = new Vue() 事件发送 controlTabbar () {global.bus.$emit('pickUp', 'ddd')}, 事件接收 global.bus.$on('pickUp', (res) => {this.isFocus = true})......

Js_Mei
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部