文档章节

JAVA线程池详解

ifree613
 ifree613
发布于 2017/04/06 13:37
字数 2262
阅读 115
收藏 2
点赞 0
评论 0

JAVA线程池详解

说明:基于JDK 1.7.0.79

为什么要使用线程池而不是显示创建线程

  • 减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题;
  • 没有线程池,则可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

怎么更好的使用线程池

  • 线程池不允许使用 Executors 去创建,虽然使用简单,但是会存在以下问题;

    • FixedThreadPool 和 SingleThreadPool:

      允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。如下:

      FixedThreadPool
      public static ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>());
      }
      SingleThreadPool
      public static ExecutorService newSingleThreadExecutor() {
      return new FinalizableDelegatedExecutorService
          (new ThreadPoolExecutor(1, 1,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>()));
      }
      
      LinkedBlockingQueue
      public LinkedBlockingQueue() {
          this(Integer.MAX_VALUE);
      }

      注意:以上代码可以看出,在进行设置阻塞队列时为new LinkedBlockingQueue,而该队列的默认大小为 Integer.MAX_VALUE。

    • CachedThreadPool 和 ScheduledThreadPool:

      允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。如下:

      CachedThreadPool
      public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }
      ScheduledThreadPool
      public ScheduledThreadPoolExecutor(int corePoolSize) {
          super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                new DelayedWorkQueue());
      }
      

      注意:以上代码可以看出,在进行 ThreadPoolExecutor 初始化时,maximumPoolSize 值为 Integer.MAX_VALUE 。 

  • 通过 ThreadPoolExecutor 方式创建线程池,让开发人员更加明确线程池的运行规则,规避资源耗尽的风险。

基于ThreadPoolExecutor创建线程池

ThreadPoolExecutor构造

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

参数说明

  • corePoolSize 核心线程大小;

    • 初次创建线程池时默认为 0 ,随着新任务加入持续新增线程,即使存在空闲的线程也会新建线程,只要没有大于 corePoolSize ,那么线程一旦创建就不会被回收;
    • 若设置了 allowCoreThreadTimeOut 参数为true,则表示 corePoolSize 存在空闲线程时,并且时间超过 keepAliveTime 设置时,将关闭线程。默认该参数为true,表示只要没有超过 corePoolSize 设置的大小,就不会被回收。
  • maximumPoolSize 该线程池最大能允许的线程数量,若当前任务大于了阻塞队列的大小时,会被创建使用,空闲时会被回收;

  • keepAliveTime 设置空闲线程的等待时间,超过后会被回收(条件是该线程不是核心线程),默认60秒;

  • unit 时间单位,配合 keepAliveTime 一起使用;

  • workQueue 阻塞并且线程安全的队列,当前的核心线程满后,则会把多余的任务放在阻塞队列中;当线程池中有空闲线程时就回去任务队列中拿任务并处理。

  • threadFactory 创建新线程的方式,可设置的线程名字,建议指定一个有意义的名字,便于排查错误;

  • handler 线程池当达到最大线程数量时所采用的拒绝策略,默认AbortPolicy,其他还有:DiscardPolicy、CallerRunsPolicy、DiscardOldestPolicy

线程数量的判定

定义:poolSize 为当前创建的线程数量

  • 当创建的线程 poolSize < corePoolSize 时,每次的请求,都会新创建一个线程来处理,即使是现在存在空闲的线程,只要没有达到corePoolSize就会一直创建新线程来处理,除非设置了allowCoreThreadTimeOut=true,则空闲达到keepAliveTime空闲时间才会被回收;

  • 当创建的线程 poolSize > corePoolSize 时,当存在新请求的时候,此时,新请求会被加入到BlockingQueue中;

  • 当 BlockingQueue 满了后,同时继续收到新请求,又会重新创建线程,执行任务(此时需要注意该阻塞队列,不要设置得太大,否则maximumPoolSize将失去意义,这个也是为什么不用使用 Executors 工具类的原因)

  • 当 poolSize > maximumPoolSize 时,执行RejectedExecutionHandler拒绝策略逻辑

阻塞队列详解

BlockingQueue

  • 数据存放
    • offer:添加元素加到BlockingQueue里,若BlockingQueue放入成功,则为true,否则为false。当然我们也可以通过它的重载方法实现等待一段时间,若还是不能放入,则返回失败。(注意:该方法不阻塞当前执行方法的线程)
    • put:添加元素加到BlockingQueue里,若BlockQueue空间不足,则调用该方法的线程将被阻塞,直到BlockingQueue里面有空间时再继续。
  • 数据获取
    • poll:取走BlockingQueue里排在首位的对象;若不能立即取出,则可以等待一定时间,时间到时取不到则返回null。指定时间单位的重载方法可以表示超过时间,返回失败。
    • take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,则阻塞,直到有新数据时被唤醒。

相关实现队列

  • ArrayBlockingQueue 基于数组结构的有界阻塞队列,FIFO 排序原则。

  • LinkedBlockingQueue 基于链表的有界阻塞队列,单向链表结构,FIFO 排序原则;吞吐量通常要高于ArrayBlockingQueue;对生产者消费者不阻塞;默认构造创建的队列大小为Integer.MAX_VALUE,故不推荐使用默认构造。

  • SynchronousQueue 无队列容量的阻塞队列,一个 put 必须等待一个 take,反之亦然;吞吐量通常要高于 LinkedBlockingQueue。

  • PriorityBlockingQueue 一个具有优先级的无界阻塞队列,虽队列逻辑上是无界的,但由于资源被耗尽,所以试图执行添加操作可能会失败,出现OOM现象。

  • DelayQueue 一个无界阻塞队列,只有在延迟期满时才能从中提取元素。

ThreadFactory详解

  • 该参数用于设置创建线程的工厂,通过该工厂类创建线程时可以指定一个有意义的线程名字,有助于问题定位及排查。

采用 Executors.defaultThreadFactory() 创建默认的线程工厂,通过以下代码可以发现,当我们通过new Thread包装一个Runnable时,线程的名字设置成了一个带namePrefix前缀的自增名字。通常也业务中,我们需要设置一个更有意义的名字,所以不会采用默认的方式进行创建。

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

拒绝策略详解

RejectedExecutionHandler:当队列和线程池都满时,会触发该handler,此时表明线程池已经处于饱和状态,不能再处理新任务,需要使用者采取一种策略处理提交的新任务。默认策略为AbortPolicy,表示抛出异常方式处理新增的任务。

以下是JDK1.5提供的五种策略: 

  • AbortPolicy:直接抛出异常;
  • CallerRunsPolicy:只用调用者所在线程来运行任务;
  • DiscardOldestPolicy:丢弃旧任务,并执行当前任务;
  • DiscardPolicy:不执行,丢弃任务;
  • 自定义策略,实现RejectedExecutionHandler接口的rejectedExecution方法。

案例

案例1 错误用法,不指定队列大小,采用默认大小

new  ThreadPoolExecutor(5,10,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())

说明:上面是想达到最大并发10,但是却很难触发。 这里有一个误区,一旦设置了new LinkedBlockingQueue(){},不设置大小默认采用Integer.MAX_VALUE,而这样做就会导致队列很难加满,最终可能打爆内存,所以如果要使用尽量指定大小。

案例2 指定阻塞队列大小及设置每个线程的名字,便于控制及排错

ThreadFactory threadFactory = new NamedThreadFactory("BizProcessor-"+bizName);
ThreadPoolExecutor threadPoolExecutor=new  ThreadPoolExecutor(5,10,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10),threadFactory);

//或者采用SynchronousQueue队列
ThreadPoolExecutor threadPoolExecutor=new  ThreadPoolExecutor(5,10,0,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),threadFactory);

说明:通过指定LinkedBlockingQueue队列的大小,或者采用SynchronousQueue队列来保证队列大小可控;同时指定创建的线程有一定的业务含义,便于排错

案例3 重写 CallerRunsPolicy 拒绝策略,当线程池满时打印JVM的堆栈信息,便于事后分析问题

    private final RejectedExecutionHandler handler = new JvmCallerRunsPolicy();
    
 /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class JvmCallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            jstack();
            throw new RejectedExecutionException();
        }
        
        /**
         * 获取堆栈信息,当发生异常时触发
         * @throws Exception
         */
        public void jstack() throws Exception {
            String logPath = "/home/admin/app/logs";
            File file = new File(logPath);
            if (!file.exists()) {
                file.mkdirs();
            }
            FileOutputStream jstackStream = null;
            try {
                jstackStream = new FileOutputStream(new File(logPath, "app_jstack.log"));
                //获取所有堆栈信息
                Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
                Iterator<Map.Entry<Thread, StackTraceElement[]>> ite = map.entrySet().iterator();
                while (ite.hasNext()) {
                    Map.Entry<Thread, StackTraceElement[]> entry = ite.next();
                    StackTraceElement[] elements = entry.getValue();
                    if (elements != null && elements.length > 0) {
                        String threadName = entry.getKey().getName();
                        //记录线程名称
                        jstackStream.write(("Thread Name :[" + threadName + "]\n").getBytes());
                        for (StackTraceElement el : elements) {
                            String stack = el.toString() + "\n";
                            jstackStream.write(stack.getBytes());
                        }
                        jstackStream.write("\n".getBytes());
                    }
                }
            } catch (Exception e) {
                //只是提示错误,不抛异常
                log.error("JVM Collect ERROR:"+e);
            } finally {
                if (jstackStream != null) {
                    try {
                        jstackStream.close();
                    } catch (IOException e) {
                    }
                }
            }
        }

    }
    

使用方式

ThreadPoolExecutor threadPoolExecutor=new  ThreadPoolExecutor(5,10,0,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),threadFactory, handler);

© 著作权归作者所有

共有 人打赏支持
ifree613

ifree613

粉丝 27
博文 26
码字总数 44789
作品 0
成都
高级程序员
InheritableThreadLocal详解

1、简介 在上一篇 ThreadLocal详解 中,我们详细介绍了ThreadLocal原理及设计,从源码层面上分析了ThreadLocal。但由于ThreadLocal设计之初就是为了绑定当前线程,如果希望当前线程的ThreadL...

沈渊 ⋅ 04/12 ⋅ 0

【Java并发专题】27篇文章详细总结Java并发基础知识

努力的意义,就是,在以后的日子里,放眼望去全是自己喜欢的人和事! github:https://github.com/CL0610/Java-concurrency,欢迎题issue和Pull request。所有的文档都是自己亲自码的,如果觉...

你听___ ⋅ 05/06 ⋅ 0

连接池详解,c3p0与dbcp的区别!

连接池: 连接池是创建和管理一个连接的缓冲池的技术,这些连接准备好被任何需要它们的线程使用。这项技术能明显提高对数据库操作的性能。 连接池的好处: (1)对于大多数应用程序,当它们正...

IT_laobai ⋅ 今天 ⋅ 0

Java多线程学习(八)线程池与Executor 框架

Java面试通关手册(Java学习指南,欢迎Star,会一直完善下去,欢迎建议和指导):https://github.com/Snailclimb/JavaGuide 历史优质文章推荐: Java并发编程指南专栏 分布式系统的经典基础理...

snailclimb ⋅ 05/31 ⋅ 0

ThreadLocal可能引起的内存泄露

  threadlocal里面使用了一个存在弱引用的map,当释放掉threadlocal的强引用以后,map里面的value却没有被回收.而这块value永远不会被访问到了. 所以存在着内存泄露. 最好的做法是将调用thr...

天天顺利 ⋅ 06/15 ⋅ 0

5月份值得一看的 Java 技术干货!

5月又即将要离我们远去了,这个月有小长假51劳动节,有54青年节,有513母亲节,更有坑爹的520神马节?!! 废话不说,又到了总结上个月干货的时候了,这个月我们带来了各种Java技术干货,都是...

Java技术栈 ⋅ 05/31 ⋅ 0

JVM学习总结(一)运行时数据区

《深入Java虚拟机》这本书买了有一段时间了,当时看的时候就只是看,并没有边看边总结啥的,最后发现到脑子里面的根本所剩无几了。现在开始要好好归纳总结地再学习一遍。 运行时数据区域 JV...

hensemlee ⋅ 04/22 ⋅ 0

【JVM】 java内存区域与内存溢出异常

前言 此系列博客是读《深入理解java虚拟机》所做的笔记整理。 No1. JVM内存管理这堵墙? 对C和C++的开发人员来说,在内存管理领域,他们既拥有每一个对象的“所有权”,也担负着每一个对象生...

binggetong ⋅ 05/07 ⋅ 0

JVM(Thread/Stack)

JVM Thread/Stack Memory Size JVM Thread/Stack Object states (6 states) Dump OS Thread/Stack OS的线程运行状态 Iuput(top): Output: or input(ps): Dump Thread/Stack Analysis 注意thr......

赵-猛 ⋅ 2016/10/12 ⋅ 0

Java多线程学习(二)synchronized关键字(2)

系列文章传送门: Java多线程学习(一)Java多线程入门 Java多线程学习(二)synchronized关键字(1) java多线程学习(二)synchronized关键字(2) Java多线程学习(三)volatile关键字 Ja...

一只蜗牛呀 ⋅ 04/16 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

IDEA创建SpringMVC+Mybatis+Maven项目

视频如下(加载有点慢请见谅,服务器不太好): 视频

影狼 ⋅ 25分钟前 ⋅ 0

前阿里P8架构师:精准定制Java架构师学习计划!

可以说,Java是现阶段中国互联网公司中,覆盖度最广的研发语言,掌握了Java技术体系,不管在成熟的大公司,快速发展的公司,还是创业阶段的公司,都能有立足之地。 有不少朋友问,除了掌握J...

java高级架构牛人 ⋅ 27分钟前 ⋅ 0

zookeper学习

https://blog.csdn.net/u012152619/article/category/6470028

~少司命~ ⋅ 29分钟前 ⋅ 0

Spring MVC ,JSON,JQuery,不懂JQuery,跳过了

/spring-mvc-study/src/main/webapp/course_json.jsp <%@ page language="java" contentType="text/html; charset=UTF-8"pageEncoding="UTF-8"%><!DOCTYPE html PUBLIC "-//W3C//DTD ......

颖伙虫 ⋅ 29分钟前 ⋅ 0

2018上海云栖大会workshop-日志数据采集与分析对接

摘要: 日志数据采集与分析对接 课程描述 通过日志服务采集用户、数据库、业务等访问数据。演示对于业务日志分析与处理,程序日志查询与监控,打通日志与数据仓库对接案例。 日志种类 网站访...

阿里云云栖社区 ⋅ 30分钟前 ⋅ 0

mahout demo

package com.datamine.CollaborativeFiltering.mysql; import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood; import org.apache.mahout.cf.taste.impl.recommend......

xiaomin0322 ⋅ 31分钟前 ⋅ 0

red hat openstack 12配置要求

安装 openstack 之前,一般要规划整个系统中,到底要多少台机器来参与openstack, 根据rhosp12的官方文档: 最低要求是3台物理机,1台作为director,一台作为 controller ,一台作为computer....

tututu_jiang ⋅ 32分钟前 ⋅ 0

Rocket-Chip在GitHub上的各个源码

在github上通过搜索Rocket-chip可以得到36个结果:其中 https://github.com/freechipsproject/rocket-chip https://github.com/ucb-bar/riscv-boom https://github.com/ucb-bar/fpga-zynq (......

whoisliang ⋅ 38分钟前 ⋅ 0

【HAVENT原创】CentOS 6.5 下 Nginx 的安装与配置

nginx是轻量级的Web服务器、反向代理服务器及邮件服务器,具有占用内存少,并发能力强的优点,已被广泛应用。本文介绍目前最新版本 1.12.2 的安装。 各版本nginx下载地址:http://nginx.org/...

HAVENT ⋅ 44分钟前 ⋅ 0

查看linux系统重启之前的log -- last_kmsg

当 Linux Kernel 出现 BUG 的时候,后走入 panic flow,这个时候由于 Kernel 出现了严重的问题,adbd 也无法响应 adb 连接请求,这个时候想透过读取 Kernel Log Buffer 来看 Kernel Log 是不...

zyzzu ⋅ 45分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部