文档章节

Java 并发编程,线程池(ThreadPoolExecutor)源码解析

郑加威
 郑加威
发布于 2018/03/01 20:00
字数 4089
阅读 987
收藏 12

创建线程的几种方式

在开始解析线程池之前,先简单的总结一下创建线程的几种方式:

  • 继承Thread类
  • 实现Runnable接口
  • 使用Future和Callable
  • 借助线程池

上面是创建一个线程的四种方式,在实际的开发中是推荐使用线程池来实现多线程的并发操作。ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

线程池的主要处理流程

在开始分析之前,先看一下线程池的主要处理流程,如下图:

上图是线程池的处理流程,先从宏观上,看一下处理流程,下面结合代码看看具体是如何实现的,那么开始分析:

数据结构

继承关系

public class ThreadPoolExecutor extends AbstractExecutorService {}

类的定义非常的简单,只是继承了一个抽象类 AbstractExecutorService 。并没有很好的反映出它的一个继承关系, 不要着急,看一下下面的这张图:

通过上图可以发现,Executor才是站在金子塔顶端的那个, 由它来掌控全局, 当然Executor这个接口也是非常的高冷, 只定义了一个 void execute(Runnable command) 方法,将任务的提交与任务的执行进行了分离。ExecutorService接口继承了Executor 接口,添加了一些带返回的submit()方法和关闭方法shutdown()等。 AbstractExecutorService抽象类实现了ExecutorService()提供了一些方法的默认实现,ThreadPoolExector继承了抽象类,并实现了Executor接口的execute()方法。

基本属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是ThreadPoolThread类中很重要的一个字段,它是一个特殊的int类型变量。 并且是一个原子操作,能保证线程的安全性。 ctl 这个变量包含了两部分的信息 , 简写表示 c:

  • workerCount:线程池内有效线程的数量 : 简写表示 wc
  • runState:线程池的运行状态 ,running ,shutdown 等 ,简写表示为 rs

它是怎么做到一个变量包含两个信息的,详细分析一下:

一个int类型的变量用二进制表示是32 位的,其中高3位表示的是线程池的状态,低29位表示的线程中有效线程的数量。 这设计厉害了,还可以这么玩。

runState是一个生命周期的控制状态,值如下:

  • RUNNING : 接受新任务并处理队列任务 , 数值表示 -1 << 29
  • SHUTDOWN :不接受新任务并处理队列任务 ,数值表示 0 << 29
  • STOP : 不接受新任务,不处理队列任务,并且会中断正在处理的线程任务,数值表示 1 << 29
  • TIDYING : 所有任务终止,workCount为0,清理状态 数值表示 2 << 29
  • TERMINATED :terminated()方法执行后执行该状态 ,数值表示 3 << 29

上面的5个常量,按照从小到大的顺序依次性进行排列。线程池的运行状态要小于线程池的其它状态。

若想了解移位运算,请查看 JAVA二进制位运算、移位运算

上面说了ctl中包含了两部分信息,下面的代码是它们的具体实现:
先看将两部分合并的方法:

//将runState和workerCount 合并到一个ctl中
//RUNNING的值为 -1 << 29  , wc 为 0 
//rs的二进制表示为:     11100000000000000000000000000000
//wc二进制表示为:        00000000000000000000000000000000
//                     --------------------------------  | 运算
//所以两者 | 运算后:值为 11100000000000000000000000000000 
//rs填充ctl的高三位, wc填充ctl的低29 位  ,初始化wc的值为零
private static int ctlOf(int rs, int wc) { return rs | wc; }

再看将ctl分解为runState和workCount方法

//该方法用于取出runState的值

//00011111111111111111111111111111  CAPACITY的二进制表示

//11100000000000000000000000000000  ~ 按位取反操作,如果是1111 取反就是 0000
//11100000000000000000000000000000  c的二进制表示
//--------------------------------  运算
//11100000000000000000000000000000  返回结果
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//该方法用于取出workCount的值
//11100000000000000000000000000000
//00011111111111111111111111111111
//--------------------------------
//00000000000000000000000000000000   返回结果
private static int workerCountOf(int c)  { return c & CAPACITY; }

上面演示了在初始化线程池的时候,c , rs , wc 是如何运算的。如果没有明白,请去看一下位移运算。

构造方法

ThreadPoolExecutor提供了四个构造方法,会涉及到几个非常重要的参数,构造方法看下面的这一个就可以了,因为其他的三个构造方法,都是使用的this调用的下面这个方法。ThreadPoolExecutor的构造方法除了进行参数的合法性和赋值赋值操作外,并没有其他多余的动作。

//corePoolSize : 核心线程池的实现大小
//maximumPoolSize : 最大线程池的实现大小
//keepAliveTime :线程活动保持时间,线程空闲超过这个时间就会终止
//unit : 线程活动保持时间的单位
//workQueue :用来暂时保存任务的工作队列
//threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个线程设置有意义的名字
//handler : 当ThreadPoolExecutor已经关闭或已经饱和时,execute()方法将调用Handler
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        //判断参数合法性,不合法抛出IllegalArgumentException异常
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        //队列,线程工厂,handler为空抛出空指针异常
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        //赋值操作
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

上面只是简单的介绍了一下参数的含义,下面具体说明一下:

  • corePoolSize:当提交一个任务时,线程池会创建一个线程来执行任务,即使有空闲线程也会创建一个新的,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本的线程。

  • workQueue:用于保存等待执行任务的阻塞队列

    • ArrayBlockingQueue:基于数组结构的有界阻塞队列
    • LinkedBlockingQueue:基于链表结构的阻塞队列,按照FIFO排序元素
    • SynchronousQueue:一个不存储元素的阻塞队列
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列
  • maximumPoolSize:线程池允许创建的最大线程数,如果队列满了,并且已经创建的线程数小于最大线程数,则线程池会创建新的线程去执行任务。

  • ThreadFactory:创建线程的工厂

  • RejectedExecutionHandler:当队列和线程池都满了,说明线程池已经处于饱和状态,那么必须采取一种策略来处理新提交的任务

    • AbortPolicy:直接抛出异常
    • CallerRunsPolicy:只用调用者所在的线程来运行任务
    • DiscardOldestPloicy:丢弃队列里最近的一个任务,并执行当前任务
    • DiscardPolicy:不处理,丢弃掉

向线程池提交任务

提交任务有两个方法,一个是execute()一个是submit()方法,前一个不会返回值,后一个会返回值。submit()方法不再本次解析的范围内,感兴趣的可以自行研究简单提一下,submit()方法在AbstractExecutorService抽象类中实现 ,借助 FutureTask进行封装,在execute() 中执行。

execute()方法
对于线程池,作为使用方的我们只能向线程池提交任务,而对于任务是否执行和什么时候执行,并不能控制。

public void execute(Runnable command) {
     //验证参数合法性
     if (command == null)
         throw new NullPointerException();
   
     //获取ctl的值
     int c = ctl.get();

     //如果当前线程池中线程的数量小于核心线程的数量
     if (workerCountOf(c) < corePoolSize) {
            //执行addWorker方法,
            //addWorker 将任务添加到队列,并启动,成功返回true,失败返回false
            if (addWorker(command, true))
                //addWorker返回true, 添加成功,结束execute()方法运行
                return;
            c = ctl.get();
     }
     
     //线程池的运行状态已经不是RUNNING
     //线程池的状态是RUNNING,wc > corePoolSize,队列未满
     if (isRunning(c) && workQueue.offer(command)) {
         int recheck = ctl.get();
         // 非RUNNING状态 则从workQueue中移除任务并拒绝
         if (! isRunning(recheck) && remove(command))
              reject(command);
         // 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败
         else if (workerCountOf(recheck) == 0)
             // 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
            // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
            //但要运行任务队列中的任务
              addWorker(null, false);
      }
       
      //线程池非RUNNING状态
      //队列满了,启动新的线程也失败了,采用拒绝策略
      else if (!addWorker(command, false))
         reject(command);
}

上面的注释已经详细的标注了if中判断的条件,这里在简单的总结一下,execute()方法的添加策略,分为几种情形:

  • wc < corePoolSize , 创建一个新的线程,并提交任务
  • wc = corePoolSize , 任务队列未满,则添加到阻塞队列
  • corePoolSize < wc < maximumPoolSize ,阻塞队列已满,尝试创建一个新的进程来执行任务
  • wc >= maximunPoolSize ,阻塞队列已满,则采用拒绝队列。

上面的四种情形也和文章开始的流程图相契合,可以在回头看一下。

addworker() 也在线程池的调度逻辑中扮演了很重要的角色,下面来看一下,它具体都进行了哪些操作。

添加任务的方法逻辑可能有一点绕, 多看几遍就可以了。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //获取ctl的值
            int c = ctl.get();
            //计算rs的值,用于判断线程池当前的状态
            int rs = runStateOf(c);

            //下面的if判断两种情况
            //1.rs >= SHUTDOWN  ==> 线程池处于非运行状态
            //2.线程池状态为SHUTDOWN,此时不再接收新的任务,但还要处理队列中的任务
            //  firstTask !=null 不再接收新任务, 队列为空,任务已经处理完成。都直接返回。 
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty()))
                return false;
            //执行到此处,说明线程处在RUNNING状态
            //或者 SHUTDOWN状态,队列中还有任务要执行
            for (;;) {
                //计算线程池中有效线程的数量
                int wc = workerCountOf(c);
                //判断wc的数量是否符合规则
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //cas ,+1 操作 ,成功返回true,失败返回false
                if (compareAndIncrementWorkerCount(c))
                    break retry;  //退出循环
                c = ctl.get();  // Re-read ctl
                //+1 操作失败, 从新检测线程池的状态,继续循环
                if (runStateOf(c) != rs)
                    continue retry; 
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //workCount +1 操作成功,执行下面的步骤

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //Worker: 实现了Runnable的私有内部类,将任务进行封装
            w = new Worker(firstTask);
            final Thread t = w.thread;  //w.thread 由Work构造方法初始化
            if (t != null) {
                //获取全局锁,并发访问workers ,加锁处理
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //RUNNING状态,SHUTDOWN状态清理队列中的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //将work添加到HashSet集合中。
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //启动线程执行任务
                    t.start(); //执行的是ThreadPoolExecuteor的runWorker方法
                    workerStarted = true;
                }
            }
        } finally {
            //线程启动失败
            if (! workerStarted)
                //删除添加的任务
                addWorkerFailed(w);
        }
        return workerStarted;
    }

上面的注释已经很清楚,可能还有人觉得有些混乱,那么在串一下这个处理流程:

线程池的状态在不断的发生变化,添加新的任务时,先判断一下当前的线程状态。
在调用addWorker()之前,判断了此时的wc数量是小于< corePoolSize的。那么也就说只要线程池状态是RUNNING, 就可以直接创建新的线程来执行提交的任务。但是还是要排除,线程池是非RUNNING的这种情况。if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&firstTask == null && !workQueue.isEmpty()))便有了这个判断条件。 所以如果addWorker()方法没有创建新线程也没有提交新任务,直接返回false的话,只能是下列情形中的一种:

  • 线程池是STOP|TIDYING|TERMINATED 中的一种状态
  • 线程池是SHUNDOWN状态,但 firstTask != null
  • 线程池是SHUNDOWN状态,但 workQueue 是空
  • 线程的有效线程数量大于CAPACITY|| 大于(corePoolSize或者maximumPoolSize)

上面的 大于(corePoolSize或者maximumPoolSize)取决于队列是否已满

t.start()是调用的runWorker方法,因为Worker的run()方法,调用的是ThreadPoolExecutor类中的RunWorker()方法,看一下RunWorker的具体实现。
runWorker()

final void runWorker(Worker w) {
    //获取当前运行的线程
    Thread wt = Thread.currentThread();
    //获取Worker中包装的Runnable
    Runnable task = w.firstTask;
    w.firstTask = null;
    //允许中断,因为在work的构造方法中抑制了中断
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
         //task 就是 w.firstTask ,也就是包装后的任务
         //task 不是null 则进入while循环中
         //如果 task == null , 则从队列中取出一个任务执行
         //如果队列为空,则task还是null ,不进入while循环。
         while (task != null || (task = getTask()) != null) {
            //加锁
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())
                    //满足上面的条件,则执行中断操作
                    wt.interrupt();
                try {
                    //任务提交之前是否需要一些操作,交由子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();  //执行任务,执行Runnable中的run()方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //交给子类的实现方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    //修改task值,获取新的任务执行
                    task = null;
                    //完成任务数+1
                    w.completedTasks++;
                    //释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //清理线程的后续
            processWorkerExit(w, completedAbruptly);
    }
}

看一下上面方法的核心代码,while循环。如果传入的对象不为null将直接运行传入的任务,传入任务完成之后,task变为null, 则调用getTask()方法来获取队列中的任务,如果队列中一直有任务存在这又是一个死循环,while会一直循环下去,当然这只是一种极端的情况,在getTask()方法有返回null的条件,当getTask()方法返回null,则退出runWorker()方法。

getTask()

private Runnable getTask() {
        //线程等待超时变量,默认为false
        boolean timedOut = false; // Did the last poll() time out?
        //死循环
        for (;;) {
            //获取ctl的值
            int c = ctl.get();
            //获取线程池当前的状态值
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //判断线程池当前所处的状态
            //线程的状态不是RUNNING,队列为空。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //工作线程的数量-1 
                decrementWorkerCount();
                return null;
            }
            //工作线程的数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //没有超时,获取一个任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

上面的getTask()方法内部是一个死循环,结束死循环的话可以通过break或者return关键字。上面方法使用了return null 来结束任务获取,分为以下情形。

  • 线程池状态 >= SHUTDOWN && (rs >= STOP 任务队列为空)
  • 线程获取任务等待超时或者任务队列为空了
  • 成功取到了不为null的任务

关闭线程池

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //线程池状态设置为SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中断所有空闲线程,等待队列任务执行完成
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

可以通过调用线程池的shutdown或者shutdownNow方法来关闭线程池,它们的原理是遍历池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow首先将线程池的状态设置为STOP,然后尝试停止所有正在执行的或者暂停的任务并返回等待执行的任务列表。而shutdown只是将线程池的状态设置成SHUTDOWN,然后中断所有没有在执行的任务线程。

参考:
书籍:《JAVA并发编程艺术》
博客: http://blog.csdn.net/clevergump/article/details/50688008

 

© 著作权归作者所有

郑加威
粉丝 175
博文 183
码字总数 387300
作品 0
杭州
架构师
私信 提问
Java 并发编程源码解析汇总篇

java并发编程,内存模型 java并发编程,volatile内存实现和原理 Java并发编程,并发基础 Java 并发编程,线程池(ThreadPoolExecutor)源码解析 Java并发编程,Executor 框架介绍 Java并发编...

郑加威
2018/12/23
0
0
Java多线程之线程池(ThreadPoolExecutor)实现原理分析(一)

在上一篇文章Java中实现多线程的3种方法介绍和比较中,我们讲解了Java中实现多线程的3种方法。使用多线程,就必须要考虑使用线程池,今天我们来聊聊线程池的那些事。 注:源码都是基于JDK1....

小怪聊职场
2018/05/14
0
0
ThreadPoolExecutor 原理解析

本文首发于个人微信公众号《andyqian》,期待你的关注 前言 在上一篇文章《Java线程池ThreadPoolExecutor》中描述了ThreadPoolExecutor的基本概念,以及一些常用方法。这对于我们来说,是远远...

andyqian
06/14
48
0
读书笔记之《Java并发编程的艺术》-线程池和Executor的子孙们

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
754
1
Java多线程学习(八)线程池与Executor 框架

Java面试通关手册(Java学习指南,欢迎Star,会一直完善下去,欢迎建议和指导):https://github.com/Snailclimb/JavaGuide 历史优质文章推荐: Java并发编程指南专栏 分布式系统的经典基础理...

snailclimb
2018/05/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

75、GridFS

GridFS是MongoDB提供的用于持久化存储文件的模块,CMS使用Mongo DB存储数据,使用FGridFS可以快速集成开发。 工作原理: 在GridFS存储文件是将文件分块存储,文件会按照256KB的大小分割成多个...

lianbang_W
42分钟前
4
0
js bind 绑定this指向

本文转载于:专业的前端网站➱js bind 绑定this指向 1、示例代码 <!DOCTYPE html><html lang="zh"> <head> <meta charset="UTF-8" /> <title>bind函数绑定this指向......

前端老手
44分钟前
4
0
CentOS Linux 7上将ISO映像文件写成可启动U盘

如今,电脑基本上都支持U盘启动,所以,可以将ISO文件写到U盘上,用来启动并安装操作系统。 我想将一个CentOS Linux 7的ISO映像文件写到U盘上,在CentOS Linux 7操作系统上,执行如下命令: ...

大别阿郎
51分钟前
4
0
深入vue-公司分享ppt

组件注册 全局注册 注册组件,传入一个扩展过的构造器 Vue.component('my-component', Vue.extend({/*...*/})) 注册组件,传入一个选项对象(自动调用Vue.extend) Vue.component('my-comp...

莫西摩西
52分钟前
4
0
gitlab重置管理员密码

登录gitlab服务器 [root@localhost bin]# sudo gitlab-rails console productionLoading production environment (Rails 5.2.3)irb(main):001:0> u = User.where(email: 'admin@example.co......

King华仔o0
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部