Java并发系列8--线程池ThreadPoolExecutor

原创
2018/07/04 17:04
阅读数 60

前面讲的同步并发工具有些比较简单,所以篇幅也比较短,今天要讲的线程池非常重要,所以会是一个大章哦。已经预见留言区如下:

“太长不看...”
“看到ThreadPoolExecutor构造已睡着”
“精力饱满而来,昏昏欲睡而去...”

学习的确需要投入精力,尤其涉及到一些细节,对别人能做到不明觉厉,为何不自己一览山上风景呢?

一、何为线程池

如果你接触过对线池,比如数据库连接池,他们要解决的问题是:对象的启动耗费资源比较多,最好能做到只启动一次,然后重复使用。我们平常用到static final去修饰常量,也是这个意思。
对于线程来说,启动线程相对来说耗费时间会久一点,而且线程开太多,又会耗费内存,导致GC压力,所以我们需要有控制和管理的手段。
线程池顾名思义就是一个池子,使用了它之后,创建线程变成了从池子获得线程;关闭线程变成了把线程归还给池子。

二、先来讲讲ExecutorsExecutorService

JDK中自然提供了一套线程池的实现,就是ThreadPoolExecutor
为了更简单的使用ThreadPoolExecutor,JDK提供了线程池工厂类Executors,我们来看看他都与那些工厂方法:

  • newFixedThreadPool(int nThreads) 构建一个拥有固定线程数量的线程池
  • newSingleThreadExecutor() 构建一个只拥有一个线程的线程池
  • newCachedThreadPool() 构建一个弹性的线程池,需要多少就有多少
  • newSingleThreadScheduledExecutor() 构建一个只拥有一个线程的计划任务线程池
  • newScheduledThreadPool(int corePoolSize) 构建一个弹性的计划任务线程池

我们先来看一个例子:

		ExecutorService executorService = Executors.newSingleThreadExecutor();
		Future<Integer> future = executorService.submit(()->{return 12;});
		System.out.println(future.get());
		executorService.shutdown();

我们看到了几个新的类,Executors的工厂类方法会返回一个ExecutorService,实际上他是ThreadPoolExecutor的父接口。
ExecutorService提供了submit()方法,把一个RunnableCallback提交到线程池去执行,这里的入参是CallbackCallback是一个回调接口,没有入参但有返回值。submit()方法提交了一个任务之后,会返回一个Future用来表示异步计算的结果,这里也就是获得Callback所返回的结果。
那么如果submit()入参是Runnable呢,这样的话Future一般只会获取到null
ExecutorService是继承自Executor接口,有时你不需要用Future来获取异步计算的结果,只是想从线程池取出线程来执行一些任务,那么可以:

		ExecutorService executorService = Executors.newSingleThreadExecutor();
		executorService.execute(()->{});
		executorService.shutdown();

需要注意的是线程池创建之后是不会自动关闭的,需要手动调用shutdown()方法,ExecutorService还有一个shutdownNow()方法,表示立即结束,而不是在所有线程工作完成后优雅的结束。

1、ScheduledExecutorService

在上面我们看到Executors的工厂方法中有两个会返回ScheduledExecutorService,分别是newSingleThreadScheduledExecutor()newScheduledThreadPool(int corePoolSize),这个跟计划任务有关了,类似Linux的at命令。
主要有两个方法来对任务进行周期性的调度:

  • scheduleAtFixedRate() 以上个任务的执行时间为起点,之后的period时间,调度下一次任务
  • scheduleWithFixedDelay() 以上个任务的结束时间为起点,经过delay时间进行任务调度

三、重头戏之ThreadPoolExecutor

Executors的工厂方法比较合适初学者使用,简单直接。如果你想做一个高玩,那么就不得不去探索一下核心线程池的内部实现了。
来看一下ExecutornewFixedThreadPool()方法:

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

会发现他其实是返回了一个ThreadPoolExecutor对象,可以确定ThreadPoolExecutor就是线程池的实现类了,那ThreadPoolExecutor构造函数的这几个参数都是什么意思呢? 看一下ThreadPoolExecutor最丰富的构造,其他构造都是调用这个的:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

我们一个个来分析一下,会发现他的完善和强大:

  • int corePoolSize 指定了线程池核心(最少)线程数量
  • int maximumPoolSize 指定了线程池最大线程数量
  • long keepAliveTime 指定了线程存活时间,也就是当有线程被归还的时候,他的存活时间,毕竟线程一直运行着也是耗费资源
  • TimeUnit unit 线程存活时间单位
  • BlockingQueue<Runnable> workQueue 任务队列,保存被提交但未被执行的任务。如果线程池已满且所有线程都在执行任务,那么后来提交的任务就会暂时保存在这个队列中
  • ThreadFactory threadFactory 创建线程时用到的线程工厂类
  • RejectedExecutionHandler handler 拒绝策略。当任务队列workQueue也满了的时候,再有任务提交到线程池,要通过什么把他拒绝掉

一下看到这么多,还是挺吓人的。其实前面几个参数还比较通俗易懂,比较难理解的在于后三个参数,我们来一一拆解一下。

1、workQueue

如果对BlockingQueue不熟悉,请参考https://my.oschina.net/lizaizhong/blog/1840206
这个workQueue有以下几种选择:

  • SynchronousQueue 这是一个比较特殊的队列,本身没有容量,来一个就得消费一个。可以看到Executors.newCachedThreadPool()就是使用的这个队列,因为newCachedThreadPool()中的maximumPoolSize为无限大
  • ArrayBlockingQueue 有界队列
  • LinkedBlockingQueue 无界队列
  • PriorityBlockingQueue 优先级队列

2、threadFactory

threadFactory顾名思义就是线程工厂。如果我们想对线程池中的线程进行一些自定义配置,那么可以重写ThreadFactorynewThread()方法:

		ThreadFactory threadFactory = new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				Thread thread = new Thread(r);
				thread.setName("myThread-"+thread.getId());
				thread.setDaemon(true);
				return thread;
			}
		};

3、rejectedExecutionHandler

当线程池线程都被取出,并且任务队列里的任务也排满的时候,新进入的任务怎么办?只能被拒绝。
JDK提供了四种拒绝策略,分别如下:

  • AbortPolicy 该策略直接抛出异常,阻止系统正常工作
  • CallerRunsPolicy 只要线程池未关闭,该策略直接在调用者(main)线程中运行当前被丢弃的任务。这种策略容易造成调用者线程的性能急剧下降
  • DiscardOldestPolicy 该策略丢弃任务队列中最老(最早)的一个请求,并尝试再次提交当前任务请求
  • DiscardPolicy 该策略默默丢弃无法处理的任务

4、扩展线程池

ThreadPoolExecutor的扩展机制类似与拦截器,但并未提供接口方法,而是需要重写这些方法,主要是三个方法:

		ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.MINUTES	, new SynchronousQueue<>()) {
			@Override
			protected void beforeExecute(Thread t, Runnable r) {
				System.out.println("准备执行");
			}

			@Override
			protected void afterExecute(Runnable r, Throwable t) {
				System.out.println("执行完成");
			}

			@Override
			protected void terminated() {
				System.out.println("线程池退出");
			}
		};

四、不要使用Executors(What?)

如果仔细查看一下Executors的几个工厂方法,例如newSingleThreadExecutor():

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

会发现他返回的线程池,请求任务队列是一个无界队列,那么这样容易出现什么问题呢?想必你已经想到了,容易出现任务大量堆积,导致OOM。
其他的工厂方法也存在这个问题,总结起来就是:

  • fixedThreadPoolsingleThreadPool请求任务队列为无界队列,容易OOM
  • cachedThreadPoolscheduledThreadPool创建线程的数量为Integer.MAX_VALUE,容易OOM

Executors的这个问题,在大型项目中尤其需要注意。所以根据前辈们的血泪史,我们得到的宝贵经验就是:不要使用Executors获得线程池,最好选择自己来构建。

五、submit提交任务无异常的问题

如果你用线程池提交了一个Runnable任务,例如:

		executor.submit(()->{System.out.println(1/0);});

结果是不会输出任何东西,即使Runnable任务重抛出了异常。为啥会出现这种情况呢?感兴趣的同学可以自己研究下源码,主要涉及的类有FutureTaskRunnableAdapter,主要看FutureTask的run()方法即可了解原因。
不抛异常可是非常危险,为了解决这个问题,建议像线程池提交Runnable类型的任务使用execute()方法比较好!

展开阅读全文
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部