文档章节

Java并发编程实战(chapter_3)(线程池ThreadPoolExecutor源码分析)

心中的理想乡
 心中的理想乡
发布于 09/19 00:41
字数 6143
阅读 186
收藏 18

这个系列一直没再写,很多原因,中间经历了换工作,熟悉项目,熟悉新团队等等一系列的事情。并发课题对于Java来说是一个又重要又难的一大块,除非气定神闲、精力满满,否则我本身是不敢随便写这个话题的。随便出一个生涩、浮于表面的文章,我觉得意义不大。所以一直就搁置到现在。这一次开启,有一个小小的契机:我自己面试中,已经被问烂了的构造函数的几个参数有什么意义,这种问题,发现其实很多人并不了解。就着这次的机会,我就重开这个课题。

一、基本的一些准备知识

李老爷子的线程池堪称经典,老爷子也因此风靡全球开发者圈子,阅读了源码,你才能感受到什么叫做编程思想,我们普普通通的CRUD简直都弱爆了!老爷子牛逼点也在于,源码中的注释非常完备,这不得不佩服:思想牛逼一方面,能把思想完善、由浅入深的表述出来,我觉得更牛逼!其中对于这个ThreadPoolExecutor的基础知识的了解,我觉得完全可以看注释就可以学全了。要想了解线程池源码,我们先要了解如下几个方面:

  • 线程池的几种状态
  • 线程池的状态表述
  • 状态的使用的方式
  • 线程池的构造函数

1、线程池的几种状态

最关键的是几种扭转的状态,让我们直接上老爷子的注释:

     /* The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * 
     * (下面是几种转态转换的根本的基本方式,很简单的英文,不用翻译)
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     */
  • RUNNING:接受新的任务,并且也继续运行阻塞队列里面的任务
  • SHUTDOWN:不接受新的任务了,但是可以继续执行阻塞队列里面的任务
  • STOP:不接受新的任务了,也不运行阻塞队列里面的任务了,并且去打断正在执行的任务
  • TIDYING:所有的任务都已经终止了,workerCount(任务数)是0,线程池运行到了这个状态之后,会去调terminated()这个方法
  • TERMINATED:terminated()这个方法执行完成

2、线程池的状态表述

同样,上源码:

// ctl这是一个很重要的参数,使用位标记方式,将当前线程池状态和当前线程池创建的任务多少杂糅到了一起
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 预留三位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池最大线程大小:(2^29)-1 (about 500 million)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池状态位,使用int的高三位进行储存
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 通过ctl值计算出运行线程池状态值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 通过ctl值计算出线程池当前任务多少的值
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 通过运行状态和任务多少的两个值,生成ctl这个包装的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

思想也很简单:大家熟知的int类型,是占四字节,32位的。为了状态操作的高效与空间节约,老爷子使用了位操作来控制。其中32位的高三位用来存储线程池的状态;低29位用来控制当前线程池有多少个线程。上面的源码就是对位操作的基本实现(都是基本的位操作,我这里不在累赘)

3、状态的使用的方式

这里会给出几个源码中,对状态和线程数量操控的方式:

// (c:ctl,s:state)当前线程池的状态,是不是小于给定的状态
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// (c:ctl,s:state)当前线程池的状态,是不是大于等于给定的状态
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// 当前线程池的状态是RUNNING的吗
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

// 使用CAS原理对当前线程池线程数量值加一
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

// 使用CAS原理对当前线程池线程数量值减一
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

// 使用CAS原理对当前线程池线程数量值减一,直到成功为止
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

下面的源码是对线程状态修改源码:

private void advanceRunState(int targetState) {
    // 这是一个死循环,直到修改成功才break
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

这里有两个判断条件,只要一个成功就会break循环

  • runStateAtLeast:如果当前状态和要设置的状态相等,或者比要设置的状态大。说明线程池状态的不可逆,说明,如果一个线程池已经是SHUTDOWN了,是不能设置回RUNNING状态的
  • compareAndSet:CAS设置ctl值。根据短路原理,到了这个方法执行已经说明当前状态是小于要设置状态了,所以可以修改ctl的状态位值。如果设置失败,返回false,继续死循环。成功,break

3、线程池的构造函数

常用的JDK推荐的,或者各大“api使用”书籍介绍的,无非都是下面的几个方法,进行创建线程池:

  • Executors.newCachedThreadPool
  • Executors.newFixedThreadPool
  • Executors.newScheduledThreadPool
  • Executors.newSingleThreadExecutor

可是当我们深入源码,才发现:这几个方法的内部无非都调用了ThreadPoolExecutor的构造函数,即使是newScheduledThreadPool这个方法,表面调用了ScheduledThreadPoolExecutor类,可是深入源码才发现:ScheduledThreadPoolExecutor类继承了ThreadPoolExecutor,并且构造函数使用了super进行了构建。这就给我们了一个很好的切入口:只要研究ThreadPoolExecutor构造函数就行。进一步,还会发现,ThreadPoolExecutor有四个构造函数,入参不一样,也都不约而同,最终调用了入参最多的那个(入参少的时候使用默认值),我们看看ThreadPoolExecutor中入参最多的构造函数的源码:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 对入参进行合法性校验
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // 获取系统安全管理器(不做分析)
    this.acc = System.getSecurityManager() == null ?
        null : AccessController.getContext();
	// 核心几大参数的赋值操作
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:核心运行线程数
  • maximumPoolSize:最大运行运行程
  • workQueue:阻塞队列
  • keepAliveTime:当线程大于核心线程数时,且阻塞队列没有元素,最大等待时间
  • threadFactory:生成线程的工厂类
  • handler:超出线程池最大承受能力之后的失败策略方法对象

对于线程池表现出来的各种特性,就是通过这几个参数控制的,所以很关键!

二、线程池的基本执行图解

对于线程池源码,我们先主要从execute执行方法入手进行分析,下面主要用一个图进行大致流程的展示:

ThreadPoolExecutor基本运行模型

配合上代码,我们先指出对应代码的大致位置,我们有个大体的概念比较好:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    // 下面大约就是①的过程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 下面大约就是②的过程
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 下面大约就是③的过程
    else if (!addWorker(command, false))
        // 下面大约就是④的过程
        reject(command);
}

三、线程池细节源码分析

1、addWorker方法

a、addWorker,我们先来看看

private boolean addWorker(Runnable firstTask, boolean core) {
	// 死循环,在某些条件下,会返回挥着break
    retry:
    for (;;) {
        int c = ctl.get();
        // 当下线程池运行状态
        int rs = runStateOf(c);

        // 下面是对线程池状态的一些列判断
        
        // 这个判断稍微有点绕,返回false的条件是:
        // 线程池是SHUTDOWN、STOP、TIDYING、TERMINATED其中的任意一个状态
        // 且(线程池状态为STOP、TIDYING、TERMINATED 或者 firstTask不为空 或者 阻塞队列为空)
        // 同样是返回false,添加失败
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            
            // 下面是对线程池当下线程数的一系列判断
            
            int wc = workerCountOf(c);
            // 线程数如果大于等于最大线程池允许数量((2^29)-1)或者大于等于设置的
            // 核心线程数或者最大线程数
            // 同样是返回false,添加失败
            if (wc >= CAPACITY ||
                // 这里也是一个玄妙之处:
                // 如果传入的core为true情况,可见线程数量依赖值为核心线程数
                // 如果为false,数量依赖于最大的线程数。通过这个core值,就可以
                // 控制什么时候,依赖什么值进行创建线程
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 下面是CAS的经典操作:
            
            // 这个第一个if如果设置成功,就结束整体的外部循环。没成功说明有竟态
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 再次获取一遍ctl,算是double check
            c = ctl.get();
            // 这里判断,如果为true,说明线程池当下状态已经被修改
            // 要重新通过外层循环的状态判断来确定返回值,所以continue了
            if (runStateOf(c) != rs)
                continue retry;
            
            // 到了这里,说明线程池状态没有被翻转,那就是说当前线程数因为竟态
            // 原因没有设置成功,那直接内部循环在执行一次,继续进行CAS的设置
        }
    }
    
	// 下面是启动线程的主要代码
    // 线程是否启动成功
    boolean workerStarted = false;
    // 线程是否添加成功
    boolean workerAdded = false;
    // 封装传入的线程对象Worker,这个也是很关键的对象,接下来会分析
    Worker w = null;
    try {
        // 封装线程的初始化工作,下面会分析
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 当下线程池的主锁,最大的一把锁,上锁期间主要对线程池容器进行维护
            // 这个容器是一个HashSet,保存当前运行的封装线程Worker
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 再次获取线程池当前状态,因为很有可能期间被人更改了
                int rs = runStateOf(ctl.get());

                // rs < SHUTDOWN:线程池是RUNNING状态
                // rs == SHUTDOWN && firstTask == null:
                // 线程池是SHUNTDOWN且firstTask为空,这种情况主要是因为
                // 线程池再SHUNDOWN状态了,可是阻塞队列还有没运行完的线程
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        // 保持一个线程最大线程池状态
                        largestPoolSize = s;
                    // 到这里线程添加到容器成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果添加容器成功,就启动封装的线程,且设置启动标识位为true
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果封装线程启动失败,会进行一系列的失败处理
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

b、下面是对addWorkerFailed方法的解说

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    // 同样的,获取主锁
    mainLock.lock();
    try {
        // 不为空的情况将封装线程从容器中移除
        // 为空的情况,主要是new Worker的时候报错
        if (w != null)
            workers.remove(w);
        // 循环登陆,减少一个线程数
        decrementWorkerCount();
        // 试着看看,能不能结束线程池,就是把线程池TERMINASTE了
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

c、下面是tryTerminate方法的解说

final void tryTerminate() {
    // 发现没,又是个死循环,老爷子很喜欢这种方式啊,而且是用for!
    for (;;) {
        int c = ctl.get();
        // 三种情况直接方法返回:
        // 1、正处在RUNNING状态的线程池
        // 2、线程池的状态是TIDYING或者是TERMINATE
        // 3、线程池是SHUNDOWN状态的,但是阻塞队列不为空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 走到这里,线程池的状态可能是:SHUTDOWN(且阻塞队列空)、STOP
        // 如果此时线程数不为0的话,要进行打断操作了
        if (workerCountOf(c) != 0) { 
            // 这里入参的意思是只打断容器里第一个封装线程里面的线程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
		
        // 执行到这里,说明线程池的状态是:SHUTDOWN(阻塞队列为空)、STOP
        // 此时线程数为0,说明线程池可以进行终结操作了
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS先将线程池设置成TIDYING的状态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 执行用户实现的terminated方法
                    terminated();
                } finally {
                    // 无论怎么样都会将线程池设置成TERMINATED状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                // 到这里说明终结成功,不过根据Java原理,返回前
                // 先执行finally里面的解主锁的方法
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // 如果能执行到这里,说明CAS设置TIDYING状态失败
        // 说明是竟态状态
    }
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //线程没有被打断且获取到封装线程的锁
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
// 用户自己实现的结束方法
protected void terminated() { }

到这里,已经讲完了一个很主要的内部方法:addWorker。下面我们对封装线程对象Worker进行讲解

2、Worker对象

这个东西,是一个很很很很很很很很经典的Java并发模型:AQS。这片文章不做AQS的讲解,放到后续

a、具体的Worker对象张什么样

private final class Worker
    extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
		// 这个就是最终启动的线程,看到了吧
        final Thread thread;
        // 我们传入的Runnable对象被放到了这里
        Runnable firstTask;
        // 这里记录完成的任务数。
        // 这里说明下一个理念:一个Worker,是最终被运行的Runnanle对象
        // 在很大的情况下(下面做分析)Worker这个线程会一直存在
        // 存在的意义是不断读取阻塞队列里面存储的我们传进来的Runnable对象
        // 然后运行。所以我们实现的Runnable对象的run方法,最终不是被
        // start方法调用执行的,而是通过直接调用执行的!
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // AQS对象状态!也是一大难的东西!
            this.firstTask = firstTask;
            // 这里的getThreadFactory方法使用的就是我们传入的threadFactory
            // 对象
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
        	// 看到了吧,这里执行了外层对象的方法,去直接调用传入的
        	// Runnable中的run方法,等下解说
            runWorker(this);
        }

		// 下面的几个函数都是AQS必须要实现的方法,这里不累赘
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    
public ThreadFactory getThreadFactory() {
    return threadFactory;
}

b、默认的线程工厂DefaultThreadFactory:

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
        	Thread.currentThread().getThreadGroup();
        // 这里记录了线程名的前缀,可见会将线程池序号进行递增操作
        namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
    }

    public Thread newThread(Runnable r) {
        // 这里就是生成喜闻乐见的Thread对象了,结合上面这里的r就是我们的Worker对象
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

3、runWorker方法

a、接下来又是一个关键性方法runWorker

final void runWorker(Worker w) {
    // 获取当前运行着的Worker线程
    Thread wt = Thread.currentThread();
    // 这个就是我们当下传入的Runnable
    Runnable task = w.firstTask;
    // 置空的意思是:Worker其实是一个壳子,以后会一直运行着,不断执行其他阻塞队列
    // 里面的Runnable对象的run方法
    w.firstTask = null;
    // 这里做解锁操作,是表示下面所有操作是可以被打断的
    // 另外AQS默认情况下不做unlock操作,lock会阻塞
    w.unlock(); 
    // 这个标志位表示线程执行过程中有没有被打断,或者运行异常
    boolean completedAbruptly = true;
    try {
        // 这个While循环里面的语句相当关键,包含了线程池执行流程的枢纽!
        // 我先大致说一下,下面会详细分析getTask方法:
        // 主要就是判断如果当前Worker里面的Runnable对象不为空
        // 就会执行这个对象的run方法;执行完之后,还会回到这个循环
        // 再下面的finally块里面将task置空了,所以就去调用getTask方法
        // 而getTask方法是一个很大可能阻塞的方法,阻塞的原因就是等待
        // 阻塞队列里面放入对象!所以也就形成了,一个Worker对象,循环
        // 不停的执行传入的Runnable对象run方法。这也就构成了corePoolSize
        // 与maxPoolSize两个参数控制系统级别的线程多少的目的!也就是说,
        // 这就是线程池里面,“池”这个概念的由来~
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 这里主要是判断是否要打断当前Worker所在的线程
            // 要满足两个个条件:
            // 1、当前线程池是STOP、TIDYING、TERMINATED
            // 2、当前线程是没有被打断的情况
            // 其中Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)
            // 主要用于清除线程终端标志,因为很大可能线程池刚刚转换成STOP
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 在执行线程体之前执行的方法,用户实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    // 请看下面几个异常,都是直接抛了出去
                    // 而并没有处理,所以处理内部异常也是
                    // 线程池的一个关键点
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 在执行完线程体之后的方法,用户实现
                    // 异常同时也传入这里了,所以可以自己实现一个子类
                    // 自己实现这个方法,进行异常处理
                    afterExecute(task, thrown);
                }
            } finally {
                // 这个地方肯定会被执行,所以无论run方法怎么样
                // Worker运行完成线程数都会加一
                task = null;
                w.completedTasks++;
                // 这里进行解锁操作
                w.unlock();
            }
        }
        // 注意代码执行到了这里说明while循环跳出来了
        // 大致有如下几种情况:
        // 1、阻塞队列里面没值了
        // 2、线程池状态翻转,便成了大于等于SHUTDOWN状态的了
        // 由于是正常结束,所以异常结束标志是false
        completedAbruptly = false;
    } finally {
        // 这里肯定会被执行,但是有两种情况跳入这个代码块
        // 1、run方法没有抛异常,completedAbruptly为false
        // 2、run方法抛异常,completedAbruptly为true
        // 下面也会进行解说
        processWorkerExit(w, completedAbruptly);
    }
}

b、我们来看核心的getTask方法

private Runnable getTask() {
    // 这个标志位主要用于后面的poll方法是否超时
    boolean timedOut = false; 

    // 又来了,李老爷子!是一个死循环判断!
    for (;;) {
        int c = ctl.get();
        // 获取当前线程池运行状态
        int rs = runStateOf(c);

        // 如果同时符合下面两种情况,直接返回null,并减少线程数量
        // 1、线程池状态是:SHUTDOWN、STOP、TIDYING、TERMINTED
        // 2、线程池的状态是STOP、TIDYING、TERMINTED或者队列为空
        // 这预示着线程池要进行关闭操作了,此Worker要结束声明周期!
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 这里是循环指导CAS成功设置
            decrementWorkerCount();
            return null;
        }

        // 这里获取当前线程池的线程数
        int wc = workerCountOf(c);

        // 这个标识位要解释解释:
        // 1、allowCoreThreadTimeOut成员变量,可设置
        // 2、wc > corePoolSize线程数是否大于核心线程数
        // 简单说就是:这个标志位控制线程池的收缩!
        // 很关键是不是!
        // 正常情况下只要超出核心线程数的线程才要进行收缩的
        // 收缩的条件是根据传入的阻塞队列超时时间
        // 但是我们可以通过设置allowCoreThreadTimeOut为true
        // 这样核心线程也可以收缩!
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 这里maximumPoolSize不能为零和负数
        // 这里判断很复杂,简单理解就是:
        // 如果线程池线程数超出了设置的最大线程数或者阻塞队列被打断了
        // 且当前Worker所在线程不是最后一个线程或者阻塞队列空了。
        // 这里如果wc>maximumPoolSize,那一定大于1,那就说明
        // 一定会执行if方法体;如果小于等于maximumPoolSize情况,
        // 那就说明是线程合理的收缩,这种时候,只有allowCoreThreadTimeOut
        // 被置位或者线程数大于核心线程数,当然如果要是只有一个线程数且队列不为空
        // 的情况也不能收缩,要保证有封装线程能执行阻塞队列里面线程
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                // 这里如果设置成功说明可以终结这个Worker了
                return null;
            // 这里是continue,因为有竟态
            continue;
        }

        try {
            // 注意这里的timed的取值,timed为true的时候是:
            // 1、allowCoreThreadTimeOut被置位
            // 2、或者线程数大于核心线程数
            // 其他情况是直接take方法,直接阻塞的。除非被打断
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                // 正常情况是拿到了Runnable,直接返回了
                return r;
            // 这种是阻塞队列超时了
            timedOut = true;
        } catch (InterruptedException retry) {
            // 打断情况并非阻塞队列超时,所以这里设置成false
            timedOut = false;
        }
    }
}

c、下面是对processWorkerExit分析

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 这个判断说明当前Worker所在的线程执行Runnable中的run方法抛了异常
    // 所以这个时候,要将线程数减一
    if (completedAbruptly) 
        decrementWorkerCount();

    // 获取主锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将当前Worker存在期间一共执行了多少个Runnable累加到
        // 线程池的统计字段上面
        completedTaskCount += w.completedTasks;
        // 将封装线程从容器中移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
	// 上面的方法在这里执行了,分析请看上面
    tryTerminate();

    int c = ctl.get();
    // 如果现在线程池的状态是:RUNNING、SHUTDOWN,执行if代码块
    if (runStateLessThan(c, STOP)) {
        // 如果没有抛异常情况,执行这个if代码块
        if (!completedAbruptly) {
            // 这个代码块,主要是要保证如果阻塞队列中还有Runnable线程
            // 而又走到了即将结束当前WOrker的代码,线程池要保证,至少还有
            // 运行着的Worker对阻塞队列中的线程进行处理,执行
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 为0的情况表示允许核心线程收缩,或者核心线程直接设成了0
            // 阻塞队列不为空要保证最小可用的Worker为1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 判断当前线程数是不是比最小的还要小
            if (workerCountOf(c) >= min)
                // 这里表明,有足够的Worker去执行
                return; 
            // 代码运行到这里,表明没有足够的Worker了,下面去创建
        }
        // 这里添加一个Worker的原因是:
        // RUNNING和SHUTDOWN状态都是允许继续执行阻塞队列中的线程的
        // 所以这里创建一个firstTask为null,依赖getTast去获取队列中的
        // 线程去执行。false的原因是创建依据maximumPoolSize
        addWorker(null, false);
    }
}

四、结尾

到此为止,线程池的主要源码,都分析了,剩下,还有几个附加功能源码,留着接下来有精力再一点点回补吧。当然,对于下一步的深入,就要到AQS的分析了。可见,这里的Worker本身就是一个AQS,在Worker上面调用lock或是unlock方法,都是进入一个内部的阻塞队列的管理的。其中最最底层,还会涉及到操作系统中线程的同步原语:mutex!接下来,我会分析那个,敬请期待!

© 著作权归作者所有

共有 人打赏支持
心中的理想乡
粉丝 18
博文 75
码字总数 101203
作品 0
深圳
程序员
私信 提问
Java多线程之线程池(ThreadPoolExecutor)实现原理分析(一)

在上一篇文章Java中实现多线程的3种方法介绍和比较中,我们讲解了Java中实现多线程的3种方法。使用多线程,就必须要考虑使用线程池,今天我们来聊聊线程池的那些事。 注:源码都是基于JDK1....

小怪聊职场
05/14
0
0
Java线程框架_Executor

Executor 框架是 juc 里提供的线程池的实现。前两天看了下 Executor 框架的一些源码,做个简单的总结。 线程池大概的思路是维护一个的线程池用于执行提交的任务。我理解池的技术的主要意义有...

天呀鲁哇
2015/02/05
0
0
读书笔记之《Java并发编程的艺术》-线程池和Executor的子孙们

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
慎用Java 自带两种线程池!

Java5相比之前的Java版本,在并发编程上,有了非常大的提高,加了很多类,提供了很多可用于并发编程的工具包和工具类。尤其为人们所称道的,就是Java自带的线程池。 Java5线程池的介绍文章,...

webas
2013/04/26
0
4
Java并发--深入理解线程池

为什么需要线程池 在生产环境中,我们不能无限制的创建线程,主要原因如下: 线程创建与销毁的代价并不低; 如果可运行的线程数量多于可用处理器的数量,有些线程将会闲置,大量闲置的线程会...

珩翊
09/13
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Java提高班(六)反射和动态代理(JDK Proxy和Cglib)

反射和动态代理放有一定的相关性,但单纯的说动态代理是由反射机制实现的,其实是不够全面不准确的,动态代理是一种功能行为,而它的实现方法有很多。要怎么理解以上这句话,请看下文。 一、...

王磊的博客
18分钟前
1
0
Ext grid 渲染

// 单元格字体颜色渲染function renderer_Meta_useStatus(value, cellmeta, record,rowIndex, columnIndex, store){ var color = ""; if("空闲"==value){ color = "green";......

MoksMo
28分钟前
4
0
log4j2在spring中的配置

<?xml version="1.0" encoding="UTF-8"?><!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL --><!--Configuration后面的status,这个用于设置l......

TonyTaotao
34分钟前
3
0
java 中间变量缓存机制(i++,++i)

public class Test { public static void main(String[] args) { int i = 0; i = i ++ ; System.out.println(i); } } 答案是 0 如果是 i = ++......

shzwork
41分钟前
5
0
初识多线程及其原理-笔记

什么情况下应该使用多线程? 通过并行计算提高程序执行性能 需要等待网络、I/O响应导致耗费大量的执行时间, 可以采用异步线程的方式来减少阻塞 tomcat7 以前的io模型 客户端阻塞 线程级别阻...

Java搬砖工程师
52分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部