文档章节

线程池工作窃取实例

go4it
 go4it
发布于 2017/09/11 21:32
字数 484
阅读 23
收藏 0

本文主要来展示一下简版的work stealing线程池的实现。

Executors

Executors默认提供了几个工厂方法

/**
     * Creates a thread pool that maintains enough threads to support
     * the given parallelism level, and may use multiple queues to
     * reduce contention. The parallelism level corresponds to the
     * maximum number of threads actively engaged in, or available to
     * engage in, task processing. The actual number of threads may
     * grow and shrink dynamically. A work-stealing pool makes no
     * guarantees about the order in which submitted tasks are
     * executed.
     *
     * @param parallelism the targeted parallelism level
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code parallelism <= 0}
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    /**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @see #newWorkStealingPool(int)
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

思路

ForkJoinPool主要用到的是双端队列,不过这里我们粗糙的实现的话,也可以不用到deque。

public class WorkStealingChannel<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkStealingChannel.class);

    BlockingDeque<T>[] managedQueues;

    AtomicLongMap<Integer> stat = AtomicLongMap.create();

    public WorkStealingChannel() {
        int nCPU = Runtime.getRuntime().availableProcessors();
        int queueCount = nCPU / 2 + 1;
        managedQueues = new LinkedBlockingDeque[queueCount];
        for(int i=0;i<queueCount;i++){
            managedQueues[i] = new LinkedBlockingDeque<T>();
        }
    }

    public void put(T item) throws InterruptedException {
        int targetIndex = Math.abs(item.hashCode() % managedQueues.length);
        BlockingQueue<T> targetQueue = managedQueues[targetIndex];
        targetQueue.put(item);
    }

    public T take() throws InterruptedException {
        int rdnIdx = ThreadLocalRandom.current().nextInt(managedQueues.length);
        int idx = rdnIdx;
        while (true){
            idx = idx % managedQueues.length;
            T item = null;
            if(idx == rdnIdx){
                item = managedQueues[idx].poll();
            }else{
                item = managedQueues[idx].pollLast();
            }
            if(item != null){
                LOGGER.info("take ele from queue {}",idx);
                stat.addAndGet(idx,1);
                return item;
            }
            idx++;
            if(idx == rdnIdx){
                break;
            }
        }

        //走完一轮没有,则随机取一个等待
        LOGGER.info("wait for queue:{}",rdnIdx);
        stat.addAndGet(rdnIdx,1);
        return managedQueues[rdnIdx].take();
    }

    public AtomicLongMap<Integer> getStat() {
        return stat;
    }
}

测试实例

public class WorkStealingDemo {

    static final WorkStealingChannel<String> channel = new WorkStealingChannel<>();

    static volatile boolean running = true;

    static class Producer extends Thread{
        @Override
        public void run() {
            while(running){
                try {
                    channel.put(UUID.randomUUID().toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer extends Thread{
        @Override
        public void run() {
            while(running){
                try {
                    String value = channel.take();
                    System.out.println(value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void stop(){
        running = false;
        System.out.println(channel.getStat());
    }


    public static void main(String[] args) throws InterruptedException {
        int nCPU = Runtime.getRuntime().availableProcessors();
        int consumerCount = nCPU / 2 + 1;
        for (int i = 0; i < nCPU; i++) {
            new Producer().start();
        }

        for (int i = 0; i < consumerCount; i++) {
            new Consumer().start();
        }

        Thread.sleep(30*1000);
        stop();
    }
}

输出

{0=660972, 1=660613, 2=661537, 3=659846, 4=659918}

© 著作权归作者所有

共有 人打赏支持
go4it
粉丝 52
博文 703
码字总数 504022
作品 0
深圳
Java Fork/Join 框架

简介 从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。 这种思想和MapReduce很像(input...

Java工程师-Distance
05/23
0
0
Java7中的Fork/Join框架,到底在什么情况下才会采用工作窃取算法?

在学习《Java 7并发编程实战手册.pdf》第五章时,有一段话让我思考很久都很难理解,测试结果也不准确和信服,还请大家指教。 “当你使用同步方法,调用这些方法(比如:invokeAll()方法)的任...

JarvisZhu
2016/11/19
547
1
java 中的Fork/Join框架

什么是Fork/Join框架 Fork/Join框架是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 我们再通过Fork和Join这两个单词来理...

hgfgoodcreate
2016/07/03
88
1
Java进阶篇:多线程并发实践

关于作者 郭孝星,程序员,吉他手,主要从事Android平台基础架构方面的工作,欢迎交流技术方面的问题,可以去我的Github提issue或者发邮件至guoxiaoxingse@163.com与我交流。 文章目录 一 线...

郭孝星
2017/09/22
0
0
《Java并发编程的艺术》笔记

《Java并发编程的艺术》 方腾飞 魏鹏 程晓明 著 笔记: 第一章:并发编程的挑战 1、上下文切换: 系统的多线程并发执行,就是CPU通过给每个线程分配CPU时间片来实现的。当一个线程的时间用完...

tsmyk0715
2016/08/20
366
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

centos7安装redis及开机启动

配置编译环境: sudo yum install gcc-c++ 下载源码: wget http://download.redis.io/releases/redis-3.2.8.tar.gz 解压源码: tar -zxvf redis-3.2.8.tar.gz 进入到解压目录: cd redis-3......

hotsmile
46分钟前
0
0
Confluence 6 数据库和临时目录

数据库 所有的其他数据库,包括有页面,内容都存储在数据库中。如果你安装的 Confluence 是用于评估或者你选择使用的是 Embedded H2 Database 数据库。数据库有关的文件将会存储在 database...

honeymose
今天
1
0
day62-20180820-流利阅读笔记

1.今日导读 2.带着问题听讲解 3.新闻正文(中英文对照) 4.重点词汇 5.拓展内容

aibinxiao
今天
0
0
分布式锁实现及对比

一、问题介绍 日常工作中很多场景下需要用到分布式锁,例如:任务运行(多个节点同一时刻同一个任务只能在一个节点上运行(分片任务除外)),交易接受(前端交易请求发送时,可能由于两次提...

yangjianzhou
今天
7
0
【AI实战】快速掌握TensorFlow(二):计算图、会话

在前面的文章中,我们已经完成了AI基础环境的搭建(见文章:Ubuntu + Anaconda + TensorFlow + GPU + PyCharm搭建AI基础环境),以及初步了解了TensorFlow的特点和基本操作(见文章:快速掌握...

雪饼
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部