自定义线程池影响的线上事故

2019/04/10 10:10
阅读数 9

  作为一个牛逼的程序员,相信大家肯定是接触过多线程的概念的。并且可能会在实际的工作中因为一些业务场景需要使用自定义线程池来执行批量的任务或对线程进行管理。同样,我们项目中也存在一个两个场景需要使用线程池。而这两个场景分别为:

1、持续监听某个外部接口的不间断的返回信息,其实就是长链接的阻塞接口,总共12种资源需要监听,所以就意味需要12个不间断的线程执行阻塞任务。

2、RabbitMQ的消费者,因为需要应用启动的时候就执行消息的消费,所以也通过线程池中获取线程执行消费任务。

一、先看线程池的定义

public class ThreadPoolUtil {

private static Logger logger = LoggerFactory.getLogger(ThreadPoolUtil.class);

private static volatile ThreadPoolExecutor threadPoolExecutor = null;
/**
* 创建执行
* @return
*/
private static AtomicInteger nextId = new AtomicInteger(0);
public static ThreadPoolExecutor createExecutor(){


if (threadPoolExecutor != null){
return threadPoolExecutor;
}
synchronized (ThreadPoolUtil.class){

if (threadPoolExecutor != null){
return threadPoolExecutor;
}
int corePoolSize = 16;
int maxPoolSize = 16;
int keepAliveSeconds = 60;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue(500);
RejectedExecutionHandler rejectedExecutionHandler = (r, executor) -> logger.error("队列已经满了,总任务{},直接拒绝吧", executor.getTaskCount());

threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveSeconds, TimeUnit.SECONDS, queue, r -> {
String fileName = Thread.currentThread().getStackTrace()[5].getFileName();
String threadName = fileName.substring(0,fileName.indexOf("."))+"-";
Thread thread = new Thread(r, threadName+nextId.incrementAndGet());
return thread;
}, rejectedExecutionHandler);
}
return threadPoolExecutor;
}

  看看上面的线程池设计,好像是没有啥问题的。如果是放在普通的可终结的任务使用当前线程池,理论上是没有太大问题。但是!我们的应用刚好这几个任务都是阻塞的。阻塞就意味着线程是无法回收的,其他的任务使用这个线程池之后,就只能先放到队列中,然后一直得不到释放的线程资源执行。最终队列积压,任务被抛弃。

 

二、线上事故描述

  因为在初始化的时候,已经将 12 个监听都启动了,并且使用的是当前线程池构造工具。启动完成之后,12个核心线程就一直被阻塞占用。这12个资源的监听还是比较正常的,并且能够对监听数据进行处理和执行。

  因为需要MQ消费端启动的时候就可以执行消费,所以在启动的时候,设置了启动配置类中调用上述工具创建线程池,相当于用新的线程执行消息监听的动作。然而MQ却迟迟不见消费的过程,导致消息队列一直积压。并且无法完成正确的数据处理。

三、问题猜测及理论支撑

  猜测:没有被消费,应该就是我们的线程池中没有空闲的线程进行消息监听处理。初始化的时候的消费监听的任务被直接丢弃到了线程池的任务队列中,而这个线程池的任务队列中数数据只有在两种情况下才可能被执行。

  第一种:线程池中有空闲的线程,可以进行执行

  第二种:消息队列满了,开启了加大了线程池的线程数以便执行堆积的任务

  而我们的这个一步开启MQ消费监听的任务被发送到线程池的时候,因为核心线程数就是 12 ,而我们前面的资源监听接口已经开启了12个阻塞任务,所以就没有了可用线程。所以被存放到了线程池待执行任务队列中。可怕的是,我们这个线程池的队列大小为500 ,很显然 1 < 500 ,所以就无法触发线程加大的动作,导致这个队列的任务“被遗忘”。

 

 理论支撑:

  线程池的核心参数包括: coreSize , maxSize, quauaSize,RejectedExecutionHandler

  分别为:核心线程数,最大线程数,可积压的任务数,拒绝策略

  当创建线程的时候,首先会先创建核心线程数等量的线程,比如上面就是 12个核心线程, 而当我们的核心线程都在执行阶段的时候,再次加入的任务就会被存放到任务队列中。当任务不断的增加并且幅度远远大于核心线程的处理速度,导致任务队列存放到最大值,比如上面的500,那么就需要增加线程数,此时就是需要增加线程数到最大值,比如上面的16,然而,增大了之后,发现已然不能处理消化任务的投放数量,这个时候就用不同的处理策略,比如上面的  rejectedExecutionHandler 就是直接丢弃。

  

  猜测和理论匹配一下的话就是:核心线程是12 ,这12个线程被资源监听的阻塞任务占用无法释放,而开启消费监听的任务被丢到了待执行的任务队列中,此时,任务队列又不满足益处的条件,所以就没有增加新的线程来处理,以至于,这个创建消费监听的任务就“被遗忘”了。

 

  如何进行论证呢?使用如下测试代码

  

public static void main(String[] args) {
        ThreadPoolExecutor executor = createExecutor();
      // 临界值 分别设置12 16 512 518
        for (int i =0; i < $临界值;i++){

            int finalI = i;
            executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("当前任务序号为:"finalI +" ,活跃线程数"+ executor.getActiveCount());
                    Thread.sleep(10000*1000); // 这就当作是持久的任务在执行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    }

   测试结果:

  临界值为 12, 核心线程刚好够用

    

  临界值为 16 , 虽然任务数大于了核心线程,但是并没有新建线程数。所以验证任务被放到了队列中,先使用队列存放,队列满了再开新线程

   

  临界值为 512 任务数量大于  核心线程数  所以新任务放到了队列中,且刚好不会有超出,不触发新的线程创建

    

  临界值为 516 任务数量大于  ( 核心线程数 + 队列大小 ) 所有活跃线程被加到最大

  

  临界值为 518, 任务数量大于  ( 队列大小 + 最大线程数) 所有产生丢弃

  

 

 

四、如何解决当前事故

  出现这个问题之后,我们直接就增加了核心线程的数量,以保证整体大于在阻塞任务的数量。比如我们这个就是重新设置为核心线程数量 16 > 12,

  同时,我们将阻塞任务同非阻塞任务所创建的线程池进行隔离,以减少共用线程池造成的 正常任务被遗忘的可能性。

 

五、如何设置你的线程池大小

  那么在开发中,如何设置i线程池的大小?其实这没有特定的规范,需要结合自己任务的执行时间而考虑,

  但是最好提前考虑好,任务是否为阻塞性任务,如果是的话,建议做好线程隔离。

  在我们一般将核心线程设置为 n + 1  (n 为内核数量)

  最大线程数量设置 2n + 1  (n 为内核数量) 

    

  

原文出处:https://www.cnblogs.com/china-baizhuangli/p/12329087.html

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部