文档章节

ThreadPoolExecutor策略配置以及应用场景

ChanningBJ
 ChanningBJ
发布于 2017/02/12 18:27
字数 1930
阅读 33
收藏 0

ThreadPoolExecutor 是用来处理异步任务的一个接口,可以将其理解成为一个线程池和一个任务队列,提交到 ExecutorService 对象的任务会被放入任务队或者直接被线程池中的线程执行。ThreadPoolExecutor 支持通过调整构造参数来配置不同的处理策略,本文主要介绍常用的策略配置方法以及应用场景。

ThreadPoolExecutor 的处理逻辑

首先看一下 ThreadPoolExecutor 构造函数的定义:

public ThreadPoolExecutor(int corePoolSize,  //线程池核心线程数量
                              int maximumPoolSize,   //线程池最大线程数量
                              long keepAliveTime,      //线程KeepAlive时间,当线程池数量超过核心线程数量以后,idle时间超过这个值的线程会被终止
                              TimeUnit unit,              //线程KeepAlive时间单位
                              BlockingQueue<Runnable> workQueue,    //任务队列
                              ThreadFactory threadFactory,                  //创建线程的工厂对象
                              RejectedExecutionHandler handler)          //任务被拒绝后调用的handler

ThreadPoolExecutor 对线程池和队列的使用方式如下:

  1. 从线程池中获取可用线程执行任务,如果没有可用线程则使用ThreadFactory创建新的线程,直到线程数达到corePoolSize限制
  2. 线程池线程数达到corePoolSize以后,新的任务将被放入队列,直到队列不能再容纳更多的任务
  3. 当队列不能再容纳更多的任务以后,会创建新的线程,直到线程数达到maxinumPoolSize限制
  4. 线程数达到maxinumPoolSize限制以后新任务会被拒绝执行,调用 RejectedExecutionHandler 进行处理

三种常用的 ThreadPoolExecutor

Executors 是提供了一组工厂方法用于创建常用的 ExecutorService ,分别是 FixedThreadPool,CachedThreadPool 以及 SingleThreadExecutor。这三种ThreadPoolExecutor都是调用 ThreadPoolExecutor 构造函数进行创建,区别在于参数不同。

FixedThreadPool - 线程池大小固定,任务队列无界

下面是 Executors 类 newFixedThreadPool 方法的源码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

可以看到 corePoolSize 和 maximumPoolSize 设置成了相同的值,此时不存在线程数量大于核心线程数量的情况,所以KeepAlive时间设置不会生效。任务队列使用的是不限制大小的 LinkedBlockingQueue ,由于是无界队列所以容纳的任务数量没有上限。

因此,FixedThreadPool的行为如下:

  1. 从线程池中获取可用线程执行任务,如果没有可用线程则使用ThreadFactory创建新的线程,直到线程数达到nThreads
  2. 线程池线程数达到nThreads以后,新的任务将被放入队列

FixedThreadPool的优点是能够保证所有的任务都被执行,永远不会拒绝新的任务;同时缺点是队列数量没有限制,在任务执行时间无限延长的这种极端情况下会造成内存问题。

SingleThreadExecutor - 线程池大小固定为1,任务队列无界

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

这个工厂方法中使用无界LinkedBlockingQueue,并的将线程数设置成1,除此以外还使用FinalizableDelegatedExecutorService类进行了包装。这个包装类的主要目的是为了屏蔽ThreadPoolExecutor中动态修改线程数量的功能,仅保留ExecutorService中提供的方法。虽然是单线程处理,一旦线程因为处理异常等原因终止的时候,ThreadPoolExecutor会自动创建一个新的线程继续进行工作。

输入图片说明

SingleThreadExecutor 适用于在逻辑上需要单线程处理任务的场景,同时无界的LinkedBlockingQueue保证新任务都能够放入队列,不会被拒绝;缺点和FixedThreadPool相同,当处理任务无限等待的时候会造成内存问题。

CachedThreadPool - 线程池无限大(MAX INT),等待队列长度为1

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

SynchronousQueue是一个只有1个元素的队列,入队的任务需要一直等待直到队列中的元素被移出。核心线程数是0,意味着所有任务会先入队列;最大线程数是Integer.MAX_VALUE,可以认为线程数量是没有限制的。KeepAlive时间被设置成60秒,意味着在没有任务的时候线程等待60秒以后退出。CachedThreadPool对任务的处理策略是提交的任务会立即分配一个线程进行执行,线程池中线程数量会随着任务数的变化自动扩张和缩减,在任务执行时间无限延长的极端情况下会创建过多的线程。

三种ExecutorService特性总结

| 类型 | 核心线程数 | 最大线程数 | Keep Alive 时间 | 任务队列 | 任务处理策略 | |--|---|-------------------|-----------------|---------------------| - | | FixedThreadPool | 固定大小 | 固定大小(与核心线程数相同) | 0 | LinkedBlockingQueue | 线程池大小固定,没有可用线程的时候任务会放入队列等待,队列长度无限制 | | SingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue | 与 FixedThreadPool 相同,区别在于线程池的大小为1,适用于业务逻辑上只允许1个线程进行处理的场景 | | CachedThreadPool | 0 | Integer.MAX_VALUE | 1分钟 | SynchronousQueue | 线程池的数量无限大,新任务会直接分配或者创建一个线程进行执行 |

自定义ThreadPoolExecutor

我们也可以通过修改 ThreadPoolExecutor 的构造函数来自定义任务处理策略。例如面对的业务是将数据异步写入HBase,当HBase严重超时的时候允许写入失败并记录日志以便事后补写。对于这种应用场景,如果使用FixedThreadPool,在HBase服务严重超时的时候会导致队列无限增长,引发内存问题;如果使用CachedThreadPool,会导致线程数量无限增长。对于这种场景,我们可以设置ExecutorService使用带有长度限制的队列以及限定最大线程个数的线程池,同时通过设置RejectedExecutionHandler处理任务被拒绝的情况。

首先定义 RejectedExecutionHandler:

public class MyRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // 处理任务被拒绝的情况,例如记录日志等
        }
    }

创建 ThreadPoolExecutor:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10,                                                               //核心线程数设置成10
                30,                                                              //线程池最大线程数为30
                30, TimeUnit.SECONDS,                              //超过核心线程数量的线程idle 30秒之后会退出
                new ArrayBlockingQueue<Runnable>(100),       //队列长度为100
                new MyRejectedExecutionHandler()                //任务被拒绝以后的处理类
        );

这样设置以后,如果任务处理函数出现长时间挂起的情况,会依次发生下列现象:

  1. 线程池线程数量达到核心线程数,向ArrayBlockingQueue中放入任务
  2. ArrayBlockingQueue达到上限,创建新的线程进行处理
  3. 线程池中的线程数量达到30个,调用MyRejectedExecutionHandler处理新提交的任务

总结

  • 对于需要保证所有提交的任务都要被执行的情况,使用FixedThreadPool
  • 如果限定只能使用一个线程进行任务处理,使用SingleThreadExecutor
  • 如果希望提交的任务尽快分配线程执行,使用CachedThreadPool
  • 如果业务上允许任务执行失败,或者任务执行过程可能出现执行时间过长进而影响其他业务的应用场景,可以通过使用限定线程数量的线程池以及限定长度的队列进行容错处理。

© 著作权归作者所有

共有 人打赏支持
ChanningBJ

ChanningBJ

粉丝 8
博文 71
码字总数 9621
作品 4
海淀
程序员
concurrent包学习--ThreadPoolExecutor实现

    ThreadPoolExecutor是jdk自带的线程池实现。看到了"池"一定会想到对象池模式,它是单例模式的一个变种,主要思想是通过共享复用已有的空闲对象,达到限制开销和提高性能的目的。这里...

积淀
03/05
0
0
更好的使用JAVA线程池

这篇文章结合Doug Lea大神在JDK1.5提供的JCU包,分别从线程池大小参数的设置、工作线程的创建、空闲线程的回收、阻塞队列的使用、任务拒绝策略、线程池Hook等方面来了解线程池的使用,其中涉...

Float_Luuu
2016/03/27
2.3K
3
驾驭Java线程池:定制与扩展

Executor是一个强大多线程工作框架,其不仅提供了完善的执行策略便于用户使用,还提供多样的接口和参数供用户自定义配置,保证了框架的可扩展性和灵活性。本文将为大家介绍如何配置和使用线程...

登高且赋
2017/11/04
0
0
Java并发编程 -- Executor 框架介绍

前面详细通过源码解释了ThreadPoolExecutor类的运行原理,本篇文章来说一下Executor的框架组成。 Java的线程既是工作单元也是执行单元,从JDK5开始,把工作单元与执行机制分离开来,工作单元...

GordonNemo
03/13
0
0
JAVA线程池学习以及队列拒绝策略

JAVA线程池学习以及队列拒绝策略 为什么要用线程池? 在Java中,如果每当一个请求到达就创建一个新线程,开销是相当大的。在实际使用中,每个请求创建新线程的服务器在创建和销毁线程上花费的...

DemonsI
09/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

聊聊clean code

clean code,顾名思义就是整洁的代码,或者说清晰、漂亮的代码,相信大多数工程师都希望自己能写出这样的代码。 也许这是个千人千面的话题,每个工程师都有自己的理解。比如我,从一个天天被...

Skqing
15分钟前
1
0
redis连接报错—— (error) NOAUTH Authentication required.

1.redis报认证错误 redis客户端连接成功,但是操作报异常——(error) NOAUTH Authentication required 错误的含义是说你没有认证,说明没有使用密码连接 redis-cli -h 127.0.0.1 -p 6379 -a ...

啊哈关关
21分钟前
1
0
地理位置坐标标准以及转换

/** * 地理位置坐标标准以及转换 * * 1.WGS-84原始坐标系,一般用国际GPS纪录仪记录下来的经纬度,通过GPS定位拿到的原始经纬度,Google和高德地图定位的的经纬度(国外)都是基于W...

葉者
23分钟前
1
0
Generator-ES6

基本概念 Generator 函数是 ES6 提供的一种异步编程解决方案,语法行为与传统函数完全不同。 Generator 函数有多种理解角度。语法上,首先可以把它理解成,Generator 函数是一个状态机,封装...

简心
41分钟前
5
0
FullCalendar日历插件说明文档

普通显示设置 属性 描述 默认值 header 设置日历头部信息。 如果设置为false,则不显示头部信息。包括left,center,right左中右三个位置,每个位置都可以对应以下不同的配置: title: 显示当...

ada_young
42分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部