文档章节

Java线程框架_Executor

天呀鲁哇
 天呀鲁哇
发布于 2015/02/05 15:15
字数 2618
阅读 172
收藏 23
点赞 0
评论 0

Executor 框架是 juc 里提供的线程池的实现。前两天看了下 Executor 框架的一些源码,做个简单的总结。

线程池大概的思路是维护一个的线程池用于执行提交的任务。我理解池的技术的主要意义有两个:

  1. 1.    资源的控制,如并发量限制。像连接池这种是对数据库资源的保护。

  2. 2.    资源的有效利用,如线程复用,避免频繁创建线程和线程上下文切换。

那么想象中设计一个线程池就需要有线程池大小、线程生命周期管理、等待队列等等功能,下面结合代码看看原理。

Excutor 整体结构如下:

Executor 接口定义了最基本的 execute 方法,用于接收用户提交任务。 ExecutorService 定义了线程池终止和创建及提交 futureTask 任务支持的方法。

AbstractExecutorService 是抽象类,主要实现了 ExecutorServicefutureTask 相关的一些任务创建和提交的方法。

ThreadPoolExecutor 是最核心的一个类,是线程池的内部实现。线程池的功能都在这里实现了,平时用的最多的基本就是这个了。其源码很精练,远没当时想象的多。

ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基础上提供了支持定时调度的功能。线程任务可以在一定延时时间后才被触发执行。

1.ThreadPoolExecutor 原理  

1.1 ThreadPoolExecutor内部的几个重要属性    

   

   

1.线程池本身的状态    

Java代码  收藏代码

  1. volatile int runState;   

  2. static final int RUNNING = 0;   

  3. static final int SHUTDOWN = 1;   

  4. static final int STOP = 2;   

  5. static final int TERMINATED = 3;   

 

2.等待任务队列和工作集

Java代码  收藏代码

  1. private final BlockingQueue<Runnable> workQueue; //等待被执行的Runnable任务   

  2. private final HashSet<Worker> workers = new HashSet<Worker>(); //正在被执行的Worker任务集   


3.线程池的主要状态锁。线程池内部的状态变化 ( 如线程大小 ) 都需要基于此锁。

Java代码  收藏代码

  1. private final ReentrantLock mainLock = new ReentrantLock();  

 

4.线程的存活时间和大小

Java代码  收藏代码

  1. private volatile long keepAliveTime;// 线程存活时间   

  2. private volatile boolean allowCoreThreadTimeOut;// 是否允许核心线程存活   

  3. private volatile int corePoolSize;// 核心池大小   

  4. private volatile int maximumPoolSize; // 最大池大小   

  5. private volatile int poolSize; //当前池大小   

  6. private int largestPoolSize; //最大池大小,区别于maximumPoolSize,是用于记录线程池曾经达到过的最大并发,理论上小于等于maximumPoolSize。   

 

5.线程工厂和拒绝策略

Java代码  收藏代码

  1. private volatile RejectedExecutionHandler handler;// 拒绝策略,用于当线程池无法承载新线程是的处理策略。  

  2.  private volatile ThreadFactory threadFactory;// 线程工厂,用于在线程池需要新创建线程的时候创建线程  

 

6.线程池完成任务数

Java代码  收藏代码

  1. private long completedTaskCount;//线程池运行到当前完成的任务数总和  

 

1.2 ThreadPoolExecutor 的内部工作原理

有了以上定义好的数据,下面来看看内部是如何实现的 。 Doug Lea 的整个思路总结起来就是  5 句话:

  1. 1.    如果当前池大小 poolSize 小于 corePoolSize ,则创建新线程执行任务。

  2. 2.    如果当前池大小 poolSize 大于 corePoolSize ,且等待队列未满,则进入等待队列

  3. 3.    如果当前池大小 poolSize 大于 corePoolSize 且小于 maximumPoolSize ,且等待队列已满,则创建新线程执行任务。

  4. 4.    如果当前池大小 poolSize 大于 corePoolSize 且大于 maximumPoolSize ,且等待队列已满,则调用拒绝策略来处理该任务。

  5. 5.    线程池里的每个线程执行完任务后不会立刻退出,而是会去检查下等待队列里是否还有线程任务需要执行,如果在 keepAliveTime 里等不到新的任务了,那么线程就会退出。

 

下面看看代码实现 :

线程池最重要的方法是由 Executor 接口定义的 execute 方法 , 是任务提交的入口。  

我们看看 ThreadPoolExecutor.execute(Runnable cmd) 的实现:

 

Java代码  收藏代码

  1. public void execute(Runnable command) {  

  2.         if (command == null)  

  3.             throw new NullPointerException();  

  4.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  

  5.             if (runState == RUNNING && workQueue.offer(command)) {  

  6.                 if (runState != RUNNING || poolSize == 0)  

  7.                     ensureQueuedTaskHandled(command);  

  8.             }  

  9.             else if (!addIfUnderMaximumPoolSize(command))  

  10.                 reject(command); // is shutdown or saturated  

  11.         }  

  12. }  

 

解释如下:  

当提交一个新的 Runnable 任务:

分支1 :    如果当前池大小小于 corePoolSize, 执行 addIfUnderCorePoolSize(command) , 如果线程池处于运行状态且 poolSize < corePoolSize addIfUnderCorePoolSize(command) 会做如下事情,将 Runnable 任务封装成 Worker 任务 , 创建新的 Thread ,执行 Worker 任务。如果不满足条件,则返回 false 。代码如下:

 

Java代码  收藏代码

  1.  private boolean addIfUnderCorePoolSize(Runnable firstTask) {  

  2.         Thread t = null;  

  3.         final ReentrantLock mainLock = this.mainLock;  

  4.         mainLock.lock();  

  5.         try {  

  6.             if (poolSize < corePoolSize && runState == RUNNING)  

  7.                 t = addThread(firstTask);  

  8.         } finally {  

  9.             mainLock.unlock();  

  10.         }  

  11.         if (t == null)  

  12.             return false;  

  13.         t.start();  

  14.         return true;  

  15. }  

 

    分支2   如果大于 corePoolSize 或 1 失败失败,则:

  •     如果等待队列未满,把 Runnable 任务加入到 workQueue 等待队列

    workQueue .offer(command)

     

  •     如多等待队列已经满了,调用 addIfUnderMaximumPoolSize(command) ,和 addIfUnderCorePoolSize 基本类似,只不过判断条件是 poolSize < maximumPoolSize 。如果大于 maximumPoolSize ,则把 Runnable 任务交由 RejectedExecutionHandler 来处理。

   

问题:如何实现线程的复用 ?      

Doug Lea  的实现思路是 线程池里的每个线程执行完任务后不立刻退出,而是去检查下等待队列里是否还有线程任务需要执行,如果在 keepAliveTime 里等不到新的任务了,那么线程就会退出。这个功能的实现 关键在于 Worker  。线程池在执行 Runnable  任务的时候,并不单纯把 Runnable  任务交给创建一个 Thread  。而是会把 Runnable  任务封装成 Worker  任务。

下面看看 Worker  的实现:

 代码很简单,可以看出, worker  里面包装了 firstTask  属性,在构造worker  的时候传进来的那个 Runnable  任务就是 firstTask  。 同时也实现了Runnable接口,所以是个代理模式,看看代理增加了哪些功能。 关键看 woker  的 run  方法:

Java代码  收藏代码

  1. public void run() {  

  2.            try {  

  3.                Runnable task = firstTask;  

  4.                firstTask = null;  

  5.                while (task != null || (task = getTask()) != null) {  

  6.                    runTask(task);  

  7.                    task = null;  

  8.                }  

  9.            } finally {  

  10.                workerDone(this);  

  11.            }  

  12.        }  

 可以看出 worker 的 run 方法是一个循环,第一次循环运行的必然是 firstTask ,在运行完 firstTask 之后,并不会立刻结束,而是会调用 getTask 获取新的任务( getTask 会从等待队列里获取等待中的任务),如果 keepAliveTime 时间内得到新任务则继续执行,得不到新任务则那么线程才会退出。这样就保证了多个任务可以复用一个线程,而不是每次都创建新任务。 keepAliveTime 的逻辑在哪里实现的呢?主要是利用了 BlockingQueue 的 poll 方法支持等待。可看 getTask 的代码段:

 

Java代码  收藏代码

  1. if (state == SHUTDOWN)  // Help drain queue  

  2.     r = workQueue.poll();  

  3. else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  

  4.     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  

  5. else  

  6.     r = workQueue.take();  

2.ThreadFactory 和R ejectedExecutionHandler  

ThreadFactoryRejectedExecutionHandler是ThreadPoolExecutor的两个属性,也 可以认为是两个简单的扩展点 . ThreadFactory 是创建线程的工厂。  

默认的线程工厂会创建一个带有“ pool-poolNumber-thread-threadNumber ”为名字的线程,如果我们有特别的需要,如线程组命名、优先级等,可以定制自己的 ThreadFactory

RejectedExecutionHandler 是拒绝的策略。常见有以下几种:

AbortPolicy :不执行,会抛出 RejectedExecutionException 异常。

CallerRunsPolicy :由调用者(调用线程池的主线程)执行。

DiscardOldestPolicy :抛弃等待队列中最老的。

DiscardPolicy: 不做任何处理,即抛弃当前任务。

 

3.ScheduledThreadPoolExecutor    

ScheduleThreadPoolExecutor 是对ThreadPoolExecutor的集成。增加了定时触发线程任务的功能。需要注意     

从内部实现看, ScheduleThreadPoolExecutor  使用的是  corePoolSize   线程和一个无界队列的固定大小的池,所以调整  maximumPoolSize   没有效果。无界队列是一个内部自定义的 DelayedWorkQueue

ScheduleThreadPoolExecutor  线程池接收定时任务的方法是 schedule ,看看内部实现:

 

Java代码  收藏代码

  1. public ScheduledFuture<?> schedule(Runnable command,  

  2.                                    long delay,  

  3.                                    TimeUnit unit) {  

  4.     if (command == null || unit == null)  

  5.         throw new NullPointerException();  

  6.     RunnableScheduledFuture<?> t = decorateTask(command,  

  7.         new ScheduledFutureTask<Void>(command, null,  

  8.                                       triggerTime(delay, unit)));  

  9.   

  10.     delayedExecute(t);  

  11.     return t;  

  12. }  

 

       以上代码会初始化一个 RunnableScheduledFuture 类型的任务 t, 并交给 delayedExecute 方法。 delayedExecute(t) 方法实现如下:

     

Java代码  收藏代码

  1.     private void delayedExecute(Runnable command) {  

  2.         if (isShutdown()) {  

  3.             reject(command);  

  4.             return;  

  5.         }  

  6.         if (getPoolSize() < getCorePoolSize())  

  7.             prestartCoreThread();  

  8.   

  9.         super.getQueue().add(command);  

  10. }  

 

 

如果当前线程池大小 poolSize 小于 CorePoolSize ,则创建一个新的线程,注意这里创建的线程是空的,不会把任务直接交给线程来做,而是把线程任务放到队列里。因为任务是要定时触发的,所以不能直接交给线程去执行。  

问题: 那如何做到定时触发呢?  

关键在于DelayedWorkQueue,它代理了  DelayQueue 。可以认为 DelayQueue 是这样一个队列(具体可以去看下源码,不详细分析):

  1. 1.         队列里的元素按照任务的 delay 时间长短升序排序, delay 时间短的在队头, delay 时间长的在队尾。

  2. 2.                       DelayQueue  里 FIFO  的获取一个元素的时候,不会直接返回 head  。可能会阻塞,等到 head  节点到达 delay  时间后才能被获取。可以看下 DelayQueue  的 take  方法实现:

 

Java代码  收藏代码

  1. public E take() throws InterruptedException {  

  2.     final ReentrantLock lock = this.lock;  

  3.     lock.lockInterruptibly();  

  4.     try {  

  5.         for (;;) {  

  6.             E first = q.peek();  

  7.             if (first == null) {  

  8.                 available.await();  

  9.             } else {  

  10.                 long delay =  first.getDelay(TimeUnit.NANOSECONDS);  

  11.                 if (delay > 0) {  

  12.                     long tl = available.awaitNanos(delay);//等待delay时间  

  13.                 } else {  

  14.                     E x = q.poll();  

  15.                     assert x != null;  

  16.                     if (q.size() != 0)  

  17.                         available.signalAll(); // wake up other takers  

  18.                     return x;  

  19.                 }  

  20.             }  

  21.         }  

  22.     } finally {  

  23.         lock.unlock();  

  24.     }  

  25. }  

     

 

4.线程池使用策略

通过以上的详解基本上能够定制出自己需要的策略了,下面简单介绍下Executors里面提供的一些常见线程池策略:

1.FixedThreadPool

Java代码  收藏代码

  1. public static ExecutorService newFixedThreadPool(int nThreads) {  

  2.     return new ThreadPoolExecutor(nThreads, nThreads,  

  3.                                   0L, TimeUnit.MILLISECONDS,  

  4.                                   new LinkedBlockingQueue<Runnable>());  

  5. }  

 实际上就是个不支持keepalivetime,且corePoolSize和maximumPoolSize相等的线程池。

2.SingleThreadExecutor

Java代码  收藏代码

  1. public static ExecutorService newSingleThreadExecutor() {  

  2.     return new FinalizableDelegatedExecutorService  

  3.         (new ThreadPoolExecutor(1, 1,  

  4.                                 0L, TimeUnit.MILLISECONDS,  

  5.                                 new LinkedBlockingQueue<Runnable>()));  

  6. }  

  实际上就是个不支持keepalivetime,且corePoolSize和maximumPoolSize都等1的线程池。

3.CachedThreadPool

Java代码  收藏代码

  1. public static ExecutorService newCachedThreadPool() {  

  2.     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  

  3.                                   60L, TimeUnit.SECONDS,  

  4.                                   new SynchronousQueue<Runnable>());  

  5. }  

 实际上就是个支持keepalivetime时间是60秒(线程空闲存活时间),且corePoolSize为0,maximumPoolSize无穷大的线程池。

4.SingleThreadScheduledExecutor

Java代码  收藏代码

  1. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {  

  2.     return new DelegatedScheduledExecutorService  

  3.         (new ScheduledThreadPoolExecutor(1, threadFactory));  

  4. }  

 实际上是个corePoolSize为1的ScheduledExecutor。上文说过ScheduledExecutor采用无界等待队列,所以maximumPoolSize没有作用。

5.ScheduledThreadPool

Java代码  收藏代码

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  

  2.     return new ScheduledThreadPoolExecutor(corePoolSize);  

  3. }  

  实际上是corePoolSize课设定的ScheduledExecutor。上文说过ScheduledExecutor采用无界等待队列,所以maximumPoolSize没有作用。

 

以上还不一定满足你的需要,完全可以根据自己需要去定制。


<线程相关  先暂时不整理   等慢慢理解   因为这些比较难所以慢点  参考数据  java并发编程实战>


本文转载自:http://singleant.iteye.com/blog/1423931

共有 人打赏支持
天呀鲁哇
粉丝 8
博文 98
码字总数 42007
作品 0
长宁
程序员
读书笔记之《Java并发编程的艺术》-线程池和Executor的子孙们

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
Java并发的四种风味:Thread、Executor、ForkJoin和Actor

这篇文章讨论了Java应用中并行处理的多种方法。从自己管理Java线程,到各种更好几的解决方法,Executor服务、ForkJoin 框架以及计算中的Actor模型。 Java并发编程的4种风格:Threads,Execu...

恶魔永生
2015/01/14
0
0
Java多线程学习(八)线程池与Executor 框架

Java面试通关手册(Java学习指南,欢迎Star,会一直完善下去,欢迎建议和指导):https://github.com/Snailclimb/JavaGuide 历史优质文章推荐: Java并发编程指南专栏 分布式系统的经典基础理...

snailclimb
05/31
0
0
Java并发教程-7高级并发对象

目前为止,该教程重点讲述了最初作为Java平台一部分的低级别API。这些API对于非常基本的任务来说已经足够,但是对于更高级的任务就需要更高级的API。特别是针对充分利用了当今多处理器和多核...

noday
2014/04/25
0
0
读书笔记之《Java并发编程的艺术》-并发编程容器和框架(重要)

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
读书笔记之《Java并发编程的艺术》-并发编程基础

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
8
JAVA多线程和并发基础面试问答

Java多线程面试问题 1. 进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用。而线程是在进程中执行的一个任务。Java运行环境是一...

清风傲剑
2014/12/06
0
0
JAVA多线程和并发基础面试问答

原文链接 译文连接 作者:Pankaj 译者:郑旭东 校对:方腾飞 多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一。在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢...

雷神雨石
2014/07/19
0
2
JAVA多线程和并发基础面试问答

多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一。在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢固的掌握Java多线程基础知识来对应日后碰到的问题。(校对注:...

LCZ777
2014/05/26
0
0
JAVA多线程和并发基础面试问答

Java多线程面试问题 1. 进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用。而线程是在进程中执行的一个任务。Java运行环境是一...

hanzhankang
2014/01/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

《Linux Perf Master》Edition 0.4 发布

在线阅读:https://riboseyim.gitbook.io/perf 在线阅读:https://www.gitbook.com/book/riboseyim/linux-perf-master/details 百度网盘【pdf、mobi、ePub】:https://pan.baidu.com/s/1C20T......

RiboseYim
8分钟前
0
0
conda 换源

https://mirrors.tuna.tsinghua.edu.cn/help/anaconda/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/conda config --add channels https://mir......

阿豪boy
17分钟前
0
0
Confluence 6 安装补丁类文件

Atlassian 支持或者 Atlassian 缺陷修复小组可能针对有一些关键问题会提供补丁来解决这些问题,但是这些问题还没有放到下一个更新版本中。这些问题将会使用 Class 类文件同时在官方 Jira bug...

honeymose
27分钟前
0
0
设计模式:代理模式

代理模式可以分为三种:静态代理,动态代理,cglib代理 1.静态代理:被代理的类需要实现一接口或是继承一父类 委托类(被代理的类): package com.java.pattern.proxy.staticdemo;publ...

人觉非常君
30分钟前
0
0
非常实用的IDEA插件之总结

1、Alibaba Java Coding Guidelines 经过247天的持续研发,阿里巴巴于10月14日在杭州云栖大会上,正式发布众所期待的《阿里巴巴Java开发规约》扫描插件!该插件由阿里巴巴P3C项目组研发。P3C...

Gibbons
35分钟前
0
0
Tomcat介绍,安装jdk,安装tomcat,配置Tomcat监听80端口

Tomcat介绍 Tomcat是Apache软件基金会(Apache Software Foundation)的Jakarta项目中的一个核心项目,由Apache、Sun和其他一些公司及个人共同开发而成。 java程序写的网站用tomcat+jdk来运行...

TaoXu
36分钟前
0
0
TensorFlow,从一个 Android Demo 开始

TensorFlow Android Demo 项目地址 Machine Learning 既然提到了 TensorFlow,那是不是得神经网络、机器学习了解下? 如果你能坚持把 机器学习速成课程 给啃完了,觉得还挺有兴趣的,那可以考...

孟飞阳
38分钟前
0
0
JVM学习笔记二:内存结构规范

1、JVM基本结构图 2、java堆(Heap) 3、方法区(Method Area) 4、程序计数器 5、JAVA栈图解 局部变量表:八大基本类型,还可以存储引用类型 上一篇:JVM学习笔记一:类加载机制介绍...

刘祖鹏
43分钟前
0
0
mui集成微信H5支付(返回白屏问题已经解决)

一.项目需求 因为公司人员缺少,没有专门开发安卓和ios的人员,为了项目尽早上线采用了混合APP开发的方式,我选择了MUI混合开发框架,项目中需要在用户购买VIP会员的时候进行支付,所以需要在项目...

银装素裹
47分钟前
0
0
SpringBoot集成Redis--配置自定义的RedisCacheManager

配置自定义的RedisCacheManager--1自定义键生成规则 默认的键生成器 当不指定缓存的key时,SpringBoot会使用SimpleKeyGenerator生成key。 SimpleKeyGenerator SimpleKey 查看源码可以发现,它...

karma123
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部