文档章节

FutureTask源码分析

AbeJeffrey
 AbeJeffrey
发布于 2017/04/09 15:59
字数 2212
阅读 39
收藏 1

在JCU中,FutureTask是Future的具体实现,且实现了Runnable接口,即FutureTask满足了Task的行为,是一个可以被用来执行的Future。FutureTask是JCU提供的线程池实现用到的任务基本单元,线程池主要接收两种对象:一个是Runnable任务,一种是Callable任务。按照ExecutorService接口定义的行为,可以将Runnable或Callable任务提交到线程池执行,而被提交的Runnable或Callable任务都会被包装成FutureTask,由线程池的工作线程去执行。

状态管理

state用于表示任务的状态,初始为NEW,state仅在方法set,setException和cancel中转换为终止状态。任务完成时,state可能持有瞬时态的值COMPLETING或INTERRUPTING。由于state唯一且在将来不会被修改,从中间状态到最终状态的转换使用顺序写入或延迟写入。FutureTask的几种状态值如下:

    private volatile int state;
    //初始创建时的状态
    private static final int NEW          = 0;
    //当任务执行完毕,FutureTask会将执行结果设置给outcome属性,在设置之前会将FutureTask的状态修改为COMPLETING。
    private static final int COMPLETING   = 1;
    //当任务执行完毕,FutureTask会将执行结果设置给outcome属性,在设置之后会将FutureTask的状态修改为NORMAL。
    private static final int NORMAL       = 2;
    //当任务在执行的过程中抛了异常,FutureTask会将异常信息设置给outcome属性,
    //在设置之前会将FutureTask的状态修改为COMPLETING,在设置之后将状态修改为EXCEPTIONAL。
    private static final int EXCEPTIONAL  = 3;
    //当外部想要取消任务,而又不允许当任务正在执行的时候被取消时会将FutureTask的状态修改为CANCELLED。
    private static final int CANCELLED    = 4;
    //当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,
    //然后设置执行任务的线程的中断标记位。
    private static final int INTERRUPTING = 5;
    //当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,
    //然后设置执行任务的线程的中断标记位,最后将Future的状态设置为INTERRUPTED。
    private static final int INTERRUPTED  = 6;

综上,FutureTask的状态流转可能流程:

NEW—>COMPLETING—>NORMAL(任务执行正常)

NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常)

NEW—>CANCELLED(不允许执行中的取消)

NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)

FutureTask中使用CAS操作更新state来表示任务完成,极大地降低了使用加锁进行同步控制的性能开销。

Treiber堆

FutureTask中使用简单的Treiber堆栈来保存等待线程,Treiber堆是非阻塞的,使用CAS操作来实现节点的出栈和入栈操作。FutureTask中使用WaitNode来表示等待节点,源码如下:

    private volatile WaitNode waiters;
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

这里暂不介绍出栈和入栈的实现,后续会详细说明。

构造方法

在说明FutureTask的构造方法前,先介绍FutureTask中的另外几个属性。

    /** 底层callable,运行后则失效 */
    private Callable<V> callable;
    /** get()返回或异常抛出的结果,非volatile,受state读/写保护*/
    private Object outcome;
    /**正在运行Callable任务的线程,在run()方法中使用CAS操作赋值 */
    private volatile Thread runner;

FutureTask的构造方法将提交的Runnable或Callable任务都会被包装成FutureTask。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }

FutureTask最终将Runnabl转化为Callable,这里使用了适配器模式实现,由Executors源码可知。

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

下面重点分析FutureTask中的主要操作。

运行任务

FutureTask中使用run方法来执行任务,源码如下:

    public void run() {
        if (state != NEW ||   //状态为NEW且无线程用于运行当前任务,否则返回
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();//执行任务
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);//修改state并记录异常
                }
                if (ran)
                    set(result);//修改状态
            }
        } finally {
            //state被设置前必须保证runner非空,以阻止 run()被并发调用,进入finally 代码块,则state已经被设置
            runner = null;
            // runner置为null后,必须重新读取state以防止有中断发生
            int s = state;
            if (s >= INTERRUPTING)//说明其他线程调用了cancel(true)将state修改为INTERRUPTING
                handlePossibleCancellationInterrupt(s);
        }
    }
    

运行任务前,需保证状态为NEW且无线程用于运行当前任务,执行中若发生异常则调用setException修改state并记录异常。

    protected void setException(Throwable t) {
        //将状态由NEW修改为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 修改状态为EXCEPTIONAL
            finishCompletion();
        }
    }

除非state已经被改变(不为NEW),否则将记录Throwable,同时最终修改状态为EXCEPTIONAL,流程:NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常),最后调用finishCompletion。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {//还有等待者
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//将waiters设为null
                for (;;) {//成功则进入自旋,唤醒堆栈中的后续所有线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//唤醒线程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;
    }

finishCompletion用于唤醒等待队列中的所有后续线程(若有)。当任务未完成时,调用get()方法会被加入等待队列并阻塞。FutureTask中done()什么也不做,该方法主要用于子类个性化定制,如ExecutorCompletionService中QueueingFuture实现FutureTask,实现done()以达到任务完成自动将Future加入结果队列。关于ExecutorCompletionService可移步ExecutorCompletionService源码分析

回到run方法,当任务正常完成时,将调用set方法修改state。流程:NEW—>COMPLETING—>NORMAL(任务执行正常)

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

在run方法的finally代码块中,需要将runner置为空,使后续等待线程可继续执行。同时,由于可能是多线程运行,如果state被其他线程调用cancel(true)修改为INTERRUPTING,表示将有中断会发生,将调用handlePossibleCancellationInterrupt确保中断完成。

    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // 暂停线程,释放资源,等待中断
    }

在while中调用Thread.yield()让出CPU,保证线程能够成功暂停,执行yield()的线程有可能在进入到暂停状态后马上又被执行。

runAndReset()

runAndReset被设计用于本质上可执行多次的任务。也即如果当前执行正常完成,则不调用set方法修改state,而是保持初始状态NEW。除非计算遇到异常或被取消。源码如下:

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // 不设置结果
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

取消执行任务

取消任务相对较简单,当外部想要取消任务,同时允许当任务正在执行的时候被取消时,即mayInterruptIfRunning为true,走如下流程:

NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)

否则,修改state为CANCELLED:NEW—>CANCELLED(不允许执行中的取消)

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { 
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

获取任务结果

get操作主要用于计算完成后获取结果,下面根据源码查看其实现:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

state小于或等于COMPLETING时,说明执行结果还未写入outcome属性,需调用awaitDone方法等待完成。关于Treiber堆的实现或awaitDone实现原理,可以参见FutureTask中Treiber堆的实现。其中结合实例详细分析了FutureTask中Treiber堆的实现。

综上所述,FutureTask可用于获取任务执行的结果或取消执行任务。当且仅当任务完成时才能获取结果,如果任务尚未完成,调用get 方法会被阻塞,进入等待 。一旦任务完成,就不能再重新开始或取消计算。

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/875658

© 著作权归作者所有

共有 人打赏支持
AbeJeffrey
粉丝 31
博文 43
码字总数 116062
作品 0
杭州
高级程序员

暂无文章

js实现产生n个随机数,并且随机数之和是固定值

function getrandom(minnum , maxnum ,total,size){ var num = total; //定义整数 var length= size; //定义多个整数的数量 var numArr = []; while(length > 1){ var rnd = Math.floor(Mat......

开源昕昕
12分钟前
1
0
精选Spring Boot三十五道必知必会知识点!

Spring Boot 是微服务中最好的 Java 框架. 我们建议你能够成为一名 Spring Boot 的专家。本文精选了三十五个常见的Spring Boot知识点,祝你一臂之力! 问题一 Spring Boot、Spring MVC 和 Sp...

Java填坑之路
13分钟前
2
0
MyBatis学习笔记

相关文档 mybatis深入理解(一)之 # 与 $ 区别以及 sql 预编译 MyBatis 处理sql中的 大于,小于,大于等于,小于等于

OSC_fly
14分钟前
0
0
Gradle从不同地方复制文件到一个文件夹/打zip包

复制 task copySDK(type: Copy, dependsOn: [":fatJarTask"]) { delete JAR_NAME delete SDK_OUT_PATH delete "$ROOT_BUILD_PATH/$SDK_ZIP_NAME" into('/jniLibs') {......

SuShine
14分钟前
0
0
CentOS关闭占用端口,修改Apache默认端口,并重启Apache

查找并关闭进程 在Linux系统中可以使用lsof命令和kill命令,两个命令配合查找并关闭占用端口的进程 查看某一端口使用情况的命令: lsof -i:端口号 效果如下,PID即进程的ID 根据PID关闭进程,...

临江仙卜算子
20分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部