文档章节

线程池工作窃取实例

go4it
 go4it
发布于 2017/09/11 21:32
字数 484
阅读 23
收藏 0
点赞 0
评论 0

本文主要来展示一下简版的work stealing线程池的实现。

Executors

Executors默认提供了几个工厂方法

/**
     * Creates a thread pool that maintains enough threads to support
     * the given parallelism level, and may use multiple queues to
     * reduce contention. The parallelism level corresponds to the
     * maximum number of threads actively engaged in, or available to
     * engage in, task processing. The actual number of threads may
     * grow and shrink dynamically. A work-stealing pool makes no
     * guarantees about the order in which submitted tasks are
     * executed.
     *
     * @param parallelism the targeted parallelism level
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code parallelism <= 0}
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    /**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @see #newWorkStealingPool(int)
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

思路

ForkJoinPool主要用到的是双端队列,不过这里我们粗糙的实现的话,也可以不用到deque。

public class WorkStealingChannel<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkStealingChannel.class);

    BlockingDeque<T>[] managedQueues;

    AtomicLongMap<Integer> stat = AtomicLongMap.create();

    public WorkStealingChannel() {
        int nCPU = Runtime.getRuntime().availableProcessors();
        int queueCount = nCPU / 2 + 1;
        managedQueues = new LinkedBlockingDeque[queueCount];
        for(int i=0;i<queueCount;i++){
            managedQueues[i] = new LinkedBlockingDeque<T>();
        }
    }

    public void put(T item) throws InterruptedException {
        int targetIndex = Math.abs(item.hashCode() % managedQueues.length);
        BlockingQueue<T> targetQueue = managedQueues[targetIndex];
        targetQueue.put(item);
    }

    public T take() throws InterruptedException {
        int rdnIdx = ThreadLocalRandom.current().nextInt(managedQueues.length);
        int idx = rdnIdx;
        while (true){
            idx = idx % managedQueues.length;
            T item = null;
            if(idx == rdnIdx){
                item = managedQueues[idx].poll();
            }else{
                item = managedQueues[idx].pollLast();
            }
            if(item != null){
                LOGGER.info("take ele from queue {}",idx);
                stat.addAndGet(idx,1);
                return item;
            }
            idx++;
            if(idx == rdnIdx){
                break;
            }
        }

        //走完一轮没有,则随机取一个等待
        LOGGER.info("wait for queue:{}",rdnIdx);
        stat.addAndGet(rdnIdx,1);
        return managedQueues[rdnIdx].take();
    }

    public AtomicLongMap<Integer> getStat() {
        return stat;
    }
}

测试实例

public class WorkStealingDemo {

    static final WorkStealingChannel<String> channel = new WorkStealingChannel<>();

    static volatile boolean running = true;

    static class Producer extends Thread{
        @Override
        public void run() {
            while(running){
                try {
                    channel.put(UUID.randomUUID().toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer extends Thread{
        @Override
        public void run() {
            while(running){
                try {
                    String value = channel.take();
                    System.out.println(value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void stop(){
        running = false;
        System.out.println(channel.getStat());
    }


    public static void main(String[] args) throws InterruptedException {
        int nCPU = Runtime.getRuntime().availableProcessors();
        int consumerCount = nCPU / 2 + 1;
        for (int i = 0; i < nCPU; i++) {
            new Producer().start();
        }

        for (int i = 0; i < consumerCount; i++) {
            new Consumer().start();
        }

        Thread.sleep(30*1000);
        stop();
    }
}

输出

{0=660972, 1=660613, 2=661537, 3=659846, 4=659918}

© 著作权归作者所有

共有 人打赏支持
go4it
粉丝 50
博文 646
码字总数 437254
作品 0
深圳
Java Fork/Join 框架

简介 从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。 这种思想和MapReduce很像(input...

Java工程师-Distance ⋅ 05/23 ⋅ 0

Java7中的Fork/Join框架,到底在什么情况下才会采用工作窃取算法?

在学习《Java 7并发编程实战手册.pdf》第五章时,有一段话让我思考很久都很难理解,测试结果也不准确和信服,还请大家指教。 “当你使用同步方法,调用这些方法(比如:invokeAll()方法)的任...

JarvisZhu ⋅ 2016/11/19 ⋅ 1

java 中的Fork/Join框架

什么是Fork/Join框架 Fork/Join框架是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 我们再通过Fork和Join这两个单词来理...

hgfgoodcreate ⋅ 2016/07/03 ⋅ 1

Java进阶篇:多线程并发实践

关于作者 郭孝星,程序员,吉他手,主要从事Android平台基础架构方面的工作,欢迎交流技术方面的问题,可以去我的Github提issue或者发邮件至guoxiaoxingse@163.com与我交流。 文章目录 一 线...

郭孝星 ⋅ 2017/09/22 ⋅ 0

《Java并发编程的艺术》笔记

《Java并发编程的艺术》 方腾飞 魏鹏 程晓明 著 笔记: 第一章:并发编程的挑战 1、上下文切换: 系统的多线程并发执行,就是CPU通过给每个线程分配CPU时间片来实现的。当一个线程的时间用完...

tsmyk0715 ⋅ 2016/08/20 ⋅ 0

Fork/Join(3):ForkJoinPool之API翻译

  ForkJoinPool是Fork/Join框架的两大核心类之一,这一节先翻译API,具体使用方法和实现原理留待后续章节再谈。   虽然fork/join框架从JDK1.7开始就已经存在,但介绍Fork/Join框架的文章...

Xcafe ⋅ 2016/12/29 ⋅ 0

Java7任务并行执行神器:Fork&Join框架

Fork/Join是什么? Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,然后小任务又可以继续分解,然后每个小任务分别计算出结果再合并起来,最后将汇总的结果作为大...

架构之路 ⋅ 2017/12/27 ⋅ 0

Java7中的ForkJoin并发框架初探(中)——JDK中实现简要分析

根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask这...

Nori ⋅ 2016/05/20 ⋅ 0

ForkJoinPool和并行流

目录 一、ForkJoinPool核心原理 二、常用方法 三、应用示例 内容 一、ForkJoinPool核心原理 ForkJoinPool是JDK7引入的线程池,核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个...

u012557298 ⋅ 02/05 ⋅ 0

java虚拟机并发编程3-4章-阅读笔记

第四章 Scalability and Thread Safety(可扩展性和线程安全) 4.1Managing Threads with ExecutorService(使用ExxtutorService管理线程) 每个ExecutorService代表一个线程池,其目的是将线程...

上官胡闹 ⋅ 2016/10/30 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

JDK1.6和JDK1.7中,Collections.sort的区别,

背景 最近,项目正在集成测试阶段,项目在服务器上运行了一段时间,点击表格的列进行排序的时候,有的列排序正常,有的列在排序的时候,在后台会抛出如下异常,查询到不到数据,而且在另外一...

tsmyk0715 ⋅ 28分钟前 ⋅ 0

spring RESTful

spring RESTful官方文档:http://spring.io/guides/gs/rest-service/ 1. 可以这么去理解RESTful:其实就是web对外提供的一种基于URL、URI的资源供给服务。不是一个原理性知识点。是一个方法论...

BobwithB ⋅ 29分钟前 ⋅ 0

C++ 中命名空间的 5 个常见用法

相信小伙伴们对C++已经非常熟悉,但是对命名空间经常使用到的地方还不是很明白,这篇文章就针对命名空间这一块做了一个叙述。 命名空间在1995年被引入到 c++ 标准中,通常是这样定义的: 命名...

柳猫 ⋅ 32分钟前 ⋅ 0

@Conditional派生注解

@Conditional派生注解(Spring注解版原生的@Conditional作用) 作用:必须是@Conditional指定的条件成立,才给容器中添加组件,配置配里面的所有内容才生效; @Conditional扩展注解 作用(判...

小致dad ⋅ 33分钟前 ⋅ 0

适配器模式

适配器模式 对象适配器 通过私有属性来实现的类适配器 通过继承来实现的接口适配器 通过继承一个默认实现的类实现的

Cobbage ⋅ 36分钟前 ⋅ 0

Java 限流策略

概要 在大数据量高并发访问时,经常会出现服务或接口面对暴涨的请求而不可用的情况,甚至引发连锁反映导致整个系统崩溃。此时你需要使用的技术手段之一就是限流,当请求达到一定的并发数或速...

轨迹_ ⋅ 40分钟前 ⋅ 0

GridView和子View之间的间隙

默认的情况下GridView和子View之间会有一个间隙,原因是GridView为了在子View被选中时在子View周围显示一个框。去掉的办法如下: android:listSelector="#0000" 或 setSelector(new ColorDra...

国仔饼 ⋅ 44分钟前 ⋅ 0

idea插件开发

1 刷新页面要使用多线程 2 调试要使用restart bug 不要去关闭调试的idea 否则再次启动会卡住

林伟琨 ⋅ 44分钟前 ⋅ 0

Java 内存模型

物理机并发处理方案 绝大多数计算任务,并不是单纯依赖 cpu 的计算完成,不可避免需要与内存交互,获取数据。内存要拿到数据,需要和硬盘发生 I/O 操作。计算机存储设备与 cpu 之间的处理速度...

长安一梦 ⋅ 50分钟前 ⋅ 0

思路分析 如何通过反射 给 bean entity 对象 的List 集合属性赋值?

其实 这块 大家 去 看 springmvc 源码 肯定可以找到实现办法。 因为 spirngmvc 的方法 是可以 为 对象 参数里面的 list 属性赋值的。 我也没有看 具体的 mvc 源码实现,我这里只是 写一个 简...

之渊 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部