文档章节

Java并发编程(五) ForkJoinPool的使用

JackieRiver
 JackieRiver
发布于 2018/09/15 17:25
字数 1048
阅读 2672
收藏 0

一.前言

之前在整理线程使用的时候,无意间看到了ForkJoinPool,在JDK1.7时(新)加入的,就学习了如何使用;

二. ForkJoinPool 使用

2.1 ForkJoinPool的使用姿势

ForkJoinPool采用工作窃取算法,将一个大任务根据阈值分割成很多个子任务,最后根据场景是否要合并子任务运算结果;

根据是否需要合并子任务运算结果,任务需要继承抽象类RecursiveAction,RecursiveTask<V>,后者为需要合并子任务结果,泛型为结果类型; 

我们只需要实现抽象方法 protected Void compute(),定义子任务拆分规则和任务算法就可以了;

                                                                           

我们查看类继承图可知其属于Future的一个实现,Future的使用在之前有过介绍;

有2种使用方案:

  • fork : 递归划分子任务,无需合并子任务结果;
  • fork & join : 递归划分子任务,最后合并子任务计算结果;

2.2 Only Fork

不需要合并子任务运算结果的场景;

下面的模拟场景是将集合sender中的元素发送到receiver中;

package com.river.thread;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class ForkJoinPoolTest {

    private final static List<Integer> sender = new ArrayList<Integer>(21000000);

    private final static List<Integer> receiver = new ArrayList<>(21000000);
    private final static List<Integer> receiver2 = new ArrayList<>(21000000);

    private final static AtomicInteger i = new AtomicInteger(0);

    static {
        log.info("prepare data");
        while (i.get() < 21000000) {
            sender.add(i.get());
            i.incrementAndGet();
        }
        log.info("prepare over");
    }


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        SendTask sendTask = new SendTask(0, 210000, sender);
        log.info("Task Start !");
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        forkJoinPool.submit(sendTask);
        forkJoinPool.awaitTermination(10, TimeUnit.SECONDS);
        forkJoinPool.shutdown();
        stopWatch.stop();
        log.info("sender.size -> {}", sender.size());
        log.info("receiver.size -> {}", receiver.size());
        log.info("TotalTimeMillis1 -> "+stopWatch.getTotalTimeMillis());
       
    }

    @Slf4j
    @AllArgsConstructor
    public static class SendTask extends RecursiveTask<Void> {
        //定义递归子任务的阈值
        private final static int preSize = 100;

        private int start;

        private int end;

        private List<Integer> tempList;

        private final static AtomicInteger taskId = new AtomicInteger(0);

        /**
         * The main computation performed by this task.
         */
        @Override
        protected Void compute() {
            if (end - start < preSize) {
                //log.info("add start {} to end {}", start, end);
                for (int i = start; i < end; i++) {
                    add(this.tempList.get(i));
                }
            } else {
                int middle = (start + end) / 2;
                RecursiveTask sendTaskLeft = new SendTask(start, middle, this.tempList);
                RecursiveTask sendTaskRight = new SendTask(middle, end, this.tempList);
               
                SendTask.invokeAll(sendTaskLeft, sendTaskRight);
            }
            return null;
        }
           
        //防止并发,list.add()方法并发插入,因为在第一次没有在add方法做同步限制,导致并发,找个好久问题,如果发短信什么的业务操作不需要做同步处理
        public void add(int i){
            synchronized (SendTask.class) {
                 receiver.add(i);
            }
        }

    }
}

我们查看日志:

2018-09-15 16:49:41.383 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - prepare data
2018-09-15 16:49:46.986 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - prepare over
2018-09-15 16:49:46.987 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - Task Start !
2018-09-15 16:49:47.106 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - sender.size -> 21000000
2018-09-15 16:49:47.108 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - receiver.size -> 2100000
2018-09-15 16:49:47.108 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - TotalTimeMillis1 -> 119

我们在查看线程情况,有如下4条线程在执行任务,并且我的电脑就是4核的;

ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
main

在这可以看到,与我们之前使用多线程不同的是,这里将main也作为执行任务线程之一,

2.3 Fork & Join

package com.river.thread;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class ForkJoinPoolTest {

    private final static List<Integer> sender = new ArrayList<Integer>(21000000);

    private final static List<Integer> receiver = new ArrayList<>(21000000);
    private final static List<Integer> receiver2 = new ArrayList<>(21000000);

    private final static AtomicInteger i = new AtomicInteger(0);

    static {
        log.info("prepare data");
        while (i.get() < 21000000) {
            sender.add(i.get());
            i.incrementAndGet();
        }
        log.info("prepare over");
    }


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int count  = 5000000;
        SumTask sumTask = new SumTask(0, count);
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        ForkJoinTask<Integer> submit = forkJoinPool.submit(sumTask);
        System.out.println(submit.get());
        int s = 0;
        for (int i = 0;i<=count;i++) {
            s +=i;
        }
        System.out.println(s);
    }

    @Slf4j
    @AllArgsConstructor
    public static class SumTask extends RecursiveTask<Integer>{

        private final static int threshold = 5000;

        private int start;

        private int end;


        /**
         * The main computation performed by this task.
         *
         * @return the result of the computation
         */
        @Override
        protected Integer compute() {
            int sum = 0;
            if (end - start < threshold){
                for (int i = start; i< end; i++){
                    sum +=i;
                }
            }else {
                int middle = (start + end) / 2;
                SumTask sumTask = new SumTask(start, middle);
                SumTask sumTask1 = new SumTask(middle, end);
                SumTask.invokeAll(sumTask, sumTask1);
                sum = sumTask.join() + sumTask1.join();
            }
            return sum;
        }
    }
}



三.相关使用

在JDK8中lamdba有个stream操作parallelStream,底层也是使用ForkJoinPool实现的;

我们可以通过Executors.newWorkStealingPool(int parallelism)快速创建ForkJoinPool线程池,无参默认使用CPU数量的线程数执行任务;

© 著作权归作者所有

JackieRiver
粉丝 29
博文 67
码字总数 97025
作品 0
广州
程序员
私信 提问
读书笔记之《Java并发编程的艺术》-并发编程容器和框架(重要)

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
749
1
ForkJoinPool 探索

介绍 “分而治之“是理清思路和解决问题的一个重要的方法。大到系统架构对功能模块的拆分,小到归并排序的实现,无一不在散发着分而治之的思想。在实现分而治之的算法的时候,我们通常使用递...

robinhan
01/10
0
0
Java并发教程-7高级并发对象

目前为止,该教程重点讲述了最初作为Java平台一部分的低级别API。这些API对于非常基本的任务来说已经足够,但是对于更高级的任务就需要更高级的API。特别是针对充分利用了当今多处理器和多核...

noday
2014/04/25
952
0
Java 并发工具包 java.util.concurrent 用户指南

译序 本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新:http://tutorials.jenkov.com/java-util-concurrent/index.html。 本指南已做成中英文对照阅读版的 pdf 文档,有兴趣的朋友...

pior
2015/10/26
207
0
《Hyperledger Fabric官方文档》之关键概念翻译邀请

10月并发网继续组织翻译区块链相关技术,欢迎大家踊跃参加,另外如果你有区块链技术相关文章也欢迎发布在并发网上。本月组织翻译《Hyperledger Fabric官方文档》。 如何领取 通过评论领取想要...

方 腾飞
2018/01/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

nginx主备模式笔记

(1)两台服务器 192.168.17.129 和 192.168.17.131 (2)在两台服务器安装 keepalived 安装 keepalived (1)使用 yum 命令进行安装 yum install keepalived –y (2)安装之后,在 etc 里面...

行者终成事
今天
4
0
004-Docker镜像

Docker镜像 一个通用的私有仓库,可以提升效率 Docker镜像构建分为两种,一种是手动构建,一种是Dockerfile(自动构建) 基于centos镜像构建手动制作nginx镜像 docker run --name testdocker -...

伟大源于勇敢的开始
今天
5
0
OSChina 周一乱弹 —— 我就加班,不去世不休息

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @巴拉迪维 :《For Forever》90后那些小鲜肉歌手中,好像只有花花的歌能吸引我,这小家伙对音乐的感觉真是天才一般!#今日歌曲推荐# 《For F...

小小编辑
今天
9
1
【领会要领】web前端-轻量级框架应用(jQuery基础)

作者 | Jeskson 来源 | 达达前端小酒馆 jquery的安装和语法,jquery的多种选择器,dom操作和jquery事件。 jQuery框架,简介,优势,安装,语法,jQuery选择器,id选择器,类选择器,标记选择...

达达前端小酒馆
今天
6
0
MySQL 常用命令

无须死记硬背,直接 copy 就好。 1. 查看目前 mysql 用户 select user,host,password from mysql.user; 2. 修改 root 密码(使用内置函数修改) set password for root@localhost=password('y......

HuaiAnGG
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部