文档章节

java 中的Fork/Join框架

hgfgoodcreate
 hgfgoodcreate
发布于 2016/07/03 12:04
字数 6663
阅读 137
收藏 13

什么是Fork/Join框架

Fork/Join框架是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下: ForkJoin框架流程

涉及到的类主要有:

  • ForkJoinPool:线程池,实现抽象类AbstractExecutorService(实现了ExecutorService
    • 负责维护全部的工作线程
    • 接收调用者分配的task
    • 本身持有一个全局的task队列
    • 实现任务窃取
  • ForkJoinWorkerThread:ForkJoinPool线程池中的worker线程,具体执行task。其中保存着对所在线程池的引用。
  • ForkJoinTask接口。task的抽象。
    • RecursiveTask:task执行完成后带返回值的task。
    • RecursiveAction:不带返回值的task。

ForkJoin框架能满足的需求

如果一个任务的问题集能被拆分,并且组合多个子任务的结果就能获取结果,那么这个问题就适合使用ForkJoin框架解决问题。例如:从数组中查找最大数,划分为查找局部最大数;

工作窃取

ForkJoin核心点:工作窃取工作窃取

工作窃取使得较空闲的线程可以帮助繁忙线程,而不是在空闲等待状态,让整个系统更快的解决问题集合。特别是每个线程处理的问题子集的大小是无法预估的情况下(这种情况下可能出现有些线程很繁忙,而有些比较空闲,在等待其它子任务完成才能算出最终结果。)

每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的(Java 6 在类库中添加了几种 deque 实现,包括 ArrayDequeLinkedBlockingDeque)。

标准队列和双端队列实现工作窃取对比: 可以使用标准队列实现工作窃取,但是与标准队列相比,deque 具有两方面的优势:减少争用和窃取。因为只有工作线程会访问自身的 deque 的头部,deque 头部永远不会发生争用;因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的争用(在 fork-join 框架中结合 deque 实现会使这些访问模式进一步减少协调成本)。

跟传统的基于线程池的方法相比,减少争用会大大降低同步成本。此外,这种方法暗含的后进先出(last-in-first-out,LIFO)任务排队机制意味着最大的任务排在队列的尾部,当另一个线程需要窃取任务时,它将得到一个大任务(能够分解成多个小任务的任务),从而避免了在未来窃取任务。因此,工作窃取实现了合理的负载平衡,无需进行协调并且将同步成本降到了最小。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

关键属性

ForkJoinTask<?>[] submissionQueue; // java.util.concurrent.ForkJoinPool
// Pool的task队列 初始容量为8192 由于submissionQueue是环形队列 而作者使用了特殊的求余算法 导致的容量必须为2的幂次方 
// 通过`java.util.concurrent.ForkJoinPool#growSubmissionQueue()`方法拓展 调用者线程submit的task都会提交到这个队列
// 然后唤醒worker去该队列steal task 如果在worker线程调用submit提交直接提交调用提交方法的worker线程的task队列中

ForkJoinTask<?>[] queue; // java.util.concurrent.ForkJoinWorkerThread   
// Worker的task队列 初始容量为8192 基本上与submissionQueue的维护方式一样 不过只能通过在worker线程调用fork()才能将
// task添加到这个队列中


volatile int queueBase; // java.util.concurrent.ForkJoinPool & java.util.concurrent.ForkJoinWorkerThread
// 队列尾部索引 task窃取时会更改此值
//由于几个线程可能同时访问 所以修饰符是volatile


int queueTop; // java.util.concurrent.ForkJoinPool & java.util.concurrent.ForkJoinWorkerThread
// task push与pop时会更改的索引 根据上面对队列属性的描述会发觉所有入队操作都是由"一个线程"来完成的(调用者线程往pool中
// push task 而worker线程自己给自己fork task) 所以其是非线程安全的 另 submissionLock 保证了向pool中提交task的安全性
// 但是这个保护只是防止调用者作死而存在的(比如并发往pool中提交task) 如果保证调用者单线程入数据 则不需要这个锁


volatile int status; // java.util.concurrent.ForkJoinTask
// task执行的状态 对应本类的四个状态常量 已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)

框架入口

提交任务

由于ForkJoinPool实现了ExecutorService,也就是支持通过submitexecute两种方法提交任务。

submitexecute方法中,可能需要使用ForkJoinTask.adapt()方法将RunnableCallable方法包装成ForkJoinTask类型的Job。 核型提交由forkOrSubmit完成。

源码:

private <T> void forkOrSubmit(ForkJoinTask<T> task) {
    ForkJoinWorkerThread w;
    Thread t = Thread.currentThread();
    if (shutdown)
        throw new RejectedExecutionException(); 
        // 已经被shutdown后则不再接收新的task 与传统线程池不同的是task最大数量是由ForkJoin自行管理的 外部不可更改
        // 环形队列把这点限制死了 不过也不错
    if ((t instanceof ForkJoinWorkerThread) &&
        (w = (ForkJoinWorkerThread)t).pool == this)
        w.pushTask(task); 
        // 如果提交task的线程是worker线程并且属于当前的pool 则直接将task添加到这个worker中
        //(即:调用着直接将task提交到具体的worker线程,而不是提交给线程池)
        // 这个方法由于面向具体的线程 所以不需要锁
    else
        addSubmission(task);
        // 否则将task添加到pool队列中
}

如果当前线程是ForkJoinWorkerThread类型的,则将任务追加到ForkJoinWorkerThread对象中维护的队列上,否则将新的任务放入ForkJoinPool的提交队列中,并通知线程工作。

pushTask方法实现原理在下面谈ForkJoinTask的fork方法实现原理时一起说。

简化提交方式

public <T> T invoke(ForkJoinTask<T> task) {
    Thread t = Thread.currentThread();
    if (task == null)
        throw new NullPointerException();
    if (shutdown)
        throw new RejectedExecutionException();
    if ((t instanceof ForkJoinWorkerThread) &&
        ((ForkJoinWorkerThread)t).pool == this)
        return task.invoke();  // 如果提交task的线程对象是当前pool中的worker 则直接让当前worker自己处理task
    else {
        addSubmission(task);   // 所有非worker提交的task全部由pool保存
        return task.join();  //等待task执行完毕
    }
}

调用者线程流程

即调用了提交接口的线程,在task提交后的主要流程。 入口处不管是submit,execute方法提交task,还是使用invoke直接提交task,如果是提交给线程池,那么都会进入addSubmission方法。

private void addSubmission(ForkJoinTask<?> t) {
    final ReentrantLock lock = this.submissionLock; 
    // 这里这个锁是防止调用者乱搞 task的生成大多都在worker中发生
    //并且不会使用ForkJoinPool.addSubmission方法来实现生成task, 所以调用这个方法的只有调用者
    lock.lock();
    try {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = submissionQueue) != null) {    
            long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; 
            // 获取top对应的数组位置(内存地址)
            UNSAFE.putOrderedObject(q, u, t); // 将新的task添加到队列中
            queueTop = s + 1;//更新队列头的位置
            if (s - queueBase == m)
                growSubmissionQueue(); // 在添加之后检查队列长度是否到极限 如果是则扩容
        }
    } finally {
        lock.unlock();
    }
    signalWork(); // 唤醒工作线程
}

方法结尾处会唤醒(或创建) worker线程 (创建用addWorker()) 而worker线程被创建之后 就会不断的调用scan方法去窃取其他worker或pool中的task 直到全部task结束 这里需要特别说明的地方有2点:

  1. (s = queueTop) & (m = q.length-1) ,相当于queueTop % (q.length - 1),返回的是队列下标(queueTop或queueBase)在环形队列数组中的真实位置,即数组中的Index。

  2. (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; 通过直接访问指定内存地址来替换和获取元素。

查看ASHIFT和ABASE的来源:

static {
    int s;
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class a = ForkJoinTask[].class;
        ABASE = UNSAFE.arrayBaseOffset(a);
        s = UNSAFE.arrayIndexScale(a);
    } catch (Exception e) {
        throw new Error(e);
    }
    if ((s & (s-1)) != 0)
        throw new Error("data type scale not a power of two");
    ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
}

此处的特殊的求余算法,就是为什么需要submissionQueue是2的整数幂次方 的原因!!!! 两个2的整数次幂方(x%y,其中x必须小余y) 余方法:x&(y-1)

Java数组在实际存储时有一个对象头,后面才是实际的数组数据,而UNSAFE.arrayBaseOffset就是用来获取实际数组数据的偏移量UNSAFE.arrayIndexScale则是获取对应数组元素占的字节数。这里的代码ABASE=16(数组对象头大小),s=4(ForkJoinTask对象引用占用字节数),ASIFT=2。

所以上面的Index << ASHIFT + ABASE合起来就是Index左移2位=Index*4,也就是算Index的在数组中的偏移量,再加上ABASE就是Index在对象中的偏移量。也就是那一行代码主要就是算出来queueTop在队列数组中的实际偏移量。

nativa函数:UNSAFE.putOrderedObject(q, u, t); 能够保证写写不会被重排序,但是不保证写会对其它线程可见;而volatile变量既保证写写不会被重排序,也保证写后对其它线程立即可见。可见Unsafe.putOrderedObject会比直接的volatile变量赋值速度会一点,这篇文章(需要翻墙)则指出Unsafe.putOrderedObject会比volatile写快3倍。

为什么要保证写不会重排序?线程安全性由subbmissionLock保证 因为只有保证了写不重排序,才能使用上面基于偏移的方式寻找queue中的元素地址。 那么为什么需要基于偏移的方式需找地址?这需要对比不使用上述方式插入task到submissionQueue。 如果自己实现,那么先根据queueTop找到下一个数组index,然后在数组中放入task。 总体来说,就是为了高效的插入task到数组中。[感觉理解不到位啊~,还有别的原因?]

从addSubmission源码中不难发现,addSubmission的核心是:growSubmissionQueuesignalWork

growSubmissionQueue

growSubmissionQueue主要是完成扩容功能(当容量为0或者对象为null,则创建)。

 /**
 * Creates or doubles submissionQueue array.
 * Basically identical to ForkJoinWorkerThread version.
 */
     /**
     * Creates or doubles submissionQueue array.
     * Basically identical to ForkJoinWorkerThread version.
     */
private void growSubmissionQueue() {
    ForkJoinTask[] oldQ = submissionQueue;
    // 为null则初始化size,否则容量翻倍。
    int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY; 
    if (size > MAXIMUM_QUEUE_CAPACITY)
        throw new RejectedExecutionException("Queue capacity exceeded"); 
    if (size < INITIAL_QUEUE_CAPACITY)
        size = INITIAL_QUEUE_CAPACITY;

    ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
    int mask = size - 1;
    int top = queueTop;
    int oldMask;
    if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) { 
        // 如果旧队列中有数据 则将其数据填充到新的队列数组中 
        for (int b = queueBase; b != top; ++b) {
            long u = ((b & oldMask) << ASHIFT) + ABASE; // 根据旧的mask获取元素在旧队列中的位置
            Object x = UNSAFE.getObjectVolatile(oldQ, u); // 获取元素

            // 获取后判断旧的队列中是否存在此元素 如果不存在则取消将其加入新的队列 因为在两个步骤之间 
            // task可能已经被执行过了 (其他线程或许会持有旧队列数组的引用) 反之 如果存在 则加入到新的队列中
            if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                UNSAFE.putObjectVolatile
                    (q, ((b & mask) << ASHIFT) + ABASE, x);
        }
    }
}

signalWork 唤醒线程

while为ture 的条件:(worker总数量少 | 有至少一个等待中)and (active状态的worker太少 | 线程池正在结束)。

e是线程池控制字段,意义:

  • >0:释放一个waiter,唤醒线程
  • =0:没有等待创建的worker
  • <0:线程池正在关闭

创建worker主要是addWorker完成。

/**
 * Wakes up or creates a worker.
 */
final void signalWork() {
    long c; int e, u;
    while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
            (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
        if (e > 0) {                         // release a waiting worker
            int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
            if ((ws = workers) == null ||
                (i = ~e & SMASK) >= ws.length ||
                (w = ws[i]) == null)
                break;
            long nc = (((long)(w.nextWait & E_MASK)) |
                       ((long)(u + UAC_UNIT) << 32));
            if (w.eventCount == e &&
                UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                w.eventCount = (e + EC_UNIT) & E_MASK;
                if (w.parked)
                    UNSAFE.unpark(w);
                break;
            }
        }
        else if (UNSAFE.compareAndSwapLong
                 (this, ctlOffset, c,
                  (long)(((u + UTC_UNIT) & UTC_MASK) |
                         ((u + UAC_UNIT) & UAC_MASK)) << 32)) {
            addWorker();
            break;
        }
    }
}

addWorker主要使用ForkJoinWorkerThreadFactory生成worker线程。 源码如下:

private void addWorker() {
    Throwable ex = null;
    ForkJoinWorkerThread t = null;
    try {
        t = factory.newThread(this); 
        // 创建worker线程 下面的代码不解释了 只是根据发生异常的情况来决定是否告知调用者
    } catch (Throwable e) {
        ex = e;
    }
    if (t == null) {  // null or exceptional factory return
        long c;       // adjust counts
        do {} while (!UNSAFE.compareAndSwapLong
                     (this, ctlOffset, c = ctl,
                      (((c - AC_UNIT) & AC_MASK) |
                       ((c - TC_UNIT) & TC_MASK) |
                       (c & ~(AC_MASK|TC_MASK)))));
        // Propagate exception if originating from an external caller
        if (!tryTerminate(false) && ex != null &&
            !(Thread.currentThread() instanceof ForkJoinWorkerThread))
            UNSAFE.throwException(ex);
    }
    else
        t.start(); // 启动线程
}

worker线程流程

创建完线程后,线程直接启动了。Thread的start()会调用run()。

public void run() {
    Throwable exception = null;
    try {
        onStart(); // 首先初始化当前worker线程
        pool.work(this); // 调用pool将自己注册到pool中并表示自己可以开始工作
    } catch (Throwable ex) {
        exception = ex;
    } finally {
        onTermination(exception); 
        // 这个方法主要是将当前线程置为终结状态 在work方法种可以看到线程在获取task的时候是根据这个状态轮询的 
        // 一旦设置为false 就不再接收其他的task 另外也记录了这个线程发生的异常
    }
}

初始化工作,主要初始化窃取目标:

protected void onStart() {
    queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // 初始化队列
    int r = pool.workerSeedGenerator.nextInt(); // 初始化工作窃取的时候,要窃取的线程对象
    seed = (r == 0) ? 1 : r; //  must be nonzero
}

每个worker线程基本上在调用到这个work()方法后,就会一直循环,重复着 工作窃取、执行线程本身队列task的过程除非在执行task过程中发生异常或者pool被shutdown 或者按需调整工作线程总数,导致该线程被回收。

final void work(ForkJoinWorkerThread w) {
    boolean swept = false;                // true on empty scans
    long c;

    while (!w.terminate && (int)(c = ctl) >= 0) { 
        // 这个terminate就是上面onTermination(exeception)方法设置的状态位
        int a;                            // active count
        if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
            swept = scan(w, a); // 扫描
        else if (tryAwaitWork(w, c))
            swept = false;
    }
}

scan()如果返回false,则会让work()方法中的循环继续调用scan;返回true则会调用tryAwaitWork() 也就是等待task。迟早还会调用这个方法。

扫描发现其它任务, scan方法输入参数

  • w:当前worker。
  • a:active状态的worker数量。

scan方法:


private boolean scan(ForkJoinWorkerThread w, int a) {

    int g = scanGuard; // mask 0 avoids useless scans if only one active
    int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
    ForkJoinWorkerThread[] ws = workers;
    if (ws == null || ws.length <= m)         // staleness check
        return false;           // 安全检查 
    for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        ForkJoinWorkerThread v = ws[k & m];
        // 以下部分是task窃取的一部分 新启动的worker线程会通过work方法调用到scan方法并且传递自己的对象(也就是w)
        // 到方法中 在下面的代码中他会尝试从现存的worker的队列中窃取一个task ForkJoin框架是在运行的期间不断的分裂
        // 在ForkJoinTask的实现类里调用fork()就可能会创建新的worker 并走到这个分支 所以 在worker数不足的情况下
        // 窃取的几率很高 但是当worker数稳定后 每个worker会给自己分配task 而不是再这样窃取其他线程的task

        if (v != null && (b = v.queueBase) != v.queueTop &&
            (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
            long u = (i << ASHIFT) + ABASE;
            if ((t = q[i]) != null && v.queueBase == b &&
                UNSAFE.compareAndSwapObject(q, u, t, null)) {
			// compareAndSwapObject将被窃取的位置置空,q的u位置上放入t,t原来的位置放入null

                int d = (v.queueBase = b + 1) - v.queueTop; // 看 他窃取了 他窃取了!
                v.stealHint = w.poolIndex;
                if (d != 0)
                    signalWork();// d!=0说明有多个task没有完成,继续创建worker去窃取task
                w.execTask(t); 
            }

            r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
            return false;                     // store next seed
        }
        else if (j < 0) {                     // xorshift
            r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
        }
        else
            ++k;
    }
    if (scanGuard != g)                       // staleness check
        return false;
    else {                                    // try to take submission
        // 第一个task并不是直接被分配到worker的线程里(因为创建task的并不是worker本身) 而是直接进入pool的队列中 
        // 然后调用者线程会主动创建一个新的worker 在上面的逻辑(说实话我看不懂上边的逻辑) 中无法从其他worker中
        // 窃取到task的时候 或者是其他worker分配的task已经执行完毕后 再从pool的队列中获取task

        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        if ((b = queueBase) != queueTop &&
            (q = submissionQueue) != null &&
            (i = (q.length - 1) & b) >= 0) {
            long u = (i << ASHIFT) + ABASE;
            if ((t = q[i]) != null && queueBase == b &&
                UNSAFE.compareAndSwapObject(q, u, t, null)) {
                queueBase = b + 1;
                w.execTask(t);
            }
            return false;
        }

        return true;                         // all queues empty
    }
    // 无论走了哪个分支 最终都会调用 w.execTask(t); (没task的情况下除外)
}

最后都会进入executeTask

final void execTask(ForkJoinTask<?> t) {
    currentSteal = t; // 当前窃取到的task 在join的时候有用
    for (;;) {
        if (t != null)
            t.doExec(); 
            // 执行具体的task 通常在编写forkjoin的时候 都会在运行期间分裂出其他task
        if (queueTop == queueBase) // 判断自己的队列task是否已经全部完成 如果是则退出方法 
            // 这里需要特殊说明的是 只有当前线程会给自己的队列添加task 也就是说 当前线程如果不再fork task
            // queueTop就不会发生变化 所以这个方法的判断是安全的
            break;
        t = locallyFifo ? locallyDeqTask() : popTask(); // 弹出task
        // 这里唯一需要注意的是 ForkJoin支持FIFO(在外面设置) 如果设置了FIFO 就会跟其它"steal task"的线程一起
        // 从queueBase开始获取task 顺带一提 locallyDeqTask有两个版本 一个是针对其他线程的steal task实现
        // 另外一个是当前线程的实现
    }
    ++stealCount;
    currentSteal = null;
}

doExec()是执行task的基本方法,调用JoinForkTask.exe()实现具体的执行:

    final void doExec() {
        if (status >= 0) {
            boolean completed;
            try {
                completed = exec();
            } catch (Throwable rex) {
                setExceptionalCompletion(rex);
                return;
            }
            if (completed)
                setCompletion(NORMAL); // must be outside try block
        }
    }

具体task是怎么执行的得看JoinForkTask的具体实现。在jdk中的默认抽象实现类的实现如下:

protected final boolean exec() {
    result = compute();
    return true;
}

调用了抽象的compute方法。该方法是一般使用jdk的ForkJoin框架的程序实现。

小结: 先尝试做任务窃取( Work Stealing ),如果不满足条件则到提交队列中获取任务。而ForkJoinWorkerThread线程本身也维护了线程内fork和join任务操作得到的队列,结合起来,总体执行任务的顺序就是:

  1. 线程会先执行ForkJoinWorkerThread对象内维护的任务队列中的任务,即ForkJoinWorkerThread.execTask()方法中的循环实现。通常是LIFO,即去最新的任务。也有特殊情况,这个根据变量locallyFifo的值来判断。
  2. 之后会尝试做任务窃取,尝试从其他线程中获取任务任务窃取条件不满足时,到提交队列中获取提交的任务

Task流程

最核心的方法是forkjoin

fork方法

ForkJoinTask的fork方法实现原理。当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步的执行这个任务,然后立即返回结果。 代码如下:

public final ForkJoinTask<V> fork() {
    ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
    return this;
}

pushTask方法:

final void pushTask(ForkJoinTask t) { 
    ForkJoinTask[] q; int s, m; 
    if ((q = queue) != null) { // ignore if queue removed 
        long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE; UNSAFE.putOrderedObject(q, u, t); 
                    // 将task放入当前线程的队列中 
        queueTop = s + 1; // or use putOrderedInt
        if ((s -= queueBase) <= 2) 
            pool.signalWork(); // 如果队列中的未处理task小于2 则唤醒新的worker
            // task分解会一分为二,此时如果task <= 2说明可以尝试继续分解,唤醒的线程继续分解task。
        else if (s == m) 
            growQueue(); // 扩容 
    } 
} 

worker获取task

popTask: (当前worker从自己的队列获取任务时,)从队头获取task。

private ForkJoinTask<?> popTask() {
    int m;
    ForkJoinTask<?>[] q = queue;
    if (q != null && (m = q.length - 1) >= 0) {
        for (int s; (s = queueTop) != queueBase;) { // 轮循自己的队列 获取没有被窃取的task 是出栈的过程 
            // 通过工作窃取的索引和 当前worker自己弹出、压入队列的索引 来判断是否有剩余元素
            int i = m & --s;
            long u = (i << ASHIFT) + ABASE; // raw offset
            ForkJoinTask<?> t = q[i];
            if (t == null)    
                // 获取task的时候如果task已经为空 则被其他线程窃取掉 此时break 将剩下的判断操作委托给外围的execTask处理
                break;
            if (UNSAFE.compareAndSwapObject(q, u, t, null)) { // t位置 置为null 防止steal task的线程重复执行task
                queueTop = s; // or putOrderedInt // 更新top
                return t;
            }
        }
    }
    return null;
}

polltask:获取本地或者移除本地task | 窃取task

final ForkJoinTask<?> pollTask() {
    ForkJoinWorkerThread[] ws; 
    ForkJoinTask<?> t = pollLocalTask(); // 尝试从自己的队列中获取task
    if (t != null || (ws = pool.workers) == null)
        return t;
    int n = ws.length; // cheap version of FJP.scan
    int steps = n << 1; // 这里限制了尝试从其他workersteal task的次数 看起来貌似是worker容器长度的2倍
    int r = nextSeed();
    int i = 0;
    while (i < steps) {// 从其他的worker的task队列中steal task
        ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];

        // 先判断具体的worker队列中是否存在task 不存在则换到其他的worker 
        if (w != null && w.queueBase != w.queueTop && w.queue != null) {
            // 如果窃取到task则返回 否则重置steps  
            if ((t = w.deqTask()) != null)
                return t;
            i = 0;
        }
    }
    return null;
}

join方法与结果获取

join 方法:

public final V join() {
    if (doJoin() != NORMAL) 
        // doJoin就是调用子类实现的exec方法然后根据运行状况 设置不同的状态位
        // (比如发生异常设置一个状态 比如task取消是另外一个状态..)
        return reportResult();
    else
        return getRawResult(); // 状态为NORMAL 的时候说明执行完成 直接返回结果即可
}

它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断任务完成情况,进而判断返回什么结果。 任务状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。

  • 如果任务状态是已完成,则直接返回任务结果。
  • 如果任务状态是被取消,则直接抛出CancellationException
  • 如果任务状态是抛出异常,则直接抛出对应的异常。
private int doJoin() {
    Thread t; ForkJoinWorkerThread w; int s; boolean completed;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { 
        // 如果调用join方法的线程为worker 则尝试让worker本身执行
        if ((s = status) < 0) // 判断task是否已经得到结果 得到结果则直接返回
            return s;
        if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) { 
            // unpushTask会检查queueTop位置上是不是当前task 如果是则直接执行
            try {
                completed = exec(); // 这里的流程基本上跟doExec()一样
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                return setCompletion(NORMAL);
        }
        return w.joinTask(this); // 调用当前worker线程的joinTask等待其安排task执行
    }
    else
        return externalAwaitDone(); // 否则object wait
}

joinTask如下:

final int joinTask(ForkJoinTask<?> joinMe) {
    // 这个替换有点类似方法调用 假设worker在执行task的时候执行了join() 被join的task同样也调用了join() 所以 像个栈..
    ForkJoinTask<?> prevJoin = currentJoin;
    currentJoin = joinMe;
    for (int s, retries = MAX_HELP;;) {
        if ((s = joinMe.status) < 0) {
            currentJoin = prevJoin;        
            // 所以 当前task的join一旦有了结果 则将currentJoin替换回之前的task
            return s;
        }
        if (retries > 0) {
            if (queueTop != queueBase) {
                if (!localHelpJoinTask(joinMe)) 
                    retries = 0;           // cannot help
                // 检查当前queueTop位置上的task是否是被join的task 如果不是 并且task已经完成 则直接将retries置为0 
                // 进入下一轮循环 如果queueTop位置就是当前task 则执行 返回true后会重新进入for循环并返回执行结果
            }
            else if (retries == MAX_HELP >>> 1) { 
                --retries;                 // check uncommon case 
                if (tryDeqAndExec(joinMe) >= 0) 
                    // 当前task队列中已经没有task 明显要被join的task已经被窃取 所以去其他worker的将task窃取回来并执行
                    Thread.yield();        // for politeness
            }
            else
                retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
                // 帮助其他的worker完成task 走到这个分支只能去steal task了
                // (steal task的代码已经示范过多次 这里就不示范了..)
        }
        else {
            retries = MAX_HELP;           // restart if not done
            pool.tryAwaitJoin(joinMe);
        }
    }
    // 这里需要注意的是 如果当前worker的task没有执行完毕 绝对不会去其他worker steal task
    // 其次 每次循环的第一个判断是判断调用join()的task的状态是否为已完成 也就是说在这期间如果task已经被处理完了 
    // 则直接退出方法 不再尝试steal task 而这个task可能是由子task执行的 也可能是当前线程自己执行的 
    //(即完成了的task在join的时候是直接退出的)
}

总结整体流程

  1. 客户端程序实现ForkJoinTask,主要是实现compute方法。
  2. 使用ForkJoinPool提交任务。
  3. 首次需addSubmission,将task添加到submissionQueue。创建Worker进程,并启动。
  4. worker线程使用scan获取队列中task,并最终调用execTask执行task。
  5. execTask会使用popTask将初始task弹出,然后调用doExec方法执行。
  6. 执行会最终调用自己实现的compute方法。
  7. 在compute方法会使用fork方法实现初始任务划分,将任务划分成小任务。
  8. 每个fork的任务会将自己放入当前线程的队列中,其实此时刚完成初始化分,两task在一个worker中,所以在同一个队列中。
  9. 由于队列的queue中,task数量小于2会执行pool.singleWorker创建/唤醒新的worker
  10. 此时前一个worker fork完成,在work方法的while循环中,继续执行任务(即步骤4中,在execTask会将本worker线程中所有task挨个doExec。);后一个worker窃取前一个worker的任务,并执行。
  11. fork完成后,此时任务分工完成,达到预期细粒度。进入join。
  12. 进入join后,查询任务执行的情况,如果是NORMAL状态的,就调用doJoin,获取返回值,并且使用调用TaskJoin将task与其它task合并(可能会出现工作窃取,并继续执行)。

ForkJoin框架反复递归执行自定义的compute方法,每次调用都会fork,不断划分task,就像一个二叉树不断往下划分子树,最终到达叶节点。然后打到叶节点的开始回溯,使用join合并结果。和最开始的图一样。

划分和合并规则

  1. 当一个任务划分一个新线程时,它将自己推到 deque 的头部。
  2. 当一个任务执行与另一个未完成任务的合并操作时,它会将另一个任务推到队列头部并执行,而不会休眠以等待另一任务完成(像 Thread.join() 的操作一样)。
  3. 当线程的任务队列为空,它将尝试从另一个线程的 deque 的尾部 窃取另一个任务。

参考

  1. http://onlychoice.github.io/blog/2013/09/17/java-concurrent-source-code-reading-3/
  2. jdk帮助文档

© 著作权归作者所有

下一篇: polysh入门
hgfgoodcreate
粉丝 13
博文 60
码字总数 131679
作品 0
海淀
程序员
私信 提问
加载中

评论(1)

引鸩怼孑
引鸩怼孑
fork join is a very very good pool
重做一道Java面试题(Fork/Join)

前几天参加了一场面试,当时有这莫一道题: 老实说,我当时并没有想出来具体该如何实现,只是有个大致的方向,肯定是的思想;这两天我一直在尝试将这些当时没做出来的题想办法做出来,查了一...

since1986
2017/09/17
0
0
读书笔记之《Java并发编程的艺术》-并发编程容器和框架(重要)

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
Java 7 的新特性一览表

官方说是 7月28日 正式发布 Java 7 ,正常的话我们应该在 7月29日 看到这个版本。很快了,就两天时间。 发布之前让我们先来看看 Java 7 都有什么新特性吧。 Java 7 的架构图: 新特性一览表:...

红薯
2011/07/27
76.4K
83
【JDK7】新特性(5) fork/join 框架

对于框架的原理,可以阅读 Doug Lea 的文章“A Java Fork/Join Framework”:了解 Fork/Join 模式的实现机制和执行性能。 原理解析:fork分解,join结合。这个框架的本质是将一个任务分解成多...

5W1H-
2012/12/11
0
0
Java并发教程-7高级并发对象

目前为止,该教程重点讲述了最初作为Java平台一部分的低级别API。这些API对于非常基本的任务来说已经足够,但是对于更高级的任务就需要更高级的API。特别是针对充分利用了当今多处理器和多核...

noday
2014/04/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

js动态设置元素高度

this.$refs.xxx.style.height= this.contentHeight; 元素需要绑定

Carbenson
49分钟前
2
0
今天的学习

今天学到了ci框架中的查询语句的where条件语句: 1、$this->db->select('')->from('')->where('id = ??')->get()->result_array();2、$this->db->select('')->from('')->where('id', '??'......

墨冥
59分钟前
2
0
MySQL在高并发下的订单撮合、系统使用、共享锁与排他锁保证数据一致性

前序 距离上次择文发表,两月余久。2018年也即将要结束了,目前的工作依然是与区块链应用相关的,也很荣幸在9月初受邀签约出版暂名为《区块链以太坊DApp实战开发》一书,预计在明年年初出版。...

我最喜欢三大框架
今天
2
0
深入理解Flutter多线程

该文章属于<简书 — 刘小壮>原创,转载请注明: <简书 — 刘小壮> https://www.jianshu.com/p/54da18ed1a9e Flutter默认是单线程任务处理的,如果不开启新的线程,任务默认在主线程中处理。 ...

刘小壮
今天
3
0
输入n个整数,找出其中最小的K个数。例如输入4,5,1,6,2,7,3,8这8个数字,则最小的4个数字是1,2,3,4,。

//有点投机啦 import java.util.ArrayList; public class Solution { public ArrayList<Integer> GetLeastNumbers_Solution(int [] input, int k) { ArrayList <Integer> s=new ArrayLi......

南桥北木
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部