文档章节

ForkJoin 学习使用笔记

小灰灰Blog
 小灰灰Blog
发布于 2017/09/08 20:39
字数 1273
阅读 342
收藏 23
点赞 0
评论 0

ForkJoin 学习使用笔记

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架

背景

在日常的业务需求中,经常出现的批量查询,批量写入等接口的提供,一般来说,最简单最low的方式就是写一个for循环来一次执行,但是当业务方对接口的性能要求较高时,就比较尴尬了

通常可以想到的方式是采用并发操作,首先想到可以实现的方式就是利用线程池来做

通常实现方式如下

// 1. 创建线程池

ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60,
      TimeUnit.SECONDS,
      new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("biz-exec"),
      new ThreadPoolExecutor.CallerRunsPolicy());

// 2. 创建执行任务
List<Future<Object>> futureList = new ArrayList<>();
for(Object arg : list) {
        futureList.add(executorService.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
              // xxx
            }
        }));
}

// 3. 结果获取
for(Future f: futureList) {
    Object obj = f.get();
}

用上面的这种方式并没有什么问题,我们接下来考虑的是如何使用ForkJoin框架来实现类似的功能

ForkJoin 基本知识

Fork: 将大任务拆分成若干个可以并发执行的小任务

Join: 合并所有小任务的执行结果

forkjoin

任务分割

ForkJoinTask : 基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类

  • RecursiveAction : 无结果返回的任务
  • RecursiveTask : 有返回结果的任务

说明:

  1. fork : 让task异步执行
  2. join : 让task同步执行,可以获取返回值
  3. ForkJoinTask 在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行

结果合并

ForkJoinPool 执行 ForkJoinTask

  • 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
  • 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务

三中提交方式:

  1. execute 异步,无返回结果
  2. submit 异步,有返回结果 (返回Future<T>
  3. invoke 同步,有返回结果 (会阻塞)

使用说明

结合两个场景,给出使用姿势

1. 累加

实现从 start - end 的累加求和

首先是定义一个CountTask 来实现求和

首先是确定任务分割的阀值,当 end-start 的差值大于阀值时,将任务一分为二

public class CountTask extends RecursiveTask<Integer> {

    private int start;
    private int end;

    private static final int THRED_HOLD = 30;


    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRED_HOLD;
        if (canCompute) { // 不需要拆分
            for (int i = start; i <= end; i++) {
                sum += i;
            }

            System.out.println("thread: " + Thread.currentThread() + " start: " + start + " end: " + end);
        } else {
            int mid = (end + start) / 2;
            CountTask left = new CountTask(start, mid);
            CountTask right = new CountTask(mid + 1, end);
            left.fork();
            right.fork();

            sum = left.join() + right.join();
        }
        return sum;
    }
}

调用case

@Test
public void testFork() throws ExecutionException, InterruptedException {
    int start = 0;
    int end = 200;

    CountTask task = new CountTask(start, end);
    ForkJoinPool pool = ForkJoinPool.commonPool();
    Future<Integer> ans = pool.submit(task);
    int sum = ans.get();
    System.out.println(sum);
}

输出结果:

thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 51 end: 75
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 101 end: 125
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 0 end: 25
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 126 end: 150
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 76 end: 100
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 151 end: 175
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 26 end: 50
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 176 end: 200
20100

2. 排序

int 数组进行排序

同样先定义一个SortTask, 主要是为了演示ForkJoin的使用姿势,具体的排序和合并的逻辑比较简陋的实现了一下(这块不是重点)

public class SortTask extends RecursiveTask<List<Integer>> {

    private List<Integer> list;

    private final static int THRESHOLD = 5;

    public SortTask(List<Integer> list) {
        this.list = list;
    }

    @Override
    protected List<Integer> compute() {
        if (list.size() < THRESHOLD) {
            Collections.sort(list);

            System.out.println("thread: " + Thread.currentThread() + " sort: " + list);
            return list;
        }


        int mid = list.size() >> 1;


        SortTask l = new SortTask(list.subList(0,  mid));
        SortTask r = new SortTask(list.subList(mid, list.size()));

        l.fork();
        r.fork();

        List<Integer> left = l.join();
        List<Integer> right = r.join();

        return merge(left, right);
    }

    private List<Integer> merge(List<Integer> left, List<Integer> right) {
        List<Integer> result = new ArrayList<>(left.size() + right.size());

        int rightIndex = 0;
        for (int i = 0; i < left.size(); i++) {
            if (rightIndex >= right.size() || left.get(i) <= right.get(rightIndex)) {
                result.add(left.get(i));
            } else {
                result.add(right.get(rightIndex++));
                i -= 1;
            }
        }

        if (rightIndex < right.size()) {
            result.addAll(right.subList(rightIndex, right.size()));
        }

        return result;
    }
}

测试case和上面基本一样,我们改用 invoke 替换上面的 submit

@Test
public void testMerge() throws ExecutionException, InterruptedException {
    List<Integer> list = Arrays.asList(100, 200, 150, 123, 4512, 3414, 3123, 34, 5412, 34, 1234, 893, 213, 455, 6, 123, 23);
    SortTask sortTask = new SortTask(list);
    ForkJoinPool pool = ForkJoinPool.commonPool();
    List<Integer> ans = pool.invoke(sortTask);
    System.out.println(ans);
}

输出结果

thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [34, 3123, 3414, 4512]
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] sort: [100, 123, 150, 200]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [34, 893, 1234, 5412]
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [213, 455]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [6, 23, 123]
[6, 23, 34, 34, 100, 123, 123, 150, 200, 213, 455, 893, 1234, 3123, 3414, 4512, 5412]

参考

其他

个人博客:一灰的个人博客

公众号获取更多:

个人信息

© 著作权归作者所有

共有 人打赏支持
小灰灰Blog
粉丝 161
博文 152
码字总数 261883
作品 0
武汉
程序员
RxJS学习笔记(1)

RxJS 概述 RxJS全名Reactive Extensions for JavaScript,起源于Reactive Extensions项目,该项目主要实现各种语言的响应式编程(Reactive programming)库,被认为是观察者模式与函数式编程...

谦啸 ⋅ 01/02 ⋅ 0

Java 8 的 JVM 有多快?Fork-Join 性能基准测试

Java 8 已经发布一段时间了,许多开发者已经开始使用 Java 8。本文也将讨论最新发布在 JDK 中的并发功能更新。事实上,JDK 中已经有多处 改动,但本文重点将是 Fork-Join 框架的改进。我们将...

OneAPM蓝海讯通 ⋅ 2015/12/24 ⋅ 0

Java7中的ForkJoin并发框架初探(中)——JDK中实现简要分析

根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask这...

Nori ⋅ 2016/05/20 ⋅ 0

just_一米阳光/parallel-task

parallel-task 项目介绍 构建任务依赖图,可多作业并行工作,构建一组任务执行单元,跟踪任务执行时间和状态,可增加页面控制任务是否执行。 关键设计 特性说明 支持多Job并行执行,各个Job...

just_一米阳光 ⋅ 06/07 ⋅ 0

playframework 2.x 自定义routes文件报错:Router not found: conf/sanguosha

我把默认的routes(conf/routes)文件删除了 然后,新建了一个conf/sanguosha 文件 然后,在application.conf里面配置了 application.router=conf/sanguosha 运行,却报错了: play.api.Con...

西夏一品堂 ⋅ 2014/05/01 ⋅ 0

Java读取文件夹大小的6种方法及代码

这篇文章介绍了JAVA读取文件夹大小的几种方法实例,有需要的朋友可以参考一下。 (一)单线程递归方式 package com.taobao.test; import java.io.File; public class TotalFileSizeSequential...

天蚕宝衣 ⋅ 2016/03/31 ⋅ 0

java并发的四种风味:Thread、Executor、ForkJoin和Actor

本文由 ImportNew - shenggordon 翻译自 Oleg Shelajev。欢迎加入翻译小组。转载请见文末要求。 这篇文章讨论了Java应用中并行处理的多种方法。从自己管理Java线程,到各种更好几的解决方法,...

fdhay ⋅ 2016/03/31 ⋅ 0

【JDK7】新特性(5) fork/join 框架

对于框架的原理,可以阅读 Doug Lea 的文章“A Java Fork/Join Framework”:了解 Fork/Join 模式的实现机制和执行性能。 原理解析:fork分解,join结合。这个框架的本质是将一个任务分解成多...

12qw90op ⋅ 2012/12/11 ⋅ 0

Java并发系列6-Fork/Join框架

声明:原创文章,转载请注明出处。http://www.jianshu.com/u/e02df63eaa87 一、介绍 1、Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终...

唐影若凡 ⋅ 2017/07/27 ⋅ 0

《Pro ASP.NET MVC 3 Framework》学习笔记目录

《Pro ASP.NET MVC 3 Framework》简介: 作者: Adam Freeman 和 Steven Sanderson 出版社: Apress; New 平装: 820页 语种: 英语 ISBN: 1430234040 声明:笔记里面按我自己的理解翻译了大部分...

mszhangxuefei ⋅ 2012/02/07 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Gitee 生成并部署SSH key

1.如何生成ssh公钥 你可以按如下命令来生成 sshkey: ssh-keygen -t rsa -C "xxxxx@xxxxx.com" # Generating public/private rsa key pair...# 三次回车即可生成 ssh key 查看你的 ...

晨猫 ⋅ 43分钟前 ⋅ 0

zblog2.3版本的asp系统是否可以超越卢松松博客的流量[图]

最近访问zblog官网,发现zlbog-asp2.3版本已经进入测试阶段了,虽然正式版还没有发布,想必也不久了。那么作为aps纵横江湖十多年的今天,blog2.2版本应该已经成熟了,为什么还要发布这个2.3...

原创小博客 ⋅ 今天 ⋅ 0

聊聊spring cloud的HystrixCircuitBreakerConfiguration

序 本文主要研究一下spring cloud的HystrixCircuitBreakerConfiguration HystrixCircuitBreakerConfiguration spring-cloud-netflix-core-2.0.0.RELEASE-sources.jar!/org/springframework/......

go4it ⋅ 今天 ⋅ 0

二分查找

二分查找,也称折半查找、二分搜索,是一种在有序数组中查找某一特定元素的搜索算法。搜素过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜素过程结束;如果某一特定元素大于...

人觉非常君 ⋅ 今天 ⋅ 0

VS中使用X64汇编

需要注意的是,在X86项目中,可以使用__asm{}来嵌入汇编代码,但是在X64项目中,再也不能使用__asm{}来编写嵌入式汇编程序了,必须使用专门的.asm汇编文件来编写相应的汇编代码,然后在其它地...

simpower ⋅ 今天 ⋅ 0

ThreadPoolExecutor

ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, ......

4rnold ⋅ 昨天 ⋅ 0

Java正无穷大、负无穷大以及NaN

问题来源:用Java代码写了一个计算公式,包含除法和对数和取反,在页面上出现了-infinity,不知道这是什么问题,网上找答案才明白意思是负的无穷大。 思考:为什么会出现这种情况呢?这是哪里...

young_chen ⋅ 昨天 ⋅ 0

前台对中文编码,后台解码

前台:encodeURI(sbzt) 后台:String param = URLDecoder.decode(sbzt,"UTF-8");

west_coast ⋅ 昨天 ⋅ 0

实验楼—MySQL基础课程-挑战3实验报告

按照文档要求创建数据库 sudo sercice mysql startwget http://labfile.oss.aliyuncs.com/courses/9/createdb2.sqlvim /home/shiyanlou/createdb2.sql#查看下数据库代码 代码创建了grade......

zhangjin7 ⋅ 昨天 ⋅ 0

一起读书《深入浅出nodejs》-node模块机制

node 模块机制 前言 说到node,就不免得提到JavaScript。JavaScript自诞生以来,经历了工具类库、组件库、前端框架、前端应用的变迁。通过无数开发人员的努力,JavaScript不断被类聚和抽象,...

小草先森 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部