文档章节

死磕 java线程系列之自己动手写一个线程池

 彤哥读源码
发布于 10/09 08:18
字数 3310
阅读 4020
收藏 193

mythreadpool

(手机横屏看源码更方便)


问题

(1)自己动手写一个线程池需要考虑哪些因素?

(2)自己动手写的线程池如何测试?

简介

线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池。

属性分析

线程池,顾名思义它首先是一个“池”,这个池里面放的是线程,线程是用来执行任务的。

首先,线程池中的线程应该是有类别的,有的是核心线程,有的是非核心线程,所以我们需要两个变量标识核心线程数量coreSize和最大线程数量maxSize。

为什么要区分是否为核心线程呢?这是为了控制系统中线程的数量。

当线程池中线程数未达到核心线程数coreSize时,来一个任务加一个线程是可以的,也可以提高任务执行的效率。

当线程池中线程数达到核心线程数后,得控制一下线程的数量,来任务了先进队列,如果任务执行足够快,这些核心线程很快就能把队列中的任务执行完毕,完全没有新增线程的必要。

当队列中任务也满了,这时候光靠核心线程就无法及时处理任务了,所以这时候就需要增加新的线程了,但是线程也不能无限制地增加,所以需要控制其最大线程数量maxSize。

其次,我们需要一个任务队列来存放任务,这个队列必须是线程安全的,我们一般使用BlockingQueue阻塞队列来充当,当然使用ConcurrentLinkedQueue也是可以的(注意ConcurrentLinkedQueue不是阻塞队列,不能运用在jdk的线程池中)。

最后,当任务越来越多而线程处理却不及时,迟早会达到一种状态,队列满了,线程数也达到最大线程数了,这时候怎么办呢?这时候就需要走拒绝策略了,也就是这些无法及时处理的任务怎么办的一种策略,常用的策略有丢弃当前任务、丢弃最老的任务、调用者自己处理、抛出异常等。

根据上面的描述,我们定义一个线程池一共需要这么四个变量:核心线程数coreSize、最大线程数maxSize、阻塞队列BlockingQueue、拒绝策略RejectPolicy。

另外,为了便于给线程池一个名称,我们再加一个变量:线程池的名称name。

所以我们得出了线程池的属性及构造方法大概如下:

public class MyThreadPoolExecutor implements Executor {
    /**
     * 线程池的名称
     */
    private String name;
    /**
     * 核心线程数
     */
    private int coreSize;
    /**
     * 最大线程数
     */
    private int maxSize;
    /**
     * 任务队列
     */
    private BlockingQueue<runnable> taskQueue;
    /**
     * 拒绝策略
     */
    private RejectPolicy rejectPolicy;

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }
}

任务流向分析

根据上面的属性分析,基本上我们已经得到了任务流向的完整逻辑:

首先,如果运行的线程数小于核心线程数,直接创建一个新的核心线程来运行新的任务。

其次,如果运行的线程数达到了核心线程数,则把新任务入队列。

然后,如果队列也满了,则创建新的非核心线程来运行新的任务。

最后,如果非核心线程数也达到最大了,那就执行拒绝策略。

mythreadpool

代码逻辑大致如下:

    @Override
    public void execute(Runnable task) {
        // 正在运行的线程数
        int count = runningCount.get();
        // 如果正在运行的线程数小于核心线程数,直接加一个线程
        if (count < coreSize) {
            // 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小
            if (addWorker(task, true)) {
                return;
            }
            // 如果添加核心线程失败,进入下面的逻辑
        }

        // 如果达到了核心线程数,先尝试让任务入队
        // 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false
        if (taskQueue.offer(task)) {
            // do nothing,为了逻辑清晰这里留个空if
            // 【本篇文章由公众号“彤哥读源码”原创】
        } else {
            // 如果入队失败,说明队列满了,那就添加一个非核心线程
            if (!addWorker(task, false)) {
                // 如果添加非核心线程失败了,那就执行拒绝策略
                rejectPolicy.reject(task, this);
            }
        }
    }

创建线程逻辑分析

首先,创建线程的依据是正在运行的线程数量有没有达到核心线程数或者最大线程数,所以我们还需要一个变量runningCount用来记录正在运行的线程数。

其次,这个变量runningCount需要在并发环境下加加减减,所以这里需要使用到Unsafe的CAS指令来控制其值的修改,用了CAS就要给这个变量加上volatile修饰,为了方便我们这里直接使用AtomicInteger来作为这个变量的类型。

然后,因为是并发环境中,所以需要判断runningCount < coreSize(或maxSize)(条件一)的同时修改runningCount CAS加一(条件二)成功了才表示可以增加一个线程,如果条件一失败则表示不能再增加线程了直接返回false,如果条件二失败则表示其它线程先修改了runningCount的值,则重试。

最后,创建一个线程并运行新任务,且不断从队列中拿任务来运行【本篇文章由公众号“彤哥读源码”原创】。

mythreadpool

代码逻辑如下:

    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判断是不是真的可以创建一个线程
        for (; ; ) {
            // 正在运行的线程数
            int count = runningCount.get();
            // 核心线程还是非核心线程
            int max = core ? coreSize : maxSize;
            // 不满足创建线程的条件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,可以创建线程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 线程的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 创建线程并启动
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 运行的任务
                    Runnable task = newTask;
                    // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 执行任务
                            task.run();
                        } finally {
                            // 任务执行完成,置为空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

取任务逻辑分析

从队列中取任务应该使用take()方法,这个方法会一直阻塞直至取到任务或者中断,如果中断了就返回null,这样当前线程也就可以安静地结束了,另外还要注意中断了记得把runningCount减一。

    private Runnable getTask() {
        try {
            // take()方法会一直阻塞直到取到任务为止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 线程中断了,返回null可以结束当前线程
            // 当前线程都要结束了,理应要把runningCount的数量减一
            runningCount.decrementAndGet();
            return null;
        }
    }

好了,到这里我们自己的线程池就写完了,下面我们一起来想想怎么测试呢?

测试逻辑分析

我们再来回顾下自己的写的线程池的构造方法:

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

name,这个随便传;

coreSize,我们假设为5;

maxSize,我们假设为10;

taskQueue,任务队列,既然我们设置的是有边界的,我们就用最简单的ArrayBlockingQueue好吧,容量设置为15,这样里面最多可以存储15条任务;

rejectPolicy,拒绝策略,我们假设使用丢弃当前任务的策略,OK,我们来实现一个。

/**
 * 丢弃当前任务
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

OK,这样一个线程池就创建完成了,下面就是执行任务了,我们假设通过for循环连续不断地添加100个任务好不好。

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

我们分析下这段程序:

(1)先连续创建了5个核心线程,并执行了新任务;

(2)后面的15个任务进了队列;

(3)队列满了,又连续创建了5个线程,并执行了新任务;

(4)后面的任务就没得执行了,全部走了丢弃策略;

(5)所以真正执行成功的任务应该是 5 + 15 + 5 = 25 条任务;

运行之:

thread name: core_test2
thread name: core_test5
thread name: core_test3
thread name: core_test4
thread name: core_test1
thread name: test6
thread name: test7
thread name: test8
thread name: test9
discard one task
thread name: test10
discard one task
...省略被拒绝的任务
【本篇文章由公众号“彤哥读源码”原创】
discard one task
running: 1570546871851: 2
running: 1570546871851: 8
running: 1570546871851: 7
running: 1570546871851: 6
running: 1570546871851: 5
running: 1570546871851: 3
running: 1570546871851: 4
running: 1570546871851: 1
running: 1570546871851: 10
running: 1570546871851: 9
running: 1570546872852: 14
running: 1570546872852: 20
running: 1570546872852: 19
running: 1570546872852: 17
running: 1570546872852: 18
running: 1570546872852: 16
running: 1570546872852: 15
running: 1570546872852: 12
running: 1570546872852: 13
running: 1570546872852: 11
running: 1570546873852: 21
running: 1570546873852: 24
running: 1570546873852: 23
running: 1570546873852: 25
running: 1570546873852: 22

可以看到,创建了5个核心线程、5个非核心线程,成功执行了25条任务,完成没问题,完美^^。

总结

(1)自己动手写一个线程池需要考虑的因素主要有:核心线程数、最大线程数、任务队列、拒绝策略。

(2)创建线程的时候要时刻警惕并发的陷阱;

彩蛋

我们知道,jdk自带的线程池还有两个参数:keepAliveTime、unit,它们是干什么的呢?

答:它们是用来控制何时销毁非核心线程的,当然也可以销毁核心线程,具体的分析请期待下一章吧。

完整源码

Executor接口

public interface Executor {
    void execute(Runnable command);
}

MyThreadPoolExecutor线程池实现类

/**
 * 自动动手写一个线程池
 */
public class MyThreadPoolExecutor implements Executor {

    /**
     * 线程池的名称
     */
    private String name;
    /**
     * 线程序列号
     */
    private AtomicInteger sequence = new AtomicInteger(0);
    /**
     * 核心线程数
     */
    private int coreSize;
    /**
     * 最大线程数
     */
    private int maxSize;
    /**
     * 任务队列
     */
    private BlockingQueue<runnable> taskQueue;
    /**
     * 拒绝策略
     */
    private RejectPolicy rejectPolicy;
    /**
     * 当前正在运行的线程数【本篇文章由公众号“彤哥读源码”原创】
     * 需要修改时线程间立即感知,所以使用AtomicInteger
     * 或者也可以使用volatile并结合Unsafe做CAS操作(参考Unsafe篇章讲解)
     */
    private AtomicInteger runningCount = new AtomicInteger(0);

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

    @Override
    public void execute(Runnable task) {
        // 正在运行的线程数
        int count = runningCount.get();
        // 如果正在运行的线程数小于核心线程数,直接加一个线程
        if (count < coreSize) {
            // 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小
            if (addWorker(task, true)) {
                return;
            }
            // 如果添加核心线程失败,进入下面的逻辑
        }

        // 如果达到了核心线程数,先尝试让任务入队
        // 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false
        if (taskQueue.offer(task)) {
            // do nothing,为了逻辑清晰这里留个空if
        } else {
            // 如果入队失败,说明队列满了,那就添加一个非核心线程
            if (!addWorker(task, false)) {
                // 如果添加非核心线程失败了,那就执行拒绝策略
                rejectPolicy.reject(task, this);
            }
        }
    }

    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判断是不是真的可以创建一个线程
        for (; ; ) {
            // 正在运行的线程数
            int count = runningCount.get();
            // 核心线程还是非核心线程
            int max = core ? coreSize : maxSize;
            // 不满足创建线程的条件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,可以创建线程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 线程的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 创建线程并启动
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 运行的任务【本篇文章由公众号“彤哥读源码”原创】
                    Runnable task = newTask;
                    // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 执行任务
                            task.run();
                        } finally {
                            // 任务执行完成,置为空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

    private Runnable getTask() {
        try {
            // take()方法会一直阻塞直到取到任务为止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 线程中断了,返回null可以结束当前线程
            // 当前线程都要结束了,理应要把runningCount的数量减一
            runningCount.decrementAndGet();
            return null;
        }
    }

}

RejectPolicy拒绝策略接口

public interface RejectPolicy {
    void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);
}

DiscardRejectPolicy丢弃策略实现类

/**
 * 丢弃当前任务
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

测试类

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

    }
}

© 著作权归作者所有

彤哥读源码

粉丝 269
博文 67
码字总数 166193
作品 0
深圳
私信 提问
加载中

评论(18)

开源中国首席罗纳尔多
开源中国首席罗纳尔多
您好,请问java的锁关键字的原理是什么?还有socket是调用系统的socket的api吗?
彤哥读源码 博主
(1)synchronized,可以关注我的公众号看synchronized那篇解析; (2)网络请求肯定是调用系统socket的的。
蒋林辉
蒋林辉
看了一下jdk源码,原理差不多相似,给个赞
彤哥读源码 博主
我这相当于简化版的,jdk本身要考虑很多因素,所以代码又臭又长,我这相当于为后面学习线程池源码打个基础,这两篇文章看完了,自己去看jdk的线程池应该也没啥问题了。
彤哥读源码 博主
还有一篇是这个:https://my.oschina.net/u/4108008/blog/3115545,开源中国没有给我推荐,大佬给评价评价呗。
H
HappyMakeCode
应用场景,能帮忙稍微举例
彤哥读源码 博主
应用场景就是线程池的场景,这里只是为了帮助理解线程池的实现原理。
m
moshengren
为啥不设计成线程池主动获取任务?一有空闲线程就获取一个任务分配给线程,这样不需要拒绝机制,性能也会好吧
彤哥读源码 博主
线程池获取任务再分配给空闲线程,一样的问题,而且还要多维护一个空闲线程列表,还不如直接让线程去拿任务。 也需要维护一个任务队列列表,任务队列也会满,任务队列满(且没有空闲线程)了怎么办呢?
m
moshengren
嗯,一样
宇林木风
为啥>符号都被转义了
彤哥读源码 博主
多谢提醒,可能是编辑的时候自动转了,晚上回去我处理一下,另外,可以关注公众号“彤哥读源码”查看,公众号上面是好的。
悲喜世界
悲喜世界
学习,mark
不标准的救火队员
不标准的救火队员
threadPool.execute(()->
JPer
JPer
写的很不错,帮顶
张亦俊
张亦俊
线程代码里扔个error会发生什么咧
彤哥读源码 博主
吃掉。
死磕 java同步系列之CountDownLatch源码解析

问题 (1)CountDownLatch是什么? (2)CountDownLatch具有哪些特性? (3)CountDownLatch通常运用在什么场景中? (4)CountDownLatch的初始次数是否可以调整? 简介 CountDownLatch,可以...

彤哥读源码
06/16
158
0
死磕 java同步系列之CyclicBarrier源码解析——有图有真相

问题 (1)CyclicBarrier是什么? (2)CyclicBarrier具有什么特性? (3)CyclicBarrier与CountDownLatch的对比? 简介 CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个...

彤哥读源码
06/28
126
0
【死磕Java并发】—– 死磕 Java 并发精品合集

【死磕 Java 并发】系列是 LZ 在 2017 年写的第一个死磕系列,一直没有做一个合集,这篇博客则是将整个系列做一个概览。 先来一个总览图: 【高清图,请关注“Java技术驿站”公众号,回复:脑...

chenssy
2018/07/22
0
0
死磕 java同步系列之ReentrantReadWriteLock源码解析

问题 (1)读写锁是什么? (2)读写锁具有哪些特性? (3)ReentrantReadWriteLock是怎么实现读写锁的? (4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap? 简介 读写锁是一种特...

彤哥读源码
06/13
113
6
死磕 java同步系列之ReentrantLock VS synchronized——结果可能跟你想的不一样

问题 (1)ReentrantLock有哪些优点? (2)ReentrantLock有哪些缺点? (3)ReentrantLock是否可以完全替代synchronized? 简介 synchronized是Java原生提供的用于在多线程环境中保证同步的...

彤哥读源码
06/11
2.1K
6

没有更多内容

加载失败,请刷新页面

加载更多

Andorid SQLite数据库开发基础教程(2)

Andorid SQLite数据库开发基础教程(2) 数据库生成方式 数据库的生成有两种方式,一种是使用数据库管理工具生成的数据库,我们将此类数据库称为预设数据库,另一种是使用代码生成的数据库。...

大学霸
29分钟前
3
0
YecPad 开源啦!: 基于C#的功能强大的可编辑记事本文本编辑软件

JY Lin 开源:YecPad : 基于C#的功能强大的可编辑记事本文本编辑软件 YecPad 是一款基于C#编程语言开发的功能强大的可编辑记事本文本编辑软件。 可以进行文本文件的打开、保存、删除及编辑功...

YDOOK
36分钟前
3
0
StringBuilder 与 StringBuffer 的区别

StringBuffer是线性安全的,支持并发操作,适合多线程。 StringBuilder线性不安全,不支持并发操作,适合单线程。 也就是说他们俩区别就在于支不支持并发操作,使用上基本上类似...

无名氏的程序员
39分钟前
3
0
js 找数组中的最值

本文转载于:专业的前端网站➸js 找数组中的最值 背景: 2个数组以下 , 比如 [[4, 9, 1, 3], [13, 35, 18, 26], [32, 35, 97, 39], [1000000, 1001, 857, 1]] 找最值的时候,我一开始想用两个...

前端老手
48分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部