文档章节

FutureTask源码分析

AbeJeffrey
 AbeJeffrey
发布于 2017/04/09 15:59
字数 2212
阅读 37
收藏 1
点赞 0
评论 0

在JCU中,FutureTask是Future的具体实现,且实现了Runnable接口,即FutureTask满足了Task的行为,是一个可以被用来执行的Future。FutureTask是JCU提供的线程池实现用到的任务基本单元,线程池主要接收两种对象:一个是Runnable任务,一种是Callable任务。按照ExecutorService接口定义的行为,可以将Runnable或Callable任务提交到线程池执行,而被提交的Runnable或Callable任务都会被包装成FutureTask,由线程池的工作线程去执行。

状态管理

state用于表示任务的状态,初始为NEW,state仅在方法set,setException和cancel中转换为终止状态。任务完成时,state可能持有瞬时态的值COMPLETING或INTERRUPTING。由于state唯一且在将来不会被修改,从中间状态到最终状态的转换使用顺序写入或延迟写入。FutureTask的几种状态值如下:

    private volatile int state;
    //初始创建时的状态
    private static final int NEW          = 0;
    //当任务执行完毕,FutureTask会将执行结果设置给outcome属性,在设置之前会将FutureTask的状态修改为COMPLETING。
    private static final int COMPLETING   = 1;
    //当任务执行完毕,FutureTask会将执行结果设置给outcome属性,在设置之后会将FutureTask的状态修改为NORMAL。
    private static final int NORMAL       = 2;
    //当任务在执行的过程中抛了异常,FutureTask会将异常信息设置给outcome属性,
    //在设置之前会将FutureTask的状态修改为COMPLETING,在设置之后将状态修改为EXCEPTIONAL。
    private static final int EXCEPTIONAL  = 3;
    //当外部想要取消任务,而又不允许当任务正在执行的时候被取消时会将FutureTask的状态修改为CANCELLED。
    private static final int CANCELLED    = 4;
    //当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,
    //然后设置执行任务的线程的中断标记位。
    private static final int INTERRUPTING = 5;
    //当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,
    //然后设置执行任务的线程的中断标记位,最后将Future的状态设置为INTERRUPTED。
    private static final int INTERRUPTED  = 6;

综上,FutureTask的状态流转可能流程:

NEW—>COMPLETING—>NORMAL(任务执行正常)

NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常)

NEW—>CANCELLED(不允许执行中的取消)

NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)

FutureTask中使用CAS操作更新state来表示任务完成,极大地降低了使用加锁进行同步控制的性能开销。

Treiber堆

FutureTask中使用简单的Treiber堆栈来保存等待线程,Treiber堆是非阻塞的,使用CAS操作来实现节点的出栈和入栈操作。FutureTask中使用WaitNode来表示等待节点,源码如下:

    private volatile WaitNode waiters;
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

这里暂不介绍出栈和入栈的实现,后续会详细说明。

构造方法

在说明FutureTask的构造方法前,先介绍FutureTask中的另外几个属性。

    /** 底层callable,运行后则失效 */
    private Callable<V> callable;
    /** get()返回或异常抛出的结果,非volatile,受state读/写保护*/
    private Object outcome;
    /**正在运行Callable任务的线程,在run()方法中使用CAS操作赋值 */
    private volatile Thread runner;

FutureTask的构造方法将提交的Runnable或Callable任务都会被包装成FutureTask。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }

FutureTask最终将Runnabl转化为Callable,这里使用了适配器模式实现,由Executors源码可知。

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

下面重点分析FutureTask中的主要操作。

运行任务

FutureTask中使用run方法来执行任务,源码如下:

    public void run() {
        if (state != NEW ||   //状态为NEW且无线程用于运行当前任务,否则返回
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();//执行任务
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);//修改state并记录异常
                }
                if (ran)
                    set(result);//修改状态
            }
        } finally {
            //state被设置前必须保证runner非空,以阻止 run()被并发调用,进入finally 代码块,则state已经被设置
            runner = null;
            // runner置为null后,必须重新读取state以防止有中断发生
            int s = state;
            if (s >= INTERRUPTING)//说明其他线程调用了cancel(true)将state修改为INTERRUPTING
                handlePossibleCancellationInterrupt(s);
        }
    }
    

运行任务前,需保证状态为NEW且无线程用于运行当前任务,执行中若发生异常则调用setException修改state并记录异常。

    protected void setException(Throwable t) {
        //将状态由NEW修改为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 修改状态为EXCEPTIONAL
            finishCompletion();
        }
    }

除非state已经被改变(不为NEW),否则将记录Throwable,同时最终修改状态为EXCEPTIONAL,流程:NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常),最后调用finishCompletion。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {//还有等待者
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//将waiters设为null
                for (;;) {//成功则进入自旋,唤醒堆栈中的后续所有线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//唤醒线程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;
    }

finishCompletion用于唤醒等待队列中的所有后续线程(若有)。当任务未完成时,调用get()方法会被加入等待队列并阻塞。FutureTask中done()什么也不做,该方法主要用于子类个性化定制,如ExecutorCompletionService中QueueingFuture实现FutureTask,实现done()以达到任务完成自动将Future加入结果队列。关于ExecutorCompletionService可移步ExecutorCompletionService源码分析

回到run方法,当任务正常完成时,将调用set方法修改state。流程:NEW—>COMPLETING—>NORMAL(任务执行正常)

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

在run方法的finally代码块中,需要将runner置为空,使后续等待线程可继续执行。同时,由于可能是多线程运行,如果state被其他线程调用cancel(true)修改为INTERRUPTING,表示将有中断会发生,将调用handlePossibleCancellationInterrupt确保中断完成。

    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // 暂停线程,释放资源,等待中断
    }

在while中调用Thread.yield()让出CPU,保证线程能够成功暂停,执行yield()的线程有可能在进入到暂停状态后马上又被执行。

runAndReset()

runAndReset被设计用于本质上可执行多次的任务。也即如果当前执行正常完成,则不调用set方法修改state,而是保持初始状态NEW。除非计算遇到异常或被取消。源码如下:

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // 不设置结果
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

取消执行任务

取消任务相对较简单,当外部想要取消任务,同时允许当任务正在执行的时候被取消时,即mayInterruptIfRunning为true,走如下流程:

NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)

否则,修改state为CANCELLED:NEW—>CANCELLED(不允许执行中的取消)

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { 
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

获取任务结果

get操作主要用于计算完成后获取结果,下面根据源码查看其实现:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

state小于或等于COMPLETING时,说明执行结果还未写入outcome属性,需调用awaitDone方法等待完成。关于Treiber堆的实现或awaitDone实现原理,可以参见FutureTask中Treiber堆的实现。其中结合实例详细分析了FutureTask中Treiber堆的实现。

综上所述,FutureTask可用于获取任务执行的结果或取消执行任务。当且仅当任务完成时才能获取结果,如果任务尚未完成,调用get 方法会被阻塞,进入等待 。一旦任务完成,就不能再重新开始或取消计算。

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/875658

© 著作权归作者所有

共有 人打赏支持
AbeJeffrey
粉丝 29
博文 43
码字总数 116062
作品 0
杭州
高级程序员

暂无文章

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式 Factory工厂模式 Singleton单例模式 Delegate委派模式 Strategy策略模式 Prototype原型模式 Template模板模式 Spring5 beans 接口实例化 代理Bean操作 ...

小致dad
7分钟前
0
0
SpringBoot | 第十章:Swagger2的集成和使用

前言 前一章节介绍了mybatisPlus的集成和简单使用,本章节开始接着上一章节的用户表,进行Swagger2的集成。现在都奉行前后端分离开发和微服务大行其道,分微服务及前后端分离后,前后端开发的...

oKong
今天
9
0
Python 最小二乘法 拟合 二次曲线

Python 二次拟合 随机生成数据,并且加上噪声干扰 构造需要拟合的函数形式,使用最小二乘法进行拟合 输出拟合后的参数 将拟合后的函数与原始数据绘图后进行对比 import numpy as npimport...

阿豪boy
今天
9
0
云拿 无人便利店

附近(上海市-航南路)开了家无人便利店.特意进去体验了一下.下面把自己看到的跟大家分享下. 经得现场工作人员同意后拍了几张照片.从外面看是这样.店门口的指导里强调:不要一次扫码多个人进入....

周翔
昨天
1
0
Java设计模式学习之工厂模式

在Java(或者叫做面向对象语言)的世界中,工厂模式被广泛应用于项目中,也许你并没有听说过,不过也许你已经在使用了。 简单来说,工厂模式的出现源于增加程序序的可扩展性,降低耦合度。之...

路小磊
昨天
200
1
npm profile 新功能介绍

转载地址 npm profile 新功能介绍 npm新版本新推来一个功能,npm profile,这个可以更改自己简介信息的命令,以后可以不用去登录网站来修改自己的简介了 具体的这个功能的支持大概是在6这个版...

durban
昨天
1
0
Serial2Ethernet Bi-redirection

Serial Tool Serial Tool is a utility for developing serial communications, custom protocols or device testing. You can set up bytes to send accordingly to your protocol and save......

zungyiu
昨天
1
0
python里求解物理学上的双弹簧质能系统

物理的模型如下: 在这个系统里有两个物体,它们的质量分别是m1和m2,被两个弹簧连接在一起,伸缩系统为k1和k2,左端固定。假定没有外力时,两个弹簧的长度为L1和L2。 由于两物体有重力,那么...

wangxuwei
昨天
0
0
apolloxlua 介绍

##项目介绍 apolloxlua 目前支持javascript到lua的翻译。可以在openresty和luajit里使用。这个工具分为两种模式, 一种是web模式,可以通过网页使用。另外一种是tool模式, 通常作为大规模翻...

钟元OSS
昨天
2
0
Mybatis入门

简介: 定义:Mybatis是一个支持普通SQL查询、存储过程和高级映射的持久层框架。 途径:MyBatis通过XML文件或者注解的形式配置映射,实现数据库查询。 特性:动态SQL语句。 文件结构:Mybat...

霍淇滨
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部