文档章节

浅谈Java的Fork/Join并发框架

珂jack
 珂jack
发布于 2017/08/03 20:05
字数 1600
阅读 2665
收藏 175
点赞 6
评论 5

    前几天有写到整合并发结果的文章,于是联想到了Fork/Join。因为在我看来整合并发结果其实就是Fork/Join中的Join步骤。所以今天我就把自己对Fork/Join一些浅显的理解记录下来。

1. Fork/Join是什么

    Oracle的官方给出的定义是:Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。

    Fork/Join实现了ExecutorService,所以它的任务也需要放在线程池中执行。它的不同在于它使用了工作窃取算法,空闲的线程可以从满负荷的线程中窃取任务来帮忙执行。(我个人理解的工作窃取大意就是:由于线程池中的每个线程都有一个队列,而且线程间互不影响。那么线程每次都从自己的任务队列的头部获取一个任务出来执行。如果某个时候一个线程的任务队列空了,而其余的线程任务队列中还有任务,那么这个线程就会从其他线程的任务队列中取一个任务出来帮忙执行。就像偷取了其他人的工作一样

    Fork/Join框架的核心是继承了AbstractExecutorService的ForkJoinPool类,它保证了工作窃取算法和ForkJoinTask的正常工作。

下面是引用Oracle官方定义的原文:

The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.

As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.

The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.

2. Fork/Join的基本用法

(1)Fork/Join基类

    上文已经提到,Fork/Join就是要讲一个大的任务分割成若干小的任务,所以第一步当然是要做任务的分割,大致方式如下:

if (这个任务足够小){
  执行要做的任务
} else {
  将任务分割成两小部分
  执行两小部分并等待执行结果
}

要实现FrokJoinTask我们需要一个继承了RecursiveTask或RecursiveAction的基类,并根据自身业务情况将上面的代码放入基类的coupute方法中。RecursiveTask和RecursiveAction都继承了FrokJoinTask,它俩的区别就是RecursiveTask有返回值而RecursiveAction没有。下面是我做的一个选出字符串列表中还有"a"的元素的Demo:

    @Override
    protected List<String> compute() {
        // 当end与start之间的差小于阈值时,开始进行实际筛选
        if (end - this.start < threshold) {
            List<String> temp = list.subList(this.start, end);
            return temp.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList());
        } else {
            // 如果当end与start之间的差大于阈值时
            // 将大任务分解成两个小任务。
            int middle = (this.start + end) / 2;
            ForkJoinTest left = new ForkJoinTest(list, this.start, middle, threshold);
            ForkJoinTest right = new ForkJoinTest(list, middle, end, threshold);
            // 并行执行两个“小任务”
            left.fork();
            right.fork();
            // 把两个“小任务”的结果合并起来
            List<String> join = left.join();
            join.addAll(right.join());
            return join;
        }
    }

(2)执行类

    做好了基类就可以开始调用了,调用时首先我们需要Fork/Join线程池ForkJoinPool,然后向线程池中提交一个ForkJoinTask并得到结果。ForkJoinPool的submit方法的入参是一个ForkJoinTask,返回值也是一个ForkJoinTask,它提供一个get方法可以获取到执行结果。

代码如下:

        ForkJoinPool pool = new ForkJoinPool();
        // 提交可分解的ForkJoinTask任务
        ForkJoinTask<List<String>> future = pool.submit(forkJoinService);
        System.out.println(future.get());
        // 关闭线程池
        pool.shutdown();

就这样我们就完成了一个简单的Fork/Join的开发。

提示:Java8中java.util.Arrays的parallelSort()方法和java.util.streams包中封装的方法也都用到了Fork/Join。(细心的读者可能注意到我在Fork/Join中也有用到stream,所以其实这个Fork/Join是多余的,因为stream已经实现了Fork/Join,不过这只是一个Demo展示,没有任何实际用处也就无所谓了)

引用官方原文:

One such implementation, introduced in Java SE 8, is used by the java.util.Arrays class for its parallelSort() methods. These methods are similar to sort(), but leverage concurrency via the fork/join framework. Parallel sorting of large arrays is faster than sequential sorting when run on multiprocessor systems. 

Another implementation of the fork/join framework is used by methods in the java.util.streams package, which is part of Project Lambda scheduled for the Java SE 8 release.

附完整代码以便以后参考:

1. 定义抽象类(用于拓展,此例中没有实际作用,可以不定义此类):

import java.util.concurrent.RecursiveTask;

/**
 * Description: ForkJoin接口
 * Designer: jack
 * Date: 2017/8/3
 * Version: 1.0.0
 */
public abstract class ForkJoinService<T> extends RecursiveTask<T>{
    @Override
    protected abstract T compute();
}

2. 定义基类

import java.util.List;
import java.util.stream.Collectors;

/**
 * Description: ForkJoin基类
 * Designer: jack
 * Date: 2017/8/3
 * Version: 1.0.0
 */
public class ForkJoinTest extends ForkJoinService<List<String>> {

    private static ForkJoinTest forkJoinTest;
    private int threshold;  //阈值
    private List<String> list; //待拆分List

    private ForkJoinTest(List<String> list, int threshold) {
        this.list = list;
        this.threshold = threshold;
    }

    @Override
    protected List<String> compute() {
        // 当end与start之间的差小于阈值时,开始进行实际筛选
        if (list.size() < threshold) {
            return list.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList());
        } else {
            // 如果当end与start之间的差大于阈值时,将大任务分解成两个小任务。
            int middle = list.size() / 2;
            List<String> leftList = list.subList(0, middle);
            List<String> rightList = list.subList(middle, list.size());
            ForkJoinTest left = new ForkJoinTest(leftList, threshold);
            ForkJoinTest right = new ForkJoinTest(rightList, threshold);
            // 并行执行两个“小任务”
            left.fork();
            right.fork();
            // 把两个“小任务”的结果合并起来
            List<String> join = left.join();
            join.addAll(right.join());
            return join;
        }
    }

    /**
     * 获取ForkJoinTest实例
     * @param list  待处理List
     * @param threshold 阈值
     * @return ForkJoinTest实例
     */
    public static ForkJoinService<List<String>> getInstance(List<String> list, int threshold) {
        if (forkJoinTest == null) {
            synchronized (ForkJoinTest.class) {
                if (forkJoinTest == null) {
                    forkJoinTest = new ForkJoinTest(list, threshold);
                }
            }
        }
        return forkJoinTest;
    }
}

3. 执行类 

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

/**
 * Description: Fork/Join执行类
 * Designer: jack
 * Date: 2017/8/3
 * Version: 1.0.0
 */
public class Test {

    public static void main(String args[]) throws ExecutionException, InterruptedException {

        String[] strings = {"a", "ah", "b", "ba", "ab", "ac", "sd", "fd", "ar", "te", "se", "te",
                "sdr", "gdf", "df", "fg", "gh", "oa", "ah", "qwe", "re", "ty", "ui"};
        List<String> stringList = new ArrayList<>(Arrays.asList(strings));

        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinService<List<String>> forkJoinService = ForkJoinTest.getInstance(stringList, 20);
        // 提交可分解的ForkJoinTask任务
        ForkJoinTask<List<String>> future = pool.submit(forkJoinService);
        System.out.println(future.get());
        // 关闭线程池
        pool.shutdown();

    }

}

附源码地址:http://git.oschina.net/jack90john/forkJoin

------------------------------------------------------------------------------

欢迎关注我的个人公众号,推送最新文章

© 著作权归作者所有

共有 人打赏支持
珂jack
粉丝 41
博文 16
码字总数 20976
作品 0
成都
后端工程师
加载中

评论(5)

为为02
为为02
正好用到fork/join,以前都是直接拿线程池做的,代码很是复杂
-TNT-
-TNT-
自己用的最多的场景是,submit进去一个带parallel的stream操作,貌似只有这样才能指定parallel用哪个线程池…
珂jack
珂jack

引用来自“_vince”的评论

maven坐标是什么?找不到,麻烦提供下..感激不尽
不需要用maven,安装jdk 1.7或以上版本即可,如果安装的是1.7需要将例子里的stream修改一下。:grin:
_vince
_vince
maven坐标是什么?找不到,麻烦提供下..感激不尽
_vince
_vince
不明觉厉...还没有碰过,试下
Java 5 、6、 7中新特性

JDK5新特性(与1.4相比)【转】 1 循环 for (type variable : array){ body} for (type variable : arrayList){body} 而1.4必须是: for (int i = 0; i < array.length; i++){ type variabl......

thinkyoung ⋅ 2014/10/14 ⋅ 0

《Java并发编程与高并发解决方案》课程相关手记汇总 - 持续更新

给《Java并发编程与高并发解决方案》课程准备的手记列表,为了方便大家阅读,单独整理成一篇汇总,学习时结合手记效果会更好哦~ 更多手记可点击我的个人首页:http://www.imooc.com/t/598062...

_Jimin_ ⋅ 05/09 ⋅ 0

Java多线程学习(五)线程间通信知识点补充

系列文章传送门: Java多线程学习(一)Java多线程入门 Java多线程学习(二)synchronized关键字(1) java多线程学习(二)synchronized关键字(2) Java多线程学习(三)volatile关键字 Ja...

一只蜗牛呀 ⋅ 04/16 ⋅ 0

书单丨5本Java后端技术书指引你快速进阶

一名Java开发工程师 不仅要对Java语言及特性有深层次的理解 而且需要掌握与Java相关的 框架、生态及后端开发知识 本文涉及多种后端开发需要掌握的技能 对于帮助提高开发能力非常有帮助 NO.1...

Java高级架构 ⋅ 05/30 ⋅ 0

Java高级程序员面试大纲——错过了金三,你还要错过银四吗

跳槽时时刻刻都在发生,但是我建议大家跳槽之前,先想清楚为什么要跳槽。切不可跟风,看到同事一个个都走了,自己也盲目的开始面试起来(期间也没有准备充分),到底是因为技术原因(影响自己...

Java高级架构 ⋅ 04/27 ⋅ 0

Java程序员面试大纲—错过了金三银四,你还要错过2018吗?

跳槽时时刻刻都在发生,但是我建议大家跳槽之前,先想清楚为什么要跳槽。切不可跟风,看到同事一个个都走了,自己也盲目的开始面试起来(期间也没有准备充分),到底是因为技术原因(影响自己...

java高级架构牛人 ⋅ 04/27 ⋅ 0

编写高性能 Java 代码的最佳实践

摘要:本文首先介绍了负载测试、基于APM工具的应用程序和服务器监控,随后介绍了编写高性能Java代码的一些最佳实践。最后研究了JVM特定的调优技巧、数据库端的优化和架构方面的调整。以下是译...

这篇文章 ⋅ 前天 ⋅ 0

浅谈Kotlin(一):简介及Android Studio中配置

浅谈Kotlin(一):简介及Android Studio中配置 浅谈Kotlin(二):基本类型、基本语法、代码风格 浅谈Kotlin(三):类 浅谈Kotlin(四):控制流 前言:   今日新闻:谷歌宣布,将Kotli...

听着music睡 ⋅ 2017/05/18 ⋅ 0

面试必看!2018年4月份阿里最新的java程序员面试题目

目录 技术一面(23问) 技术二面(3大块) 性能优化(21点) 项目实战(34块) JAVA方向技术考察点(15点) JAVA开发技术面试中可能问到的问题(17问) 阿里技术面试1 1.Java IO流的层次结构...

美的让人心动 ⋅ 04/16 ⋅ 0

使用Dubbo中需要注意的事项

一、前言 Dubbo作为高性能RPC框架,已经进入Apache卵化器项目,虽然官方给出了dubbo使用的用户手册,但是大多是一概而过,使用dubbo时候要尽量了解源码,不然会很容易入坑。 二 、服务消费端...

加多 ⋅ 01/02 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Confluence 6 从其他备份中恢复数据

一般来说,Confluence 数据库可以从 Administration Console 或者 Confluence Setup Wizard 中进行恢复。 如果你在恢复压缩的 XML 备份的时候遇到了问题,你还是可以对整个站点进行恢复的,如...

honeymose ⋅ 5分钟前 ⋅ 0

myeclipse10 快速搭建spring boot开发环境(入门)

1.创建一个maven的web项目 注意上面标红的部分记得选上 2.创建的maven目录结构,有缺失的目录可以自己建立目录补充 补充后 这时候一个maven的web项目创建完成 3.配置pom.xml配置文件 <proje...

小海bug ⋅ 17分钟前 ⋅ 0

nginx.conf

=========================================================================== nginx.conf =========================================================================== user nobody; #......

A__17 ⋅ 20分钟前 ⋅ 0

645. Set Mismatch - LeetCode

Question 645. Set Mismatch Solution 思路: 遍历每个数字,然后将其应该出现的位置上的数字变为其相反数,这样如果我们再变为其相反数之前已经成负数了,说明该数字是重复数,将其将入结果r...

yysue ⋅ 34分钟前 ⋅ 0

Python这么强?红包杀手、消息撤回也可以无视,手机App辅助!

论述 标题也许有点不好理解,其实就是一款利用Python实现的可以监控微信APP内的红包与消息撤回的助手。不得不说,这确实是一款大家钟意的神器。 消息撤回是一件很让人恶心的事,毕竟人都是有...

Python燕大侠 ⋅ 49分钟前 ⋅ 0

压缩打包介绍、gzip压缩工具、bzip2压缩工具、xz压缩工具

压缩打包介绍 压缩的好处不仅能节省磁盘空间而且在传输的时候节省传输时间和网络带宽 windows系统下文件带有 .rar .zip .7z 后缀的就是压缩文件 linux系统下则是 .zip, .gz, .bz2, .xz, ...

黄昏残影 ⋅ 54分钟前 ⋅ 0

观察者模式

1.利用java原生类进行操作 package observer;import java.util.Observable;import java.util.Observer;/** * @author shadow * @Date 2016年8月12日下午7:29:31 * @Fun 观察目标 **/......

Cobbage ⋅ 57分钟前 ⋅ 0

Ubuntu打印服务器配置

参考:https://blog.csdn.net/gsls200808/article/details/50950586 https://blog.csdn.net/jiay2/article/details/80252369 https://wiki.gentoo.org/wiki/HPLIP 由于媳妇儿要大量打印资料,......

大熊猫 ⋅ 今天 ⋅ 0

面试的角度诠释Java工程师(二)

原文出处: locality 续言: 相信每一位简书的作者,都会有我这样的思考:怎么写好一篇文章?或者怎么写好一篇技术类的文章?我就先说说我的感悟吧,写文章其实和写程序是一样的。为什么我会...

颖伙虫 ⋅ 今天 ⋅ 0

github中SSH的Key

https://help.github.com/articles/connecting-to-github-with-ssh/ https://help.github.com/articles/testing-your-ssh-connection/ https://help.github.com/articles/adding-a-new-ssh-k......

whoisliang ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部