FutureTask源码分析
FutureTask源码分析
AbeJeffrey 发表于8个月前
FutureTask源码分析
  • 发表于 8个月前
  • 阅读 27
  • 收藏 1
  • 点赞 0
  • 评论 0

华为云·免费上云实践>>>   

摘要: FutureTask是一个可取消的异步计算,它继承自RunnableFuture,因此FutureTask同时也可表示一个可执行任务,理解FutureTask对理解JCU中线程池的实现非常重要,本文将结合源码详细分析FutureTask的原理。基于JDK1.8

在JCU中,FutureTask是Future的具体实现,且实现了Runnable接口,即FutureTask满足了Task的行为,是一个可以被用来执行的Future。FutureTask是JCU提供的线程池实现用到的任务基本单元,线程池主要接收两种对象,一个是Runnable任务,一种是Callable任务,按照ExecutorService接口定义的行为,可以将Runnable或Callable任务提交到线程池让其去被执行,而被提交的Runnable或Callable任务都会被包装成FutureTask,由线程池的工作线程去执行。

state

state用于表示任务的状态,初始为NEW,state仅在方法set,setException和cancel中转换为终止状态。任务完成时,state可能持有瞬时态的值COMPLETING或INTERRUPTING。由于state唯一且在将来不会被修改,从中间状态到最终状态的转换使用顺序写入或延迟写入。FutureTask的几种状态值如下:

    private volatile int state;
    //初始创建时的状态
    private static final int NEW          = 0;
    //当任务被执行完毕,FutureTask会将执行结果设置给FutureTask的outcome属性,
    //在设置之前会将FutureTask的状态修改为COMPLETING。
    private static final int COMPLETING   = 1;
    //当任务被执行完毕,FutureTask会将执行结果设置给FutureTask的outcome属性,
    //在设置之后会将FutureTask的状态修改为NORMAL。
    private static final int NORMAL       = 2;
    //当任务在被执行的过程中抛了异常,FutureTask会将异常信息设置给FutureTask的outcome属性,
    //在设置之前会将FutureTask的状态修改为COMPLETING,在设置之后会将FutureTask的状态修改为EXCEPTIONAL。
    private static final int EXCEPTIONAL  = 3;
    //当外部想要取消任务,而又不允许当任务正在执行的时候被取消时会将FutureTask的状态修改为CANCELLED。
    private static final int CANCELLED    = 4;
    //当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,
    //然后设置执行任务的线程的中断标记位。
    private static final int INTERRUPTING = 5;
    //当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,
    //然后设置执行任务的线程的中断标记位,最后将Future的状态设置为INTERRUPTED。
    private static final int INTERRUPTED  = 6;

综上,FutureTask的状态流转可能流程:

NEW—>COMPLETING—>NORMAL(任务执行正常)

NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常)

NEW—>CANCELLED(不允许执行中的取消)

NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)

FutureTask中使用CAS操作更新state来表示任务完成,极大地降低了使用加锁进行同步控制的性能开销。

Treiber堆

FutureTask中使用简单的Treiber堆栈来保存等待线程,Treiber堆是非阻塞的,使用Treiber算法实现,即使用CAS来实现节点的出栈和入栈操作。FutureTask中使用WaitNode来表示等待节点,参见源码:

    private volatile WaitNode waiters;
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

这里暂不介绍出栈和入栈的实现,后续会详细说明。

构造方法

在说明FutureTask的构造方法前,先介绍FutureTask中的另外几个属性。

    /** 底层callable,运行后则失效 */
    private Callable<V> callable;
    /** get()返回或异常抛出的结果,非volatile,受state读/写保护*/
    private Object outcome;
    /**正在运行Callable任务的线程,在run()方法中使用CAS操作赋值 */
    private volatile Thread runner;

FutureTask的构造方法将提交的Runnable或Callable任务都会被包装成FutureTask。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }

FutureTask最终将Runnabl转化为Callable,这里使用了适配器模式实现,由Executors源码即可知。

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

下面重点分析FutureTask中的主要操作。

run

    public void run() {
        if (state != NEW ||   //状态为NEW且无线程用于运行当前任务,否则返回
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();//执行任务
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);//修改state并记录异常
                }
                if (ran)
                    set(result);//修改状态
            }
        } finally {
            //state被设置前必须保证runner非空,以阻止 run()被并发调用,进入finally 代码块,则state已经被设置
            runner = null;
            // runner置为null后,必须重新读取state以防止有中断发生
            int s = state;
            if (s >= INTERRUPTING)//说明其他线程调用了cancel(true)将state修改为INTERRUPTING
                handlePossibleCancellationInterrupt(s);
        }
    }
    

运行任务前,需保证状态为NEW且无线程用于运行当前任务,执行中若发生异常则调用setException修改state并记录异常。

    protected void setException(Throwable t) {
        //将状态由NEW修改为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 修改状态为EXCEPTIONAL
            finishCompletion();
        }
    }

除非state已经被改变(不为NEW),否则将记录Throwable,同时最终修改状态为EXCEPTIONAL,流程:NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常),最后调用finishCompletion。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {//还有等待者
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//将waiters设为null
                for (;;) {//成功则进入自旋,唤醒堆栈中的后续所有线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//唤醒线程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;
    }

finishCompletion用于唤醒等待队列中的所有后续线程(若有),调用done()。FutureTask中done()什么也不做,该方法主要用于子类个性化定制,如ExecutorCompletionService中QueueingFuture实现FutureTask,实现done()以达到任务完成自动将Future加入结果队列。关于ExecutorCompletionService的源码分析参见https://my.oschina.net/7001/blog/874638

回到run方法,当任务正常完成时,将调用set方法修改state。流程:NEW—>COMPLETING—>NORMAL(任务执行正常)

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

在run方法的finally代码块中,需要将runner置为空,使后续等待线程可继续执行。同时,由于可能是多线程运行,如果state被其他线程调用cancel(true)修改为INTERRUPTING,表示将有中断会发生,将调用handlePossibleCancellationInterrupt确保中断完成。

    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // 暂停线程,释放资源,等待中断
    }

在while中调用Thread.yield()是为了保证线程能够成功暂停,执行yield()的线程有可能在进入到暂停状态后马上又被执行。

runAndReset()

runAndReset被设计用于本质上可执行多次的任务。也即如果当前执行正常完成,则不调用set方法修改state,而是保持初始状态NEW。除非计算遇到异常或被取消。源码如下:

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // 不设置结果
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

 cancel

取消任务相对较简单,当外部想要取消任务,同时允许当任务正在执行的时候被取消时,即mayInterruptIfRunning为true,走如下流程:

NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)

否则,修改state为CANCELLED:NEW—>CANCELLED(不允许执行中的取消)

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { 
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

get

get操作主要用于计算完成后获取结果,下面根据源码查看其实现:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

state小于或等于COMPLETING时,说明执行结果还未写入outcome属性,需调用awaitDone等待完成。

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //当前线程是否已中断,该方法会清除线程状态,也就是说第一次调用返回true,
            //并且没有再次被中断时,第二次调用将返回false
            if (Thread.interrupted()) {
                removeWaiter(q);//移除等待者
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {//任务已完成或被取消
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) //表示任务马上完成,不必进入等待队列
                Thread.yield();
            else if (q == null)//此时s只可能为NEW
                q = new WaitNode();
            else if (!queued)//添加等待节点
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);//等待指定时间间隔后挂起
            }
            else
                LockSupport.park(this);//挂起线程
        }
    }
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

在此没有详细分析FutureTask中Treiber堆的实现,为了深入理解get和run操作,可阅读https://my.oschina.net/7001/blog/875714,其中结合实例详细分析了FutureTask中Treiber堆的实现。

综上所述,FutureTask作为可取消的异步任务,仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/875658

共有 人打赏支持
粉丝 22
博文 36
码字总数 89060
×
AbeJeffrey
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: