文档章节

线程池的结构和原理

清风伴月
 清风伴月
发布于 2017/09/03 21:32
字数 1305
阅读 1
收藏 0

线程池的基本结构

线程池的基本结构,如下图所示。用户通过使用线程池的execute方法将Runnable提交到线程池中进行执行。当线程池中的线程都很忙的时候,这个新加入的 Runnable 就会被放到等待队列。当线程空闲下来的时候,就会去等待队列里查看是否还有排队等待的任务,如果有,就会队列中取出任务并继续执行。如果没有,线程就会进入休眠。

如果我们更具体一点,就以本周第二节课所讲的ThreadPoolExecutor为例。当我们把一个Runnable交给线程池去执行的时候,这个线程池处理的流程是这样的:

  1. 先判断线程池中的核心线程们是否空闲,如果空闲,就把这个新的任务指派给某一个空闲线程去执行。如果没有空闲,并且当前线程池中的核心线程数还小于 corePoolSize,那就再创建一个核心线程。
  2. 如果线程池的线程数已经达到核心线程数,并且这些线程都繁忙,就把这个新来的任务放到等待队列中去。如果等待队列又满了,那么
  3. 查看一下当前线程数是否到达maximumPoolSize,如果还未到达,就继续创建线程。如果已经到达了,就交给RejectedExecutionHandler来决定怎么处理这个任务。

我们通过代码来验证一下上面讲的内容:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

看上去应该是一致的。另外,这份代码里有一段英文的注释,为了节约篇幅,我删掉了,大家可以去JDK的源代码里自行查看。

工作线程和等待队列

工作线程里的逻辑都封装在Worker这个内部类里了。代码如下所示:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

关于锁和并发的问题,我们先不去管他。这段代码里有几个地方要注意的,第一,我们可以使用beforeExecute和afterExecute这两个方法去监控任务的执行情况,这些方法在ThreadPoolExecutor里都是空方法,我们可以重写这些方法来实现线程池的监控。第二,就是线程的逻辑是不断地执行一个循环,去调用 getTask 方法来获得任务(第一个任务是firstTask,这个请大家自己去看,我就不讲了)。所以这个getTask有必要研究一下:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

这个方法里,前边的计数逻辑,我们先不看,这个方法的核心点在于workQueue.poll,workQueue的类型是

private final BlockingQueue<Runnable> workQueue;

这是一个阻塞队列!!这就意味着,如果某一个线程试图从这个队列中取数据,而这个队列里没有数据的时候,线程就会进入休眠了。

至于前边的那些逻辑,加加减减的,并不重要,感兴趣的可以自己查看这个方法的注释,总而言之,就是返回一个null,让worker线程退出。所以,getTask里能返回 null 的分支都是满足线程退出条件的。

RejectedExecutionHandler

当队列和线程池都满了的时候,再有新的任务到达,就必须要有一种办法来处理新来的任务。Java线程池中提供了以下四种策略:

  1. AbortPolicy: 直接抛异常
  2. CallerRunsPolicy:让调用者帮着跑这个任务
  3. DiscardOldestPolicy:丢弃队列里最老的那个任务,执行当前任务
  4. DiscardPolicy:不处理,直接扔掉

例如,我们看一个例子:

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

这个例子就是,如果线程池和队列都满了,那做为调用者,您就自己去执行 r.run 吧,我线程池是不管了。您爱咋咋滴。另外,让调用者去自己执行,也可以让调用者不要再往线程池里放任务了,有帮于减轻线程池的压力。

本文转载自:https://zhuanlan.zhihu.com/p/27232156

共有 人打赏支持
清风伴月
粉丝 1
博文 129
码字总数 255659
作品 0
海淀
程序员
私信 提问
线程池的原理和连接池的原理

一、 线程池的原理: 线程池,究竟是怎么一回事?其实线程池的原理很简单,类似于操作系统中的缓冲区的概念,它的流程如下: 先启动若干数量的线程,并让这些线程都处于睡眠状态,当客户端有一...

长平狐
2013/01/06
196
0
浅谈Java并发编程系列(六) —— 线程池的使用

线程池的作用 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的资源浪费。 提高响应速度。当任务到达时,不需要等到线程创建就能立即执行。 方便管理线程。线程是稀缺资源,...

codershamo
2017/11/29
0
0
一个Linux下C线程池的实现

在传统服务器结构中, 常是 有一个总的 监听线程监听有没有新的用户连接服务器, 每当有一个新的 用户进入, 服务器就开启一个新的线程用户处理这 个用户的数据包。这个线程只服务于这个用户 , ...

天下杰论
2014/09/18
0
0
深入源码分析-线程池的实现原理

本文主要来介绍一下其中比较简单的线程池的实现原理,希望读者们可以举一反三,通过对线程池的理解,学习并掌握所有编程中池化技术的底层原理。 创建一个线程 在Java的并发编程中,线程是十分...

小刀爱编程
10/08
0
0
深入源码分析Java线程池的实现原理(转载)

摘要: 程序的运行,其本质上,是对系统资源(CPU、内存、磁盘、网络等等)的使用。如何高效的使用这些资源是我们编程优化演进的一个方向。今天说的线程池就是一种对CPU利用的优化手段。 网上...

文文1
10/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

css hack

浏览器的兼容性一直是个头疼的问题,使用“欺骗”技术可使各个浏览器效果一致,花了些时间整理了各个浏览器的HACK,主要包括IE系列和最新版本的Chrome、Safari、Firefox、 Opera,比较全面的...

kitty1116
15分钟前
0
0
zookeeper脑裂问题

一、为什么zookeeper要部署基数台服务器? 二、zookeeper脑裂(Split-Brain)问题 2.1、什么是脑裂? 2.2、什么原因导致的? 2.2、zookeeper是如何解决的? 一、为什么zookeeper要部署基数台...

tantexian
21分钟前
1
0
Spring事务传播行为详解

前言 Spring在TransactionDefinition接口中规定了7种类型的事务传播行为。事务传播行为是Spring框架独有的事务增强特性,他不属于的事务实际提供方数据库行为。这是Spring为我们提供的强大的...

码代码的小司机
31分钟前
2
0
Android设备通过fastboot刷入TWRP

方法一:通过fastboot刷入TWRP的方式 首先去TWRP官网下载TWRP安装文件https://twrp.me/Devices/ 1.进入bootloader adb reboot bootloader 也可在开机时,同时按住电源键+音量减,进入bootloa...

robslove
36分钟前
0
0
为何译为“东家机”和“宾客机”

学习过虚拟化、云计算的人大概都知道,并且都在自己的电脑上安装KVM,如下图所示: 什么情况?不过是在物理机的Windows 10上安装了VMware;在VMware上安装了Linux CentOS 7操作系统;又在其上...

大别阿郎
50分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部