文档章节

Fork/Join实现原理

多弗哥
 多弗哥
发布于 2017/07/29 21:26
字数 2271
阅读 25
收藏 0

    作为一个轻量级的并发执行框架,Fork/Join实现事实上由3个角色构成:任务队列(WorkQueue)、工作者线程(ForkJoinWorkerThread)、执行任务(ForkJoinTask),任务队列负责存放程序提交给执行者(ForkJoinPool)的任务,工作者线程负责执行这些任务,然后他们通过执行者的接口来对外提供服务。

    对于这些角色如何协调来并发执行任务,推荐查看JDK8的源码,它比JDK7更利于理解,而这里我们先着重讨论实现Fork/Join机制的两个解决方案:双端队列和抢断/闲置,而后通过三个方面通过源码来进行阐述:提交任务(submit),执行任务(fork),和联结任务(join)。

 

一、双端队列

    为了获取高效和可扩展,我们希望对执行任务的管理能越快越好。我们使用了双端队列,它允许元素可以从两端弹出,且限定了插入和删除操作必须在队列的两端进行。它是高效的,创建-push、发布-pop和弹出-take任务能够在程序调用中的开销更低廉,使得程序员能够构建更小粒度的任务,最终也能更好的利用并行所带来的益处。

    双端队列的基本结构采用了很常规的一个结构——数组(尽管是可变长的)来表示每个队列,同时附带两个索引:top索引和base索引。top索引类似于数组中的栈指针,通过push和pop操作来改变;而base索引只能通过take操作来改变。把base索引定义为volitile变量可以保证当队列中元素不止一个时,take操作可以在不加锁的情况下进行。而且,通过比较两个索引来检查操作是否会导致双端队列变成一个空队列,来防止潜在的冲突。

/**
 * 支持工作窃取和外部任务提交的队列
 */
static final class WorkQueue {

    /**
     * 工作队列数组初始化时的容量
     * 必须是2的幂次方,至少是4或者更大,以减少队列间的共享缓存行
     */
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * 队列数组的最大大小
     * 必须是2的幂小于或等于1 <(31 -队列元素宽度),以确保减少索引计算的环绕
     */
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

    // 实例字段
    volatile int scanState;
    int stackPred;
    int nsteals;               // 窃取次数
    int hint;                  // 随机数和窃取的索引
    int config;                // pool索引和模式
    volatile int qlock;        // 1: 锁定, < 0: 终止;0 其它
    volatile int base;         // 下个poll的索引
    int top;                   // 下一个push的索引
    ForkJoinTask<?>[] array;   // 元素(最初未分配)
    final ForkJoinPool pool;   // 包含池(可能为null)
    final ForkJoinWorkerThread owner; // 拥有的线程,如果共享则为null
    volatile Thread parker;
    volatile ForkJoinTask<?> currentJoin;  // 等待加入的任务
    volatile ForkJoinTask<?> currentSteal;

    WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
        this.pool = pool;
        this.owner = owner;
        // Place indices in the center of array (that is not yet allocated)
        base = top = INITIAL_QUEUE_CAPACITY >>> 1;
    }

    。。。。。。。
}

    因为执行者ForkJoinPool的操作都是无缝的绑定到双端队列的细节之中,我们把这个数据结构直接放在执行者ForkJoinPool类之中(静态内部类),而不是作为一个单独的组件。

    由于每个push和pop操作都需要获取锁以保证同步前后的一致性,这将成为性能瓶颈。所以,对于双端队列而言,我们有以下的调整策略:

  • push和pop操作仅可以被工作线程的拥有者所调用;
  • 对task的操作会因为窃取任务线程在某一个时间点对take的操作而采取加锁限制。这样,控制冲突将被降低为两个部分(工作线程和窃取任务线程)同步的层次;
  • pop和take操作只有在双端队列为空的时候才会发生冲突,否则的话,双端队列会保证他们在不同的数组元素上面进行操作。

 

二、工作窃取

    Fork/Join框架的核心在于工作窃取算法(work-stealing),它是种轻量级调度机制,采用了work-stealing基本的调度策略:

  • 每一个工作线程维护自己的调度队列中的可运行任务;
  • 队列以双端队列的形式被维护,支持后进先出(LIFO)的push和pop操作,和先进先出(FIFO)的take操作;
  • 对于一个给定的工作线程来说,任务所产生的子任务将会被放入到工作者自己的双端队列中;
  • 工作线程使用LIFO(最早的优先)的顺序,通过弹出任务来处理队列中的任务。当一个工作线程的本地没有任务去运行的时候,它将使用FIFO规则尝试随机的从别的工作线程中“偷窃”一个任务去运行;
  • 当一个工作线程触及了join操作,可能的话它将处理其他任务,直到目标任务被告知已经结束;
  • 当一个工作线程无法再从其他线程中获取任务和处理失败的时候,它就会退出,并经过一段时间之后再度尝试直到所有的工作线程都被告知他们都处于空闲的状态。在这种情况下,他们会一直阻塞直到其他的任务再度被上层调用。

    这里主要的问题在于,当一个工作线程既没有本地任务也无法从别的工作线程中窃取任务时怎么办。经管在多核处理器上,可以依赖于硬件的忙等待、自旋循环的去尝试窃取一个任务,但是即使这样,尝试窃取任务还是会增加线程竞争,甚至会导致那些不是闲置的工作线程降低效率(锁协议的关系)。

    Java中并没有十分健壮的机制来保证这种情况是少发生的,但是在实际场景中它往往也是可以让人接受的。一个窃取失败的线程在尝试另外的窃取之前会降低自己的优先级,在尝试窃取期间执行Thread#yeild()方法,将自己的状态在线程池中设置为不活跃,他们会一直阻塞直到有新的任务进来。其他情况下,在进行一定的自旋次数之后,线程将进入休眠阶段(即所谓的“线程强制睡眠”),但不是放弃窃取。强化的休眠机制会给人造成一种需要花费很长时间去分解任务的假象,但这却是最好也是通用的折中方案。Fork/Join框架的未来版本也许会支持额外的控制方法,以便于让程序员在感觉性能受到影响时可以重写默认的实现。

 

三、提交任务

    当你实例化一个执行者ForkJoinPool时,通常情况下它会创建一个线程数等于计算机处理器数的池(通过Runtime.availableProcessors()方法获得)①。当ForkJoinPool对象被创建时,这些线程被创建并且在池中等待,直到有任务到达让它们执行。

    ForkJoinPool有三种提交任务的方式:execute(无返回值)、submit(返回异步future)、invoke(返回join操作得到的结果),都会调用externalPush()方法:

/*
 * 将提交的任务添加到提交者当前的提交队列中
 */
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    externalSubmit(task);
}

 

一旦ForkJoinTask被启动,就会启动其子任务并等待它们执行完成。执行者ForkJoinPool负责将任务赋予线程池中处于等待任务状态的另一线程。线程池中的活动线程会尝试执行其他任务所创建的子任务。ForkJoinPool尝试在任何时候都维持与可用的处理器数目一样数目的活动线程数。

你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

 

 

 

注:

    ① 对于工作线程数,通常情况下与平台所处的处理器数保持一致,但有的时候更少,用于处理其他相关的任务,或者有些情况下更多,来提升非计算密集型任务的性能。

© 著作权归作者所有

多弗哥
粉丝 16
博文 40
码字总数 149017
作品 0
杭州
高级程序员
私信 提问
Java7提供的并行执行任务框架:Fork、Join框架

一、定义 Fork/join主要是Java7提供的一个并行执行任务的框架,Fork就是把一个大任务切分为诺干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到大任务的结果。 如果1+2+3+……...

落地吃鸡
2016/03/31
192
0
聊聊并发(八)Fork/Join框架介绍

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 我们再通过Fork和Join这两个单词来理解下F...

陶邦仁
2015/03/23
434
1
线程基础:多任务处理(12)——Fork/Join框架(基本使用)

1. 概述 在进行系统存储专题的学习、总结、整理和写作的过程中感觉有点累了,加上最近在做书稿的第二次校稿工作,系统存储专题的学习和写作进度明显有些滞后,特别是编写的Ceph MON知识点。所...

yinwenjie
2017/05/14
0
0
Java并发系列6-Fork/Join框架

声明:原创文章,转载请注明出处。http://www.jianshu.com/u/e02df63eaa87 一、介绍 1、Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终...

唐影若凡
2017/07/27
0
0
Java7中的ForkJoin并发框架初探(上)——需求背景和设计原理

最近事情较多,好久没发文章了。前面关于Java并发的文章中主要介绍了并发的概念、思想、JavaSE5中java.util.concurrent包中的工具类的使用和实现源码的分析。这篇我们来简要了解一下JavaSE7...

Nori
2016/05/20
98
0

没有更多内容

加载失败,请刷新页面

加载更多

CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
昨天
5
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
昨天
8
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
昨天
10
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
昨天
6
0
PHP+Ajax微信手机端九宫格抽奖实例

PHP+Ajax结合lottery.js制作的一款微信手机端九宫格抽奖实例,抽奖完成后有收货地址添加表单出现。支持可以设置中奖概率等。 奖品列表 <div class="lottery_list clearfix" id="lottery"> ......

ymkjs1990
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部