文档章节

FutureTask中Treiber堆的实现

AbeJeffrey
 AbeJeffrey
发布于 2017/04/09 18:37
字数 1739
阅读 27
收藏 1
点赞 0
评论 0

在文章FutureTask源码分析中简单说明了FutureTask中使用Treiber堆栈来保存等待结果的线程,本文将详细分析其原理。

Treiber堆使用CAS操作来实现节点的入栈和出栈,由于CAS操作只是保证操作的原子性,多线程并发时,其并不能保证可见性,因此必须依赖volatile来保证写入的可见性。这样就可以不必使用加锁来实现对共享数据结构的访问。下面先看一个实现Treiber堆的例子:

public class TreiberStack<E> {
    AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();
    public void push(E item) {//总是在堆顶加入新节点
        Node<E> node= new Node<E>(item);
        Node<E> old;
        do {
            old = head.get();
            node.next = old;
        } while (!head.compareAndSet(old, node));
    }
    public E pop() {//总是从堆顶移除
        Node<E> old;
        Node<E> node;
        do {
            old = head.get();
            if (old == null) 
                return null;
            node = old.next;
        } while (!head.compareAndSet(old,node));
        return old.item;
    }
    private static class Node<E>{
        public final  E  item;
        public Node<E>   next;
        public Node(E item){this.item=item;}
    }

FutureTask中WaitNode 作为节点,并将当前线程保存在其中,而且将栈顶元素保存在waiters中。

入栈

首先看入栈操作,当调用FutureTask的get方法时,若任务未完成则会将当前等待结果的线程加入到等待队列,并挂起当前线程。

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //当前线程是否已中断,该方法会清除线程状态,也就是说第一次调用返回true,
            //并且没有再次被中断时,第二次调用将返回false
            if (Thread.interrupted()) {
                removeWaiter(q);//移除等待者
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {//任务已完成或被取消
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) //表示任务马上完成,不必进入等待队列
                Thread.yield();
            else if (q == null)//此时s只可能为NEW
                q = new WaitNode();
            else if (!queued)//添加等待节点
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);//等待指定时间间隔后挂起
            }
            else
                LockSupport.park(this);//挂起线程
        }
    }

由源码可知,当前线程未被中断时,使用CAS操作加入当前等待节点q,通过将q设为新的栈顶元素,即waiters,同时修改q.next指针指向上一次的waiters。这里使用自旋操作来保证操作一定成功。

看下面例子:

public class FutureTaskStackTest {
    private static FutureTask<Integer> ft=new FutureTask<Integer>(new ComputeTask(0,"task"));
    public static void main(String[] args) throws InterruptedException,ExecutionException{
        ExecutorService   executor=Executors.newSingleThreadExecutor();
        executor.submit(ft);
        executor.shutdown();
        for(int i=0;i<5;i++){
            System.out.println("for loop "+i);
            System.out.println(Thread.currentThread()+":"+ ft.get());
        }
    }
    private static class ComputeTask implements Callable<Integer>{
        private Integer result ;  
        private String  taskName ; 
        public ComputeTask(Integer init, String taskName){  
            result = init;  
            this.taskName = taskName;  
        }  
        @Override
        public Integer call() throws Exception {
            for (int i = 0; i < 100; i++) {  
                result =+ i;  
            } 
            Thread.sleep(5000);
            System.out.println(taskName+" finish!");  
            return result;
        }
        
    }
}

输出:

for loop 0
task finish!
Thread[main,5,main]:99
for loop 1
Thread[main,5,main]:99
for loop 2
Thread[main,5,main]:99
for loop 3
Thread[main,5,main]:99
for loop 4
Thread[main,5,main]:99

ComputeTask将sleep 5000ms才会完成任务,主线程中循环调用5次future.get()。那么等待队列中会加入5个节点吗?实际不是,只会加入一个,当加入一个时,当前main线程会被挂起,即输出“for loop 0”之后被挂起,直到任务完成被唤醒。这就说明同一个线程中调用多次future.get()和调用一次在FutureTask中都将只会加入一个节点,当线程被唤醒时,future.get()将不会被阻塞。

那么使用如下例子:

public class FutureTaskStackTset {
    private static FutureTask<Integer> ft=new FutureTask<Integer>(new ComputeTask(0,"task"));
    public static void main(String[] args) throws InterruptedException,ExecutionException{
        ExecutorService   executor=Executors.newSingleThreadExecutor();
        executor.submit(ft);
        executor.shutdown();
        MyThread thread1=new MyThread();
        thread1.setName("thread1");
        MyThread thread2=new MyThread();
        thread2.setName("thread2");
        thread1.start();
        thread2.start();
        System.out.println(Thread.currentThread().getName()+" : "+ ft.get());
    }
    private static class ComputeTask implements Callable<Integer>{
        private Integer result ;  
        private String  taskName ; 
        public ComputeTask(Integer init, String taskName){  
            result = init;  
            this.taskName = taskName;  
        }  
        @Override
        public Integer call() throws Exception {
            for (int i = 0; i < 100; i++) {  
                result =+ i;  
            } 
            Thread.sleep(5000);
            System.out.println(taskName+" finish!");  
            return result;
        }
        
    }
    private static class MyThread extends Thread{
        public void run() {
            try {
                System.out.println(this.getName()+" : "+ ft.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

输出:

task finish!
main : 99
thread1 : 99
thread2 : 99

在这个例子中,在不同的线程中调用future.get(),因此Treiber堆中会加入3个等待节点。这里main线程中的future.get()必须在启动其他线程之后调用,否则,main线程被阻塞,那么子线程就不会被启动,自然也不会加入等待队列。

移除

awaitDone实现可知,若当前线程被中断,FutureTask将会清理等待队列,移除已经被中断线程的节点。

private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {//移除thread为null的节点
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }//q和pred的thread均为null,将s设为新的栈顶元素,即waiters
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;//失败则重新进入循环
                }
                break;
            }
        }
    }

removeWaiter首先将要移除节点的thread变量置为null,这一步很关键,因为后续移除等待节点就是根据thread是否为null来实现。下面分别分析几种可能:

1 waiters即为要移除的元素

从代码分析,q指向waiters的节点,s=q->next,则s指向T1,此时q.thread为null,pred也为null,进入else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)),将s设为waiters,成功移除为null的节点。

2 thread为null元素位于堆栈中间

当找到q.thread为null的元素时,pred将指向T1,而S则指向T2,那么程序将会进入else if (pred != null)的代码块,将pred的next指针指向S,则成功将q移除。

完全有可能出现thread为null的元素位于中间甚至末尾的情况,当多线程调用get()方法时,CAS保证同时只能有一个线程将节点加入等待队列。失败的线程将继续进行自旋操作,直到成功。同时,上文已提过,相同线程多次调用get也只会加入一个节点到等待队列,因此removeWaiter一次调用实际只会移除一个节点。

removeWaiter操作的作用在于移除无效节点,避免造成垃圾累积,当堆栈中节点较多,removeWaiter操作会很慢。通常情况下,不会有太多线程同时等待一个任务的结果。

出栈

当任务执行完成后,将在 finishCompletion()中唤醒所有节点,此时所有线程都可以拿到结果。

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }

 finishCompletion()实现即可知,首先使用CAS操作将waiters置为null,然后从栈顶到栈底唤醒所有等待节点,并将节点的thread和next置空,这有助于GC回收内存。

欢迎指出本文有误的地方,转载请注明原文出处https://my.oschina.net/7001/blog/875714

© 著作权归作者所有

共有 人打赏支持
AbeJeffrey
粉丝 27
博文 43
码字总数 116062
作品 0
杭州
高级程序员
线程池源码分析-FutureTask

1 系列目录 - 线程池接口分析以及FutureTask设计实现- 线程池源码分析-ThreadPoolExecutor 该系列打算从一个最简单的Executor执行器开始一步一步扩展到ThreadPoolExecutor,希望能粗略的描述...

乒乓狂魔 ⋅ 2016/04/27 ⋅ 1

callable future futuretask

1.Callable接口 我们先回顾一下java.lang.Runnable接口,就声明了run(),其返回值为void,当然就无法获取结果了。 [java] view plain copy print? public interface Runnable { public abstr...

hgqxjj ⋅ 2017/11/23 ⋅ 0

更好地理解与使用Future

一个多月没有写东西了,今天想写的也是想记录下来的一些学习及思考结果,记忆能力有限,避免时间长久就忘记了,今天想写的也还是一些基础的东西,为什么我总是关注这些平时码业务代码很少能用...

Float_Luuu ⋅ 2016/05/02 ⋅ 3

源码|使用FutureTask的正确姿势

线程池的实现核心之一是FutureTask。在提交任务时,用户实现的Callable实例task会被包装为FutureTask实例ftask;提交后任务异步执行,无需用户关心;当用户需要时,再调用FutureTask#get()获...

猴子007 ⋅ 2017/11/20 ⋅ 0

java Future 接口介绍

在Java中,如果需要设定代码执行的最长时间,即超时,可以用Java线程池ExecutorService类配合Future接口来实现。 Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口...

程序袁_绪龙 ⋅ 2014/11/25 ⋅ 0

ThreadPoolExecutor的PriorityBlockingQueue支持问题

最近在使用ThreadPoolExecutor时遇到一个问题:当ThreadPoolExecutor使用的BlockingQueue为PriorityBlockingQueue时,会出现异常,原因是java.util.concurrent.FutureTask cannot be cast to......

canghailan ⋅ 2011/12/12 ⋅ 1

并发编程之Callable和Future接口、FutureTask类

Callable接口代表一段可以调用并返回结果的代码;Future接口表示异步任务,是还没有完成的任务给出的未来结果。所以说Callable用于产生结果,Future用于获取结果。 Java 5在concurrency包中引...

小菜鸡1 ⋅ 2016/08/11 ⋅ 0

Future和FutureTask

public interface Future<V> Future 表示异步计算的结果。 Future有个get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。 Future 主...

肖文锋 ⋅ 2012/12/07 ⋅ 0

java Callable & Future & FutureTask

实现Runnable接口的线程类与一个缺陷,就是在任务执行完之后无法取得任务的返回值。 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦 ...

Key_Stone ⋅ 2016/09/16 ⋅ 0

深度学习Java Future (一)

作者: 一字马胡 转载标志 【2017-12-07】 更新日志 日期 更新内容 备注 2017-12-07 学习Future的总结 关于Future的深入学习内容 Future 上面这段文字已经说明了Future的本质,一个Future代表...

一字马胡 ⋅ 2017/12/07 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Centos7重置Mysql 8.0.1 root 密码

问题产生背景: 安装完 最新版的 mysql8.0.1后忘记了密码,向重置root密码;找了网上好多资料都不尽相同,根据自己的问题总结如下: 第一步:修改配置文件免密码登录mysql vim /etc/my.cnf 1...

豆花饭烧土豆 ⋅ 57分钟前 ⋅ 0

熊掌号收录比例对于网站原创数据排名的影响[图]

从去年下半年开始,我在写博客了,因为我觉得业余写写博客也还是很不错的,但是从2017年下半年开始,百度已经推出了原创保护功能和熊掌号平台,为此,我也提交了不少以前的老数据,而这些历史...

原创小博客 ⋅ 今天 ⋅ 0

LVM讲解、磁盘故障小案例

LVM LVM就是动态卷管理,可以将多个硬盘和硬盘分区做成一个逻辑卷,并把这个逻辑卷作为一个整体来统一管理,动态对分区进行扩缩空间大小,安全快捷方便管理。 1.新建分区,更改类型为8e 即L...

蛋黄Yolks ⋅ 今天 ⋅ 0

Hadoop Yarn调度器的选择和使用

一、引言 Yarn在Hadoop的生态系统中担任了资源管理和任务调度的角色。在讨论其构造器之前先简单了解一下Yarn的架构。 上图是Yarn的基本架构,其中ResourceManager是整个架构的核心组件,它负...

p柯西 ⋅ 今天 ⋅ 0

uWSGI + Django @ Ubuntu

创建 Django App Project 创建后, 可以看到路径下有一个wsgi.py的问题 uWSGI运行 直接命令行运行 利用如下命令, 可直接访问 uwsgi --http :8080 --wsgi-file dj/wsgi.py 配置文件 & 运行 [u...

袁祾 ⋅ 今天 ⋅ 0

JVM堆的理解

在JVM中,我们经常提到的就是堆了,堆确实很重要,其实,除了堆之外,还有几个重要的模块,看下图: 大 多数情况下,我们并不需要关心JVM的底层,但是如果了解它的话,对于我们系统调优是非常...

不羁之后 ⋅ 昨天 ⋅ 0

推荐:并发情况下:Java HashMap 形成死循环的原因

在淘宝内网里看到同事发了贴说了一个CPU被100%的线上故障,并且这个事发生了很多次,原因是在Java语言在并发情况下使用HashMap造成Race Condition,从而导致死循环。这个事情我4、5年前也经历...

码代码的小司机 ⋅ 昨天 ⋅ 2

聊聊spring cloud gateway的RetryGatewayFilter

序 本文主要研究一下spring cloud gateway的RetryGatewayFilter GatewayAutoConfiguration spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/G......

go4it ⋅ 昨天 ⋅ 0

创建新用户和授予MySQL中的权限教程

导读 MySQL是一个开源数据库管理软件,可帮助用户存储,组织和以后检索数据。 它有多种选项来授予特定用户在表和数据库中的细微的权限 - 本教程将简要介绍一些选项。 如何创建新用户 在MySQL...

问题终结者 ⋅ 昨天 ⋅ 0

android -------- 颜色的半透明效果配置

最近有朋友问我 Android 背景颜色的半透明效果配置,我网上看资料,总结了一下, 开发中也是常常遇到的,所以来写篇博客 常用的颜色值格式有: RGB ARGB RRGGBB AARRGGBB 这4种 透明度 透明度...

切切歆语 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部