文档章节

线程池原理

小菜鸡1
 小菜鸡1
发布于 2016/08/08 17:02
字数 7960
阅读 65
收藏 7

 来看一下线程池的框架图,如下:

  1. Executor任务提交接口与Executors工具类

Executor框架同java.util.concurrent.Executor 接口在Java 5中被引入。Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架。Executor存在的目的是提供一种将"任务提交"与"任务如何运行"分离开来的机制。定义如下:

public interface Executor {
    void execute(Runnable command);
}

虽然只有一个方法,但是却为灵活且强大的异步任务执行框架提供了基础。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。那么我们怎么得到Executor对象呢?这就是接下来要介绍的Exectors了。

Executors为Executor,ExecutorService,ScheduledExecutorService,ThreadFactory和Callable类提供了一些工具方法,类似于集合中的Collections类的功能。Executors方便的创建线程池。

        1>newCachedThreadPool :该线程池比较适合没有固定大小并且比较快速就能完成的小任务,它将为每个任务创建一个线程。那这样子它与直接创建线程对象(new Thread())有什么区别呢?看到它的第三个参数60L和第四个参数TimeUnit.SECONDS了吗?好处就在于60秒内能够重用已创建的线程。下面是Executors中的newCachedThreadPool()的源代码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, 
        Integer.MAX_VALUE, 
        60L, 
        TimeUnit.SECONDS, 
        new SynchronousQueue<Runnable>());
}

  2> newFixedThreadPool使用的Thread对象的数量是有限的,如果提交的任务数量大于限制的最大线程数,那么这些任务将排队,然后当有一 个线程的任务结束之后,将会根据调度策略继续等待执行下一个任务。下面是Executors中的newFixedThreadPool()的源代码:

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads, 
        nThreads, 
        0L, 
        TimeUnit.MILLISECONDS, 
        new LinkedBlockingQueue<Runnable>());
}

   3>newSingleThreadExecutor就是线程数量为1的FixedThreadPool,如果提交了多个任务,那么这些任务将会排 队,每个任务都会在下一个任务开始之前运行结束,所有的任务将会使用相同的线程。下面是Executors中的 newSingleThreadExecutor()的源代码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, 
            TimeUnit.MILLISECONDS, 
            new LinkedBlockingQueue<Runnable>()));
}

     4>newScheduledThreadPool创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务。

通过如上配置的线程池的创建方法源代码,我们可以发现:

  1> 除了CachedThreadPool使用的是直接提交策略的缓冲队列以外,其余两个用的采用的都是无界缓冲队列,也就说,FixedThreadPool和SingleThreadExecutor创建的线程数量就不会超过 corePoolSize。

    2> 我们可以再来看看三个线程池采用的ThreadPoolExecutor构造方法都是同一个,使用的都是默认的ThreadFactory和handler:

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    
    this(corePoolSize, 
        maximumPoolSize, 
        keepAliveTime, 
        unit, 
        workQueue,
        Executors.defaultThreadFactory(), 
        defaultHandler);
}

 也就说三个线程池创建的线程对象都是同组,优先权等级为正常的Thread.NORM_PRIORITY(5)的非守护线程,使用的被拒绝任务处理方式是直接抛出异常的AbortPolicy策略(前面有介绍)。

2、ExecutorService任务周期管理接口

Executor的实现通常都会创建线程来执行任务,但是使用异步方式来执行任务时,由于之前提交任务的状态不是立即可见的,所以如果要关闭应用程序时,就需要将受影响的任务状态反馈给应用程序。

为了解决执行服务的生命周期问题,Executor扩展了EecutorService接口,添加了一些用于生命周期管理的方法。如下:

public interface ExecutorService extends Executor {
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    // 省略部分方法  
}

3、ThreadPoolExecutor线程池实现类

先来看一下这个类中定义的重要变量,如下:

private final BlockingQueue<Runnable> workQueue;              // 阻塞队列  
private final ReentrantLock mainLock = new ReentrantLock();   // 互斥锁  
private final HashSet<Worker> workers = new HashSet<Worker>();// 线程集合.一个Worker对应一个线程  
private final Condition termination = mainLock.newCondition();// 终止条件  
private int largestPoolSize;           // 线程池中线程数量曾经达到过的最大值。  
private long completedTaskCount;       // 已完成任务数量  
private volatile ThreadFactory threadFactory;     // ThreadFactory对象,用于创建线程。  
private volatile RejectedExecutionHandler handler;// 拒绝策略的处理句柄  
private volatile long keepAliveTime;   // 线程池维护线程所允许的空闲时间  
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;     // 线程池维护线程的最小数量,哪怕是空闲的  
private volatile int maximumPoolSize;  // 线程池维护的最大线程数量

其中有几个重要的规则需要说明一下:

1> corePoolSize与maximumPoolSize  由于ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小,当新任务在方法 execute(java.lang.Runnable) 中提交时:

  1. 如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的;
  2. 如果 设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池是大小固定的,如果运行的线程与corePoolSize相 同,当有新请求过来时,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理
  3. 如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程去处理请求;
  4. 如果运行的线程多于corePoolSize 并且等于maximumPoolSize,若队列已经满了,则通过handler所指定的策略来处理新请求;
  5. 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务

也就是说,处理任务的优先级为: 

  • 1. 核心线程corePoolSize > 任务队列workQueue > 最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
  • 2. 当池中的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁。

2> workQueue 线程池所使用的缓冲队列,该缓冲队列的长度决定了能够缓冲的最大数量,缓冲队列有三种通用策略:

  1) 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此 会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;

   2) 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性;

   3) 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以 最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量.

3>ThreadFactory   使用 ThreadFactory 创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

而构造方法中的threadFactory对象,是通过 Executors.defaultThreadFactory()返回的。Executors.java中的defaultThreadFactory()源码如下:

public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}

在DefaultThreadFactory类中实现了ThreadFactory接口并对其中定义的方法进行了实现,如下:

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    // 为线程池创建新的任务执行线程
    public Thread newThread(Runnable r) {
        // 线程对应的任务是Runnable对象r
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        // 设为非守护线程
        if (t.isDaemon())
            t.setDaemon(false);
        // 将优先级设为Thread.NORM_PRIORITY
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

4>RejectedExecutionHandler

    当Executor已经关闭(即执行了executorService.shutdown()方法后),并且Executor将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法execute()中提交的新任务将被拒绝.

  在以上述情况下,execute 方法将调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略:

    1) 在默认的 ThreadPoolExecutor.AbortPolicy      处理程序遭到拒绝将抛出运行时 RejectedExecutionException;
    2) 在 ThreadPoolExecutor.CallerRunsPolicy        线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度

    3) 在 ThreadPoolExecutor.DiscardPolicy             不能执行的任务将被删除;

    4) 在 ThreadPoolExecutor.DiscardOldestPolicy    如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。

线程池默认会采用的是defaultHandler策略。首先看defaultHandler的定义:

private static final RejectedExecutionHandler defaultHandler=new AbortPolicy(); // 使用默认的拒绝策略

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() {
    }

    // 抛出异常  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}

看一下其他拒绝策略的具体实现。

class MyRunnable implements Runnable {
    private String name;

    public MyRunnable(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            System.out.println(this.name + " is running.");
            Thread.sleep(100);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

如上是一个测试任务的例子,下面编写4个测试用例来测试。

1. DiscardPolicy 示例

public class DiscardPolicyDemo {
    
    private static final int THREADS_SIZE = 1;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));
        // 设置线程池的拒绝策略为"丢弃"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

        // 新建10个任务,并将它们添加到线程池中。  
        for (int i = 0; i < 10; i++) {
            Runnable myrun = new MyRunnable("task-" + i);
            pool.execute(myrun);
        }
        // 关闭线程池  
        pool.shutdown();
    }
}

线程池pool的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),这意味着"线程池能同时运行的任务数量最大只能是1"。

线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。

根据上面分析的execute()代码可知:线程池中共运行了2个任务。第1个任务直接放到Worker中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!

运行结果:

task-0 is running.
task-1 is running.

2. DiscardOldestPolicy 示例

public class DiscardOldestPolicyDemo {

    private static final int THREADS_SIZE = 1;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(CAPACITY));
        // 设置线程池的拒绝策略为"DiscardOldestPolicy"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

        // 新建10个任务,并将它们添加到线程池中。  
        for (int i = 0; i < 10; i++) {
            Runnable myrun = new MyRunnable("task-" + i);
            pool.execute(myrun);
        }
        // 关闭线程池  
        pool.shutdown();
    }
}

运行结果:

task-0 is running.
task-9 is running.

将"线程池的拒绝策略"由DiscardPolicy修改为DiscardOldestPolicy之后,当有任务添加到线程池被拒绝时,线程池会丢弃阻塞队列中末尾的任务,然后将被拒绝的任务添加到末尾。

3. AbortPolicy 示例

public class AbortPolicyDemo {

    private static final int THREADS_SIZE = 1;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(CAPACITY));
        // 设置线程池的拒绝策略为"抛出异常"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        try {

            // 新建10个任务,并将它们添加到线程池中。  
            for (int i = 0; i < 10; i++) {
                Runnable myrun = new MyRunnable("task-" + i);
                pool.execute(myrun);
            }
        } catch (RejectedExecutionException e) {
            e.printStackTrace();
            // 关闭线程池  
            pool.shutdown();
        }
    }
}

(某一次)运行结果:

java.util.concurrent.RejectedExecutionException
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774) 
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768) 
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656) 
at AbortPolicyDemo.main(AbortPolicyDemo.java:27) 
task-0 is running. 
task-1 is running.

将"线程池的拒绝策略"由DiscardPolicy修改为AbortPolicy之后,当有任务添加到线程池被拒绝时,会抛出RejectedExecutionException。

4. CallerRunsPolicy 示例

public class CallerRunsPolicyDemo {

    private static final int THREADS_SIZE = 1;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(CAPACITY));
        // 设置线程池的拒绝策略为"CallerRunsPolicy"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 新建10个任务,并将它们添加到线程池中。  
        for (int i = 0; i < 10; i++) {
            Runnable myrun = new MyRunnable("task-" + i);
            pool.execute(myrun);
        }

        // 关闭线程池  
        pool.shutdown();
    }
}

(某一次)运行结果:

task-2 is running.
task-3 is running. 
task-4 is running. 
task-5 is running. 
task-6 is running. 
task-7 is running. 
task-8 is running. 
task-9 is running. 
task-0 is running. 
task-1 is running.

将"线程池的拒绝策略"由DiscardPolicy修改为CallerRunsPolicy之后,当有任务添加到线程池被拒绝时,线程池会将被拒绝的任务添加到"线程池正在运行的线程"中取运行。

任务周期

线程池能够复用线程,减少线程创建,销毁,恢复等状态切换的开销,提高程序的性能。一个线程池管理了一组工作线程,同时它还包括了一个用于放置等待执行的任务的队列。

ThreadPoolExecutor类中定义了一些与线程状态与活动线程数相关的一些变量,如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 将整型的32位分为高3位和低29位,高3位表示线程池的状态,低29位表示活动的线程数  
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 29位能表示的最大二进制整数,也就是活动线程数  

// 高3位数值代表的线程池状态  
private static final int RUNNING    = -1 << COUNT_BITS;    // running 线程池能接受新任务  
private static final int SHUTDOWN   =  0 << COUNT_BITS;    // shutdown 线程池不再接受新任务  
private static final int STOP       =  1 << COUNT_BITS;    // stop 线程池不再接受新任务,不再执行队列中的任务,而且要中断正在处理的任务  
private static final int TIDYING    =  2 << COUNT_BITS;    // tidying 线程池所有任务均已终止  
private static final int TERMINATED =  3 << COUNT_BITS;    // terminated terminated()方法执行结束

由如上可知:

ctl是一个AtomicInteger类型的原子对象。ctl记录了"线程池中的任务数量"和"线程池状态"2个信息。ctl共包括32位。其中,高3位表示"线程池状态",低29位表示"线程池中的任务数量"。

RUNNING  -- 对应的高3位值是111 
SHUTDOWN  -- 对应的高3位值是000 
STOP  -- 对应的高3位值是001 
TIDYING  -- 对应的高3位值是010 
TERMINATED -- 对应的高3位值是011

线程池各个状态之间的切换如下图所示:

线程池各个状态间的转换的详细解释如下所示。

    1> RUNNING(111) -> SHUTDOWN(000) : 调用了shutdown方法,线程池实现了finalize方法,在里面调用了shutdown方法,因此shutdown可能是在finalize中被隐式调用的

    2> (RUNNING(111) or SHUTDOWN(000)) -> STOP(001) 调用了shutdownNow方法

    3> SHUTDOWN(000) -> TIDYING(010) : 当队列和线程池均为空的时候

    4> STOP(001) -> TIDYING(010) : 当线程池为空的时候

    5> TIDYING(010) -> TERMINATED(011) : terminated()方法调用完毕

说明:扩号后的3位数字表示ctl的高3位二进制值,并不关注低29位二进制的值

还有一些对常量的操作方法,只说明部分,其他的有兴趣自己可以去查看,如下:

private static int runStateOf(int c) { return c & ~CAPACITY; }      // 得到线程运行状态  
private static int workerCountOf(int c) { return c & CAPACITY; }    // 得到活动线程数  
private static int ctlOf(int rs, int wc) { return rs | wc; }        // 得到两者表示的值

来看一下ThreadPoolExecutor()中最主要的一个构造函数,如下:

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
    if(corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if(workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

调用Executors方法中的几个方法,如newCachedThreadPool()、newFixedThreadPool()时,都会间接调用上面的构造方法来初始化所有的线程池相关变量。

1、创建线程池并执行任务

有了Executor对象后,就可以调用execute()方法执行任务了。方法的源代码如下:

public void execute(Runnable command) {

    if (command == null)  // 任务为null,则抛出异常  
        throw new NullPointerException();
    int c = ctl.get();    // 取出记录着runState和workerCount 的 ctl的当前值  
      
    /* 
     *  通过workerCountOf方法从ctl所表示的int值中提取出低29位的值,也就是当前活动的线程数。如果当前 
     *  活动的线程数少于corePoolSize,则通过addWorker(command, true)新建一个线程,并将任务(command) 
     *  添加到该线程中 
     */
    if (workerCountOf(c) < corePoolSize) {      
        /* 
         * addWorker()返回值表示: 
         * 1、true 表示需要检测当前运行的线程是否小于corePoolSize 
         * 2、false 表示需要检测当前运行的线程数量是否小于maxPoolSize 
         */
        if (addWorker(command, true))
            return;       // 新线程创建成功,终止该方法的执行  
        c = ctl.get();    // 任务添加到线程失败,取出记录着runState和workerCount 的 ctl的当前值  
    }  
    /* 
     * 方法解释: 
     * isRunning(c) 当前线程池是否处于运行状态。源代码是通过判断c < SHUTDOWN 来确定返回值。由于RUNNING才会接收新任务,且只有这个值-1才小于SHUTDOWN 
     * workQueue.offer(command) 任务添加到缓冲队列 
     */
    if (isRunning(c) && workQueue.offer(command)) {// 当前线程处于运行状态且成功添加到缓冲队列  
        int recheck = ctl.get();   
        /* 
         * 如果 线程池已经处于非运行状态,则从缓冲队列中移除任务然后采用线程池指定的策略拒绝任务 
         * 如果 线程池中任务数量为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null 
         */
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)  // 得到活动线程数为0          
            addWorker(null, false);
    }  
    /* 
     * 当不满足以下两个条件时执行如下代码: 
     *  1. 当前线程池并不处于Running状态    
     *  2. 当前线程池处于Running状态,但是缓冲队列已经满了   
     */
    else if (!addWorker(command, false))
        reject(command);   // 采用线程池指定的策略拒绝任务  
}

 当前活动的线程小于corePoolSize了,那么等于和大于corePoolSize怎么处理呢?

    1>  当前活动的线程数量 >= corePoolSize 的时候,都是优先添加到队列中,直到队列满了才会去创建新的线程,在这里第20行的if语句已经体现出来了。这里利用了&&的特性,只有 当第一个条件会真时才会去判断第二个条件,第一个条件是isRunning(),判断线程池是否处于RUNNING状态,因为只有在这个状态下才会接受新 任务,否则就拒绝,如果正处于RUNNING状态,那么就加入队列,如果加入失败可能就是队列已经满了,这时候直接执行第29行。 

    2> 在execute()方法中,当 当前活动的线程数量 < corePoolSize 时,会执行addWorker()方法,关于addWorker(),它是用来直接新建线程用的,之所以叫addWorker而不是addThread是 因为在线程池中,所有的线程都用一个Worker对象包装着,来看一下这个方法:

/**
 * 创建并执行新线程
 *
 * @param firstTack 用于指定新增的线程执行的第一个任务
 * @param core      true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,
 *                  false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
 * @return 是否成功新增一个线程
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (; ; ) {
        int c = ctl.get();         // 获取记录着runState和workCount的int变量的当前值  
        int rs = runStateOf(c);    // 获取当前线程池运行的状态  
      /* 
        这个条件代表着以下几个情景,就直接返回false说明线程创建失败: 
        1.rs > SHUTDOWN; 此时不再接收新任务,且所有的任务已经执行完毕 
        2.rs = SHUTDOWN; 此时不再接收新任务,但是会执行队列中的任务,在后买年的或语句中,第一个不成立,firstTask != null成立 
        3.rs = SHUTDOWN;此时不再接收新任务,fistTask == null,任务队列workQueue已经空了 
      */
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                        firstTask == null &&
                        !workQueue.isEmpty()))
            return false;

        for (; ; ) {
            //获取当前活动的线程数  
            int wc = workerCountOf(c);
            //先判断当前活动的线程数是否大于最大值,如果超过了就直接返回false说明线程创建失败  
            //如果没有超过再根据core的值再进行以下判断  
          /* 
              1.core为true,则判断当前活动的线程数是否大于corePoolSize 
              2.core为false,则判断当前活动线程数是否大于maximumPoolSize 
          */
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //比较当前值是否和c相同,如果相同,则改为c+1,并且跳出大循环,直接执行Worker进行线程创建  
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // 获取ctl的当前值  
            if (runStateOf(c) != rs)    //检查下当前线程池的状态是否已经发生改变  
                continue retry;    //如果已经改变了,则进行外层retry大循环,否则只进行内层的循环  
            // else CAS failed due to workerCount change; retry inner loop  
        }
    }
    //下面这里就是开始创建新的线程了  
    //Worker的也是Runnable的实现类  
    Worker w = new Worker(firstTask);
    //因为不可以直接在Worker的构造方法中进行线程创建  
    //所以要把它的引用赋给t方便后面进行线程创建  
    Thread t = w.thread;

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {

        //再次取出ctl的当前值,用于进行状态的检查,防止线程池的已经状态改变了  
        int c = ctl.get();
        int rs = runStateOf(c);

        //将if语句中的条件转换为一个等价实现 :t == null || (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null))  
        //有个t == null是因为如果使用的是默认的ThreadFactory的话,那么它的newThread()可能会返回null  
      /* 
        1. 如果t == null, 则减少一个线程数,如果线程池处于的状态 > SHUTDOWN,则尝试终止线程池 
        2. 如果t != null,且rs == SHUTDOWN,则不再接收新任务,若firstTask != null,则此时也是返回false,创建线程失败 
        3. 如果t != null, 且rs > SHUTDOWN,同样不再接受新任务,此时也是返回false,创建线程失败 
      */
        if (t == null ||
                (rs >= SHUTDOWN &&
                        !(rs == SHUTDOWN &&
                                firstTask == null))) {
            decrementWorkerCount();    //减少一个活动的当前线程数  
            tryTerminate();    //尝试终止线程池  
            return false;    //返回线程创建失败  
        }

        workers.add(w);    //将创建的线程添加到workers容器中  

        int s = workers.size();    //获取当前线程活动的数量  
        if (s > largestPoolSize)    //判断当前线程活动的数量是否超过线程池最大的线程数量  
            largestPoolSize = s;    //当池中的工作线程创新高时,会将这个数记录到largestPoolSize字段中。然后就可以启动这个线程t了  
    } finally {
        mainLock.unlock();
    }

    t.start();    //开启线程  
    //若start后,状态又变成了SHUTDOWN状态(如调用了shutdownNow方法)且新建的线程没有被中断过,  
    //就要中断该线程(shutdownNow方法要求中断正在执行的线程),  
    //shutdownNow方法本身也会去中断存储在workers中的所有线程  
    if (runStateOf(ctl.get()) == STOP && !t.isInterrupted())
        t.interrupt();

    return true;
}

那么在创建线程的时候,线程执行的是什么的呢?

  我们前面提到Worker继承的其实也是Runnable,它在创建线程的时候是以自身作为任务传进先创建的线程中的,这段比较简单,我就不一一注释了,只是给出源代码给大家看吧。

Worker(Runnable firstTask) {
    this.firstTask = firstTask;
    //this指的是worker对象本身
    this.thread = getThreadFactory().newThread(this);
}

    它以自身的对象作为线程任务传进去,那么它的run方法又是怎样的呢?

public void run() {
    runWorker(this);
}

   竟然只有一句话调用runWorker()方法,这个可是重头戏,我们来看看,究竟运行的是什么。

/**
 * 执行Worker中的任务,它的执行流程是这样的:
 * 若存在第一个任务,则先执行第一个任务,否则,从队列中拿任务,不断的执行,
 * 直到getTask()返回null或执行任务出错(中断或任务本身抛出异常),就退出while循环。
 *
 * @param w woker
 */
final void runWorker(Worker w) {
    Runnable task = w.firstTask;    //将当前Worker中的任务取出来交给task,并释放掉w.firstTask占用的内存  
    w.firstTask = null;
    //用于判断线程是否由于异常终止,如果不是异常终止,在后面将会将该变量的值改为false  
    //该变量的值在processWorkerExit()会使用来判断线程是否由于异常终止  
    boolean completedAbruptly = true;
    try {
        //执行任务,直到getTask()返回的值为null,在此处就相当于复用了线程,让线程执行了多个任务  
        while (task != null || (task = getTask()) != null) {
            w.lock();
            clearInterruptsForTaskRun();//对线程池状态进行一次判断,后面我们会讲解一下该方法  
            try {
                beforeExecute(w.thread, task);    //在任务执行前需要做的逻辑方法,该方面可以由用户进行重写自定义  
                Throwable thrown = null;
                try {
                    task.run();    //开始执行任务  
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);    //在任务执行后需要做的逻辑方法,该方面可以由用户进行重写自定义  
                }
            } finally {
                task = null;
                w.completedTasks++;    //增加该线程完成的任务  
                w.unlock();
            }
        }
        completedAbruptly = false;    //线程不是异常终止  
    } finally {
        processWorkerExit(w, completedAbruptly);    //结束该线程  
    }
}

下面就是线程在执行任务之前对线程池状态的一次判断:

/**
 * 对线程的结束做一些清理和数据同步
 *
 * @param w                 封装线程的Worker
 * @param completedAbruptly 表示该线程是否结束于异常
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果completedAbruptly值为true,则说明线程是结束于异常  
    //如果不是结束于异常,那么它降在runWorker方法的while循环中的getTask()方法中已经减一了  
    if (completedAbruptly)
        decrementWorkerCount();    //此时将线程数量减一  

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;    //统计总共完成的任务数  
        workers.remove(w);    //将该线程数从workers容器中移除  
    } finally {
        mainLock.unlock();
    }

    tryTerminate();    //尝试终止线程池  

    int c = ctl.get();
    //接下来的这个if块要做的事儿了。当池的状态还是RUNNING,  
    //又要分两种情况,一种是异常结束,一种是正常结束。异常结束比较好弄,直接加个线程替换死掉的线程就好了,  
    //也就是最后的addWorker操作  
    if (runStateLessThan(c, STOP)) {    //如果当前运行状态为RUNNING,SHUTDOWN  
        if (!completedAbruptly) {    //如果线程不是结束于异常  
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;    //是否允许线程超时结束  
            if (min == 0 && !workQueue.isEmpty())    //如果允许把那个且队列不为空  
                min = 1;    //至少要保留一个线程来完成任务  
            //如果当前活动的线程数大于等于最小的值  
            // 1.不允许核心线程超时结束,则必须要使得活动线程数超过corePoolSize数才可以  
            // 2. 允许核心线程超时结束,但是队列中有任务,必须留至少一个线程  
            if (workerCountOf(c) >= min)
                return; // replacement not needed  
        }
        //直接加个线程  
        addWorker(null, false);
    }
}

前面我们的方法遇见过很多次tryTerminate()方法,到底他是怎样尝试结束线程池的呢?

/**
 * 执行该方法,根据线程池状态进行  判断是否结束线程池
 */
final void tryTerminate() {
    for (; ; ) {
        int c = ctl.get();
        if (isRunning(c) ||    //线程池正在运行中,自然不能结束线程池啦  
                runStateAtLeast(c, TIDYING) ||    //如果状态为TIDYING或TERMINATED,池中的活动线程数已经是0,自然也不需要做什么操作了  
                (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))    //线程池出于SHUTDOWN状态,但是任务队列不为空,自然不能结束线程池啦  
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate  
        /* 
          调用这个方法的目的是将shutdown信号传播给其它线程。 
          调用shutdown方法的时候会去中断所有空闲线程,如果这时候池中所有的线程都正在执行任务, 
          那么就不会有线程被中断,调用shutdown方法只是设置了线程池的状态为SHUTDOWN, 
          在取任务(getTask,后面会细说)的时候,假如很多线程都发现队列里还有任务(没有使用锁,存在竞态条件), 
          然后都去调用take,如果任务数小于池中的线程数,那么必然有方法调用take后会一直等待(shutdown的时候这些线程正在执行任务, 
          所以没能调用它的interrupt,其中断状态没有被设置),那么在没有任务且线程池的状态为SHUTDWON的时候, 
          这些等待中的空闲线程就需要被终止iinterruptIdleWorkers(ONLY_ONE)回去中断一个线程,让其从take中退出, 
          然后这个线程也进入同样的逻辑,去终止一个其它空闲线程,直到池中的活动线程数为0。 
        */
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {  
        /* 
          当状态为SHUTDOWN,且活动线程数为0的时候,就可以进入TIDYING状态了, 
          进入TIDYING状态就可以执行方法terminated(), 
          该方法执行结束就进入了TERMINATED状态(参考前文中各状态的含义以及可能的状态转变) 
        */
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();    //执行该方法,结束线程池  
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));  
                /* 
                  当线程池shutdown后,外部可能还有很多线程在等待线程池真正结束, 
                  即调用了awaitTermination方法,该方法中,外部线程就是在termination上await的, 
                  所以,线程池关闭之前要唤醒这些等待的线程,告诉它们线程池关闭结束了。 
                */
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS  
    }
}

2、关闭线程池

关闭时使用shutdown()方法,源码如下:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();    // 检查终止线程池的线程是否有权限。  
        advanceRunState(SHUTDOWN);// 设置线程池的状态为关闭状态。  
        interruptIdleWorkers();   // 中断线程池中空闲的线程  
        onShutdown();             // 钩子函数,在ThreadPoolExecutor中没有任何动作  
    } finally {
        mainLock.unlock();
    }
    tryTerminate();               // 尝试终止线程池  
}

 

本文转载自:

共有 人打赏支持
小菜鸡1
粉丝 10
博文 59
码字总数 16851
作品 0
深圳
程序员

暂无文章

你为什么在Redis里读到了本应过期的数据

一个事故的故事 晚上睡的正香突然被电话吵醒,对面是开发焦急的声音:我们的程序在访问redis的时候读到了本应过期的key导致整个业务逻辑出了问题,需要马上解决。 看到这里你可能会想:这是不...

IT--小哥
今天
2
0
祝大家节日快乐,阖家幸福! centos GnuTLS 漏洞

yum update -y gnutls 修复了GnuTLS 漏洞。更新到最新 gnutls.x86_64 0:2.12.23-22.el6 版本

yizhichao
昨天
5
0
Scrapy 1.5.0之选择器

构造选择器 Scrapy选择器是通过文本(Text)或 TextResponse 对象构造的 Selector 类的实例。 它根据输入类型自动选择最佳的解析规则(XML vs HTML): >>> from scrapy.selector import Sele...

Eappo_Geng
昨天
4
0
Windows下Git多账号配置,同一电脑多个ssh-key的管理

Windows下Git多账号配置,同一电脑多个ssh-key的管理   这一篇文章是对上一篇文章《Git-TortoiseGit完整配置流程》的拓展,所以需要对上一篇文章有所了解,当然直接往下看也可以,其中也有...

morpheusWB
昨天
5
0
中秋快乐!!!

HiBlock
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部