文档章节

ForkJoinPool线程池

 杨凯123
发布于 2019/12/15 11:28
字数 1152
阅读 24
收藏 0

1.  拆分线程池的使用场景是什么?

答: 是对一组连续的数据进行耗时操作,例如 一个 大小为 1000万 的集合 进行操作。

       例子: 对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。

 

2.   如何使用 ForkJoinPool?
 答: (1)客户端中使用:构建一个任务,将任务推到 线程池中 

          (2)构建任务:

                      2.1 三变量

                          数组数据

                          begin;   //数组数据中开始下标

                          end   //数组数据中结束

                  2.2   compare中 执行,拆分任务(关键是拆分数据),合并

    例子(该代码未运行): 

public class RecursiveActionTest extends RecursiveTask<Integer> {
    private static final long serialVersionUID = -3611254198265061729L;
    //阀值(是数组的大小/线程数算出来的)
    public  final int threshold = 0;
    //这儿应该有一个数组
    private int[] bigNum={1,2,3,.....1000000000000};
    private int start;
    private int end;
    private int threadNum=1;

    public RecursiveActionTest(int start, int end) {
        this.start = start;
        this.end = end;
    }
    
   //设置线程数并设置阀值
    public void init(int threadNum){
        this.threadNum=threadNum;
        threshold = end/threadNum;
    }

    //compute 是一个递归任务
    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {    //执行加法任务(任务为 对 数组 start位的数据到end位,求和)
            for (int i = start; i <= end; i++) {
                // 每个任务为:sum+i
                sum += bigNum[i];
            }
        } else {
            // 如果任务大于阈值,就分裂成10子任务计算,step:每一份的任务数
            int step = (start + end) /threadNum ;
            ArrayList<RecursiveActionTest> subTaskList = new ArrayList<RecursiveActionTest>();
            int pos = start;
            //设置每个小任务的始起始值和终止值

            for (int i = 0; i < threadNum; i++) {
                int lastOne = pos + step;
                if (lastOne > end) lastOne = end;
                //pos和 lostOne其实对应数组中下标
                RecursiveActionTest subTask = new RecursiveActionTest(pos, lastOne);
                pos += step + 1;
                subTaskList.add(subTask);
                //把子任务推向线程池
                subTask.fork();
            }
            // 等待所有子任务完成,并计算值
            for (RecursiveActionTest task : subTaskList) {
                sum += task.join();
            }
        }

      return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4
        RecursiveActionTest task = new RecursiveActionTest(1, 1000000000000);

        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            System.out.println(result.get());
        } catch (Exception e) {
            System.out.println(e);
        }
    }

}

            

                      

 

 

3. 用 forkJoinPool 将 100 万个 User对象 放进一个 JsonArray,用我的电脑双核 用时 5.6 S,用单线程,一个For循环,用时 6.6 S

 @Test
    public void test2(){
       JsonArray jsonArray= new JsonArray();
        long start = System.currentTimeMillis();
        System.out.println(start);
        for (int i = 0; i <1000000 ; i++) {
            User user = new User();
            user.setId(i);
            user.setAge("232"+i);
            user.setName("张三");
            JsonObject jObj=new JsonObject();
            jObj.addProperty("id", i);
            jObj.addProperty("age", 20);
            jObj.addProperty("name", "张三");

            jsonArray.add(jObj);
        }
        long middel = System.currentTimeMillis();

        //将 JsonArray中数据转换成集合
        List<User> users = JsonUtils.jsonToList(jsonArray.toString(), User.class);
        long end = System.currentTimeMillis();
        System.out.println("将对象一个一个转换成JsonArray:"+(middel-start));


        System.out.println("JsonArray转换成List"+(end-middel));
 }

 

 

package com.xuecheng.manage_cms;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;

import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkJoinListTest extends RecursiveTask<JsonArray> {
    private static final long serialVersionUID = -3611254198265061729L;
    //阀值
    public static  int threshold=0 ;
    private int start;
    private int end;
    //线程数
    private int threadNum = 4;
    public ForkJoinListTest(int start, int end) {
        this.start = start;
        this.end = end;


    }

    /**
     * 初始化阀值   只在主线程中调用
     */
    public void init(){

        threshold = end/threadNum;
    }


    @Override
    protected JsonArray compute() {
        JsonArray bigJsonArray= new JsonArray();

        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {    //执行加法任务
           // JsonArray jsonArray= new JsonArray();
            for (int i = start; i <= end; i++) {
                User user = new User();
                user.setId(i);
                user.setAge("232"+i);
                user.setName("张三");
                JsonObject jObj=new JsonObject();
                jObj.addProperty("id", i);
                jObj.addProperty("age", 20);
                jObj.addProperty("name", "张三");

                bigJsonArray.add(jObj);
            }
        } else {
            // 如果任务大于阈值,就分裂成10子任务计算,step:每一份的任务数
            int step = (start + end) / threadNum;
            ArrayList<ForkJoinListTest> subTaskList = new ArrayList<ForkJoinListTest>();
            int pos = start;
            //设置每个小任务的始起始值和终止值

            for (int i = 0; i < threadNum; i++) {
                int lastOne = pos + step;
                if (lastOne > end) lastOne = end;

                ForkJoinListTest subTask = new ForkJoinListTest(pos, lastOne);
                pos += step + 1;
                subTaskList.add(subTask);
                //把子任务推向线程池
                subTask.fork();
            }
            // 等待所有子任务完成,并计算值

            for (ForkJoinListTest task : subTaskList) {

                bigJsonArray.addAll(task.join());
            }
        }

        return bigJsonArray;
    }


    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4
        ForkJoinListTest task = new ForkJoinListTest(1, 1000000);
        //需要初始化阀值
        task.init();
        long begain = System.currentTimeMillis();
        //执行一个任务
        Future<JsonArray> result = forkjoinPool.submit(task);

        try {
            JsonArray jsonElements = result.get();
            long end = System.currentTimeMillis();
           // System.out.println(jsonElements);
            System.out.println("耗时:"+(end-begain));
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

 

© 著作权归作者所有

上一篇: Java题目
下一篇: 常用工具
粉丝 0
博文 65
码字总数 26922
作品 0
天水
私信 提问
ForkJoinPool和并行流

目录 一、ForkJoinPool核心原理 二、常用方法 三、应用示例 内容 一、ForkJoinPool核心原理 ForkJoinPool是JDK7引入的线程池,核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个...

u012557298
2018/02/05
0
0
Java7中的ForkJoin并发框架初探(中)——JDK中实现简要分析

根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask这...

Nori
2016/05/20
202
0
死磕 java线程系列之ForkJoinPool深入解析

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本。 注:本文基于ForkJoinPool分治线程池类。 简介 随着在硬件上多核处理器的发展和广泛使用,并发编程成为程序...

彤哥读源码
2019/11/09
68
0
Java Fork/Join 框架

简介 从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。 这种思想和MapReduce很像(input...

Java工程师-Distance
2018/05/23
47
0
Java7中的Fork/Join框架,到底在什么情况下才会采用工作窃取算法?

在学习《Java 7并发编程实战手册.pdf》第五章时,有一段话让我思考很久都很难理解,测试结果也不准确和信服,还请大家指教。 “当你使用同步方法,调用这些方法(比如:invokeAll()方法)的任...

JarvisZhu
2016/11/19
961
1

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 宇宙银河乱弹英雄传 —— @FalconChen

1Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @巴拉迪维 :Axxis的单曲《Only God Knows》 最近只听摇滚,挖了好多以前没听过的歌,蛮好。#今日歌曲推荐# 《Only God Knows》- Axxis 手机...

小小编辑
今天
137
1
Safari Date() 函数对日期时间字符串(yyyy-MM-dd HH:mm:ss) 提示NaN的问题

今天发现一个奇怪的问题,在iPhone使用 safari 选择定时发布文章到OSC,选择时间后提示不是合法的时间,判断时间的代码如下: var d = new Date('2020-01-23 23:15'); if (isNaN(d)) {...

FalconChen
昨天
124
0
ActiveMQ学习之通讯协议

一、支持的通讯协议 ActiveMQ支持的client-broker通讯协议有:TCP、NIO、UDP、SSL、HTTP(S)、VM 其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的<TransportCon...

冥焱
昨天
91
0
应急广播户户通平台

一、平台概述 应急广播户户通平台为软硬一体化广播服务解决方案。实现了应急广播、视音频及图片文字信息、调频及数字广播FM、天气预报信息接收功能,以及视音频播放、智能机器人、电子日历等...

neocean
昨天
133
0
如何为Apache 2.2启用mod_rewrite

我已经在我的Vista机器上安装了新的Apache 2.2,一切正常,除了mod重写。 我没有注释 LoadModule rewrite_module modules/mod_rewrite.s 但是我的重写规则都没有,即使是简单的重写规则 Re...

javail
昨天
53
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部