文档章节

Java concurrent之ThreadPoolExecutor线程池

嗨椒爆炒小小鱼
 嗨椒爆炒小小鱼
发布于 2016/07/19 16:31
字数 1347
阅读 61
收藏 5

线程池并不神秘!

ThreadPoolExecutor其实是实现了ExecutorService接口的具体实现。而ExecutorService继承了Executor接口,在Executor接口中定义了一系列execute(Runnable command)接口。ExecutorService二级接口中有添加了更多的功能接口如shutdown(); submit(Callable<T> task)

ThreadPoolExecutor类中有一系列重要的属性变量:(可暂时略过,不过看完后面的逻辑,相信你会回来找的)

    //控制线程池的生命周期的参数
    volatile int runState;
    static final int RUNNING    = 0;
    static final int SHUTDOWN   = 1;
    static final int STOP       = 2;
    static final int TERMINATED = 3;

    //用来存放任务的一个队列,队列里的任务会转接给woker变量里的线程处理
    private final BlockingQueue<Runnable> workQueue;

    //针对操作ThreadPoolExecutor对象成员变量的锁。
    private final ReentrantLock mainLock = new ReentrantLock();

    //用来支持awaitTermination的Wait condition 

    private final Condition termination = mainLock.newCondition();

    //包含线程池中所有的worker threads. 当持有mainLock锁的时候才有访问权限
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * 线程空闲等待超时时间,以毫秒记。
     * 当线程池线程数量大于corePoolSize或者allowCoreThreadTimeOut为true时使用。
     */
    private volatile long  keepAliveTime;

    /**
     * false: 即使线程处于空闲状态,仍然存活(不会使用keepAliveTime去判断是否结束线程)
     * true: 如果线程处于空闲状态,则按照keepAliveTime判断结束线程
     */
    private volatile boolean allowCoreThreadTimeOut;

    //线程池常驻线程数量,只有获得mainLock锁时才能修改,但是对所有多线程读操作实时可见.
    private volatile int   corePoolSize;

    // 线程池最大线程数量,只有获得mainLock锁时才能修改,但是对所有多线程读操作实时可见.
    private volatile int   maximumPoolSize;

    // 线程池当前线程数量,只有获得mainLock锁时才能修改,但是对所有多线程读操作实时可见.
    private volatile int   poolSize;

    //线程池数量饱和或线程池关闭时会调用的处理类
    private volatile RejectedExecutionHandler handler;

    //线程池添加新线程的工厂类。线程池中所有的线程都由工厂类生成。
    private volatile ThreadFactory threadFactory;

    //线程池达到的最大大小.
    private int largestPoolSize;

    //线程池完成任务计数器,worker threads中止时会更新.
    private long completedTaskCount;

    //线程池拒绝执行task的默认处理类
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

线程池执行任务核心入口方法execute()具体代码如下:

public void execute(Runnable command) {
	if (command == null)
		throw new NullPointerException();
	if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
		if (runState == RUNNING && workQueue.offer(command)) {
			if (runState != RUNNING || poolSize == 0)
				ensureQueuedTaskHandled(command);
		}
		else if (!addIfUnderMaximumPoolSize(command))
			reject(command); // is shutdown or saturated
	}
}

过程如下:

  1. 执行的任务commandnull时,抛出异常
  2. poolSize < corePoolSize时,表示线程池的数量小于线程池预期的常驻线程数量,调用addIfUnderCorePoolSize方法:1)生成新的thread对象;2)执行线程任务
  3.     private boolean addIfUnderCorePoolSize(Runnable firstTask) {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (poolSize < corePoolSize && runState == RUNNING)
                    t = addThread(firstTask);
            } finally {
                mainLock.unlock();
            }
            if (t == null)
                return false;
            t.start();
            return true;
        }

    addThread()并不是threadFactory.newThread()那么简单。woker对象初始化的时候意味着线程池里有新线程的生成,这个时候传入的Runnable实现类被标记为首个任务。1)生成worker对象;2)生成新的线程;3)新线程指派给worker对象,并将woker对象放入阻塞队列中;4)增加线程池当前线程数量,修改线程池到达最大数量

        private Thread addThread(Runnable firstTask) {
            Worker w = new Worker(firstTask);
            Thread t = threadFactory.newThread(w);//注意此时新线程的run实现由Woker提供
            if (t != null) {
                w.thread = t;
                workers.add(w);
                int nt = ++poolSize;
                if (nt > largestPoolSize)
                    largestPoolSize = nt;
            }
            return t;
        }

    线程池内线程具体执行任务的代码:1)取出开始的首任务执行 2)遍历从workQueue中取任务执行

            /**
             * Main run loop
             */
            public void run() {
                try {
                    Runnable task = firstTask;
                    firstTask = null;
                    while (task != null || (task = getTask()) != null) {
                        runTask(task);//里边还会递归调用run方法
                        task = null;
                    }
                } finally {
                    workerDone(this);
                }
            }
        }

    线程池如何保证线程一直运行呢?getTask()时阻塞队列的take()实现会让线程在无任务时进入阻塞状态(RetrantLockCondition实现)

        Runnable getTask() {
            for (;;) {
                try {
                    int state = runState;
                    if (state > SHUTDOWN)
                        return null;
                    Runnable r;
                    if (state == SHUTDOWN)  // Help drain queue
                        r = workQueue.poll();//取出并删除队列顶部一个元素,失败时并不会阻塞而是返回false
                    else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                        r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                    else
                        r = workQueue.take();//取出并删除队列顶部一个元素,如果workQueue无任务则当前线程进入阻塞状态
                    if (r != null)
                        return r;
                    if (workerCanExit()) {
                        if (runState >= SHUTDOWN) // Wake up others
                            interruptIdleWorkers();
                        return null;
                    }
                    // Else retry
                } catch (InterruptedException ie) {
                    // On interruption, re-check runState
                }
            }
        }
  4. poolSize >= corePoolSize时,此时不会创建新的线程,而是把这些任务放到待工作队列workQueue
    workQueue.offer(command)
  5. poolSize >= corePoolSize且工作队列workQueue满员时(对于无界队列貌似不会发生):1)生成新的thread对象;2)执行线程任务
        private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (poolSize < maximumPoolSize && runState == RUNNING)//需要线程池线程数量小于最大数
                    t = addThread(firstTask);
            } finally {
                mainLock.unlock();
            }
            if (t == null)
                return false;
            t.start();
            return true;
        }
  6. 如果仍然往一个工作负荷满载的线程池指派任务的话,那么线程池会使用拒绝执行策略来处理。具体的策略类有很多可以选择,甚至可以自己实现。
        void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }

     

© 著作权归作者所有

嗨椒爆炒小小鱼
粉丝 7
博文 5
码字总数 4757
作品 0
浦东
私信 提问
Java多线程之线程池(ThreadPoolExecutor)实现原理分析(一)

在上一篇文章Java中实现多线程的3种方法介绍和比较中,我们讲解了Java中实现多线程的3种方法。使用多线程,就必须要考虑使用线程池,今天我们来聊聊线程池的那些事。 注:源码都是基于JDK1....

小怪聊职场
2018/05/14
0
0
Java线程框架_Executor

Executor 框架是 juc 里提供的线程池的实现。前两天看了下 Executor 框架的一些源码,做个简单的总结。 线程池大概的思路是维护一个的线程池用于执行提交的任务。我理解池的技术的主要意义有...

天呀鲁哇
2015/02/05
0
0
ThreadPoolExecutor源码分析及阻塞提交任务方法

ThreadPoolExecutor源码 ThreadPoolExecutor 基本使用参考:ThreadPoolExecutor执行过程分析 线程池状态标志 ctl 保存了线程池的运行状态(runState)和线程池内有效线程数量(workerCount)...

4rnold
2018/06/28
0
0
Java多线程学习(八)线程池与Executor 框架

Java面试通关手册(Java学习指南,欢迎Star,会一直完善下去,欢迎建议和指导):https://github.com/Snailclimb/JavaGuide 历史优质文章推荐: Java并发编程指南专栏 分布式系统的经典基础理...

snailclimb
2018/05/31
0
0
为何服务器QPS上不去?Java线程调优权威指南

从刚问世起,Java 的部分魅力就来自其多线程。即便在多核和多 CPU 系统司空见惯之前,能够轻松编写多线程程序也是 Java 的一个标志性特征。 Java 性能方面的吸引力显而易见:如果有两个 CPU ...

高可用架构
2016/05/13
0
0

没有更多内容

加载失败,请刷新页面

加载更多

UserInputControls用户输入控制

enum UserInputControls { kGovernedByOrthoMode = 0x0001,//正交模式管理 kNullResponseAccepted = 0x0002,//允许输入空 kDontEchoCancelForCtrlC = 0x0004,//ctrl C 模式不能重复......

一个小妞
21分钟前
0
0
分布式系统的事务处理

当我们在生产线上用一台服务器来提供数据服务的时候,我会遇到如下的两个问题: 1)一台服务器的性能不足以提供足够的能力服务于所有的网络请求。 2)我们总是害怕我们的这台服务器停机,造成...

群星纪元
23分钟前
2
0
Kanban VS Scrum:哪个是最好的敏捷项目管理框架

“我们使用敏捷开发。”在与软件开发团队交流时,你会听到很多这样的说法。根据统计,2018年全球约有90%的开发人员在使用敏捷开发。Choerodon猪齿鱼团队也是其中之一。 但是,敏捷并不统一。...

Choerodon
27分钟前
0
0
vue select 慢一步

我在使用vue的select的时候,@change事件里面,写一个axios拿到数据,然后修改某个变量,按理修改变量会立即呈现在页面上面,但实际情况是,值是变更了,但是页面上的效果慢了一步,非常奇怪...

朝如青丝暮成雪
31分钟前
0
0
分布式架构 核心知识体系

1.问题 1、何为分布式何为微服务? 2、为什么需要分布式? 3、分布式核心理论基础,节点、网络、时间、顺序,一致性? 4、分布式是系统有哪些设计模式? 5、分布式有哪些类型? 6、如何实现分...

小刀爱编程
34分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部