文档章节

ForkJoin 学习使用笔记

小灰灰Blog
 小灰灰Blog
发布于 2017/09/08 20:39
字数 1273
阅读 395
收藏 23

ForkJoin 学习使用笔记

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架

背景

在日常的业务需求中,经常出现的批量查询,批量写入等接口的提供,一般来说,最简单最low的方式就是写一个for循环来一次执行,但是当业务方对接口的性能要求较高时,就比较尴尬了

通常可以想到的方式是采用并发操作,首先想到可以实现的方式就是利用线程池来做

通常实现方式如下

// 1. 创建线程池

ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60,
      TimeUnit.SECONDS,
      new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("biz-exec"),
      new ThreadPoolExecutor.CallerRunsPolicy());

// 2. 创建执行任务
List<Future<Object>> futureList = new ArrayList<>();
for(Object arg : list) {
        futureList.add(executorService.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
              // xxx
            }
        }));
}

// 3. 结果获取
for(Future f: futureList) {
    Object obj = f.get();
}

用上面的这种方式并没有什么问题,我们接下来考虑的是如何使用ForkJoin框架来实现类似的功能

ForkJoin 基本知识

Fork: 将大任务拆分成若干个可以并发执行的小任务

Join: 合并所有小任务的执行结果

forkjoin

任务分割

ForkJoinTask : 基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类

  • RecursiveAction : 无结果返回的任务
  • RecursiveTask : 有返回结果的任务

说明:

  1. fork : 让task异步执行
  2. join : 让task同步执行,可以获取返回值
  3. ForkJoinTask 在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行

结果合并

ForkJoinPool 执行 ForkJoinTask

  • 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
  • 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务

三中提交方式:

  1. execute 异步,无返回结果
  2. submit 异步,有返回结果 (返回Future<T>
  3. invoke 同步,有返回结果 (会阻塞)

使用说明

结合两个场景,给出使用姿势

1. 累加

实现从 start - end 的累加求和

首先是定义一个CountTask 来实现求和

首先是确定任务分割的阀值,当 end-start 的差值大于阀值时,将任务一分为二

public class CountTask extends RecursiveTask<Integer> {

    private int start;
    private int end;

    private static final int THRED_HOLD = 30;


    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRED_HOLD;
        if (canCompute) { // 不需要拆分
            for (int i = start; i <= end; i++) {
                sum += i;
            }

            System.out.println("thread: " + Thread.currentThread() + " start: " + start + " end: " + end);
        } else {
            int mid = (end + start) / 2;
            CountTask left = new CountTask(start, mid);
            CountTask right = new CountTask(mid + 1, end);
            left.fork();
            right.fork();

            sum = left.join() + right.join();
        }
        return sum;
    }
}

调用case

@Test
public void testFork() throws ExecutionException, InterruptedException {
    int start = 0;
    int end = 200;

    CountTask task = new CountTask(start, end);
    ForkJoinPool pool = ForkJoinPool.commonPool();
    Future<Integer> ans = pool.submit(task);
    int sum = ans.get();
    System.out.println(sum);
}

输出结果:

thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 51 end: 75
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 101 end: 125
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 0 end: 25
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 126 end: 150
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 76 end: 100
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 151 end: 175
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 26 end: 50
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 176 end: 200
20100

2. 排序

int 数组进行排序

同样先定义一个SortTask, 主要是为了演示ForkJoin的使用姿势,具体的排序和合并的逻辑比较简陋的实现了一下(这块不是重点)

public class SortTask extends RecursiveTask<List<Integer>> {

    private List<Integer> list;

    private final static int THRESHOLD = 5;

    public SortTask(List<Integer> list) {
        this.list = list;
    }

    @Override
    protected List<Integer> compute() {
        if (list.size() < THRESHOLD) {
            Collections.sort(list);

            System.out.println("thread: " + Thread.currentThread() + " sort: " + list);
            return list;
        }


        int mid = list.size() >> 1;


        SortTask l = new SortTask(list.subList(0,  mid));
        SortTask r = new SortTask(list.subList(mid, list.size()));

        l.fork();
        r.fork();

        List<Integer> left = l.join();
        List<Integer> right = r.join();

        return merge(left, right);
    }

    private List<Integer> merge(List<Integer> left, List<Integer> right) {
        List<Integer> result = new ArrayList<>(left.size() + right.size());

        int rightIndex = 0;
        for (int i = 0; i < left.size(); i++) {
            if (rightIndex >= right.size() || left.get(i) <= right.get(rightIndex)) {
                result.add(left.get(i));
            } else {
                result.add(right.get(rightIndex++));
                i -= 1;
            }
        }

        if (rightIndex < right.size()) {
            result.addAll(right.subList(rightIndex, right.size()));
        }

        return result;
    }
}

测试case和上面基本一样,我们改用 invoke 替换上面的 submit

@Test
public void testMerge() throws ExecutionException, InterruptedException {
    List<Integer> list = Arrays.asList(100, 200, 150, 123, 4512, 3414, 3123, 34, 5412, 34, 1234, 893, 213, 455, 6, 123, 23);
    SortTask sortTask = new SortTask(list);
    ForkJoinPool pool = ForkJoinPool.commonPool();
    List<Integer> ans = pool.invoke(sortTask);
    System.out.println(ans);
}

输出结果

thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [34, 3123, 3414, 4512]
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] sort: [100, 123, 150, 200]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [34, 893, 1234, 5412]
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [213, 455]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [6, 23, 123]
[6, 23, 34, 34, 100, 123, 123, 150, 200, 213, 455, 893, 1234, 3123, 3414, 4512, 5412]

参考

其他

个人博客:一灰的个人博客

公众号获取更多:

个人信息

© 著作权归作者所有

共有 人打赏支持
小灰灰Blog
粉丝 177
博文 185
码字总数 321534
作品 0
武汉
程序员
私信 提问
RxJS学习笔记(1)

RxJS 概述 RxJS全名Reactive Extensions for JavaScript,起源于Reactive Extensions项目,该项目主要实现各种语言的响应式编程(Reactive programming)库,被认为是观察者模式与函数式编程...

谦啸
01/02
0
0
Java 8 的 JVM 有多快?Fork-Join 性能基准测试

Java 8 已经发布一段时间了,许多开发者已经开始使用 Java 8。本文也将讨论最新发布在 JDK 中的并发功能更新。事实上,JDK 中已经有多处 改动,但本文重点将是 Fork-Join 框架的改进。我们将...

OneAPM蓝海讯通
2015/12/24
90
0
Java7中的ForkJoin并发框架初探(中)——JDK中实现简要分析

根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask这...

Nori
2016/05/20
126
0
just_一米阳光/parallel-task

parallel-task 项目介绍 构建任务依赖图,可多作业并行工作,构建一组任务执行单元,跟踪任务执行时间和状态,可增加页面控制任务是否执行。 关键设计 特性说明 支持多Job并行执行,各个Job...

just_一米阳光
06/07
0
0
playframework 2.x 自定义routes文件报错:Router not found: conf/sanguosha

我把默认的routes(conf/routes)文件删除了 然后,新建了一个conf/sanguosha 文件 然后,在application.conf里面配置了 application.router=conf/sanguosha 运行,却报错了: play.api.Con...

西夏一品堂
2014/05/01
580
0

没有更多内容

加载失败,请刷新页面

加载更多

Alibaba Java诊断利器Arthas实践--使用redefine排查应用奇怪的日志来源

背景 随着应用越来越复杂,依赖越来越多,日志系统越来越混乱,有时会出现一些奇怪的日志,比如: [] [] [] No credential found 那么怎样排查这些奇怪的日志从哪里打印出来的呢?因为搞不清...

hengyunabc
今天
1
0
home hosts

home hosts lwk@qwfys:~$ cat /etc/hosts127.0.0.1 localhost127.0.1.1 qwfys192.168.56.101vm600.qwfys.com39.108.212.91alpha1.ppy.com39.108.117.122alpha2.p......

qwfys
今天
1
0
大数据教程(6.1)hadoop生态圈介绍及就业前景

1. HADOOP背景介绍 1.1、什么是HADOOP 1.HADOOP是apache旗下的一套开源软件平台 2.HADOOP提供的功能:利用服务器集群,根据用户的自定义业务逻辑,对海量数据进行分布式处理 3.HADOOP的核心组...

em_aaron
今天
4
0
hadoop垃圾回收站

在生产生,hdfs回收站必须是开启的,一般设置为7天。 fs.trash.interval 为垃圾回收站保留时间,如果为0则禁用回收站功能。 fs.trash.checkpoint.interval 回收站检查点时间,一般设置为小于...

hnairdb
昨天
3
0
腾讯与Github的魔幻会面背后的故事…

10月22日,腾讯开源管理办公室有幸邀请到Github新晋CEO Nat Friedman,前来鹅厂参观交流。目前腾讯已经有近70个项目在Github上开源,共获得17w stars,世界排名11位。Github是腾讯开源的主阵...

腾讯开源
昨天
19
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部