文档章节

JAVA线程池详解

ifree613
 ifree613
发布于 2017/04/06 13:37
字数 2262
阅读 155
收藏 2

JAVA线程池详解

说明:基于JDK 1.7.0.79

为什么要使用线程池而不是显示创建线程

  • 减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题;
  • 没有线程池,则可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

怎么更好的使用线程池

  • 线程池不允许使用 Executors 去创建,虽然使用简单,但是会存在以下问题;

    • FixedThreadPool 和 SingleThreadPool:

      允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。如下:

      FixedThreadPool
      public static ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>());
      }
      SingleThreadPool
      public static ExecutorService newSingleThreadExecutor() {
      return new FinalizableDelegatedExecutorService
          (new ThreadPoolExecutor(1, 1,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>()));
      }
      
      LinkedBlockingQueue
      public LinkedBlockingQueue() {
          this(Integer.MAX_VALUE);
      }

      注意:以上代码可以看出,在进行设置阻塞队列时为new LinkedBlockingQueue,而该队列的默认大小为 Integer.MAX_VALUE。

    • CachedThreadPool 和 ScheduledThreadPool:

      允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。如下:

      CachedThreadPool
      public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }
      ScheduledThreadPool
      public ScheduledThreadPoolExecutor(int corePoolSize) {
          super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                new DelayedWorkQueue());
      }
      

      注意:以上代码可以看出,在进行 ThreadPoolExecutor 初始化时,maximumPoolSize 值为 Integer.MAX_VALUE 。 

  • 通过 ThreadPoolExecutor 方式创建线程池,让开发人员更加明确线程池的运行规则,规避资源耗尽的风险。

基于ThreadPoolExecutor创建线程池

ThreadPoolExecutor构造

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

参数说明

  • corePoolSize 核心线程大小;

    • 初次创建线程池时默认为 0 ,随着新任务加入持续新增线程,即使存在空闲的线程也会新建线程,只要没有大于 corePoolSize ,那么线程一旦创建就不会被回收;
    • 若设置了 allowCoreThreadTimeOut 参数为true,则表示 corePoolSize 存在空闲线程时,并且时间超过 keepAliveTime 设置时,将关闭线程。默认该参数为true,表示只要没有超过 corePoolSize 设置的大小,就不会被回收。
  • maximumPoolSize 该线程池最大能允许的线程数量,若当前任务大于了阻塞队列的大小时,会被创建使用,空闲时会被回收;

  • keepAliveTime 设置空闲线程的等待时间,超过后会被回收(条件是该线程不是核心线程),默认60秒;

  • unit 时间单位,配合 keepAliveTime 一起使用;

  • workQueue 阻塞并且线程安全的队列,当前的核心线程满后,则会把多余的任务放在阻塞队列中;当线程池中有空闲线程时就回去任务队列中拿任务并处理。

  • threadFactory 创建新线程的方式,可设置的线程名字,建议指定一个有意义的名字,便于排查错误;

  • handler 线程池当达到最大线程数量时所采用的拒绝策略,默认AbortPolicy,其他还有:DiscardPolicy、CallerRunsPolicy、DiscardOldestPolicy

线程数量的判定

定义:poolSize 为当前创建的线程数量

  • 当创建的线程 poolSize < corePoolSize 时,每次的请求,都会新创建一个线程来处理,即使是现在存在空闲的线程,只要没有达到corePoolSize就会一直创建新线程来处理,除非设置了allowCoreThreadTimeOut=true,则空闲达到keepAliveTime空闲时间才会被回收;

  • 当创建的线程 poolSize > corePoolSize 时,当存在新请求的时候,此时,新请求会被加入到BlockingQueue中;

  • 当 BlockingQueue 满了后,同时继续收到新请求,又会重新创建线程,执行任务(此时需要注意该阻塞队列,不要设置得太大,否则maximumPoolSize将失去意义,这个也是为什么不用使用 Executors 工具类的原因)

  • 当 poolSize > maximumPoolSize 时,执行RejectedExecutionHandler拒绝策略逻辑

阻塞队列详解

BlockingQueue

  • 数据存放
    • offer:添加元素加到BlockingQueue里,若BlockingQueue放入成功,则为true,否则为false。当然我们也可以通过它的重载方法实现等待一段时间,若还是不能放入,则返回失败。(注意:该方法不阻塞当前执行方法的线程)
    • put:添加元素加到BlockingQueue里,若BlockQueue空间不足,则调用该方法的线程将被阻塞,直到BlockingQueue里面有空间时再继续。
  • 数据获取
    • poll:取走BlockingQueue里排在首位的对象;若不能立即取出,则可以等待一定时间,时间到时取不到则返回null。指定时间单位的重载方法可以表示超过时间,返回失败。
    • take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,则阻塞,直到有新数据时被唤醒。

相关实现队列

  • ArrayBlockingQueue 基于数组结构的有界阻塞队列,FIFO 排序原则。

  • LinkedBlockingQueue 基于链表的有界阻塞队列,单向链表结构,FIFO 排序原则;吞吐量通常要高于ArrayBlockingQueue;对生产者消费者不阻塞;默认构造创建的队列大小为Integer.MAX_VALUE,故不推荐使用默认构造。

  • SynchronousQueue 无队列容量的阻塞队列,一个 put 必须等待一个 take,反之亦然;吞吐量通常要高于 LinkedBlockingQueue。

  • PriorityBlockingQueue 一个具有优先级的无界阻塞队列,虽队列逻辑上是无界的,但由于资源被耗尽,所以试图执行添加操作可能会失败,出现OOM现象。

  • DelayQueue 一个无界阻塞队列,只有在延迟期满时才能从中提取元素。

ThreadFactory详解

  • 该参数用于设置创建线程的工厂,通过该工厂类创建线程时可以指定一个有意义的线程名字,有助于问题定位及排查。

采用 Executors.defaultThreadFactory() 创建默认的线程工厂,通过以下代码可以发现,当我们通过new Thread包装一个Runnable时,线程的名字设置成了一个带namePrefix前缀的自增名字。通常也业务中,我们需要设置一个更有意义的名字,所以不会采用默认的方式进行创建。

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

拒绝策略详解

RejectedExecutionHandler:当队列和线程池都满时,会触发该handler,此时表明线程池已经处于饱和状态,不能再处理新任务,需要使用者采取一种策略处理提交的新任务。默认策略为AbortPolicy,表示抛出异常方式处理新增的任务。

以下是JDK1.5提供的五种策略: 

  • AbortPolicy:直接抛出异常;
  • CallerRunsPolicy:只用调用者所在线程来运行任务;
  • DiscardOldestPolicy:丢弃旧任务,并执行当前任务;
  • DiscardPolicy:不执行,丢弃任务;
  • 自定义策略,实现RejectedExecutionHandler接口的rejectedExecution方法。

案例

案例1 错误用法,不指定队列大小,采用默认大小

new  ThreadPoolExecutor(5,10,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())

说明:上面是想达到最大并发10,但是却很难触发。 这里有一个误区,一旦设置了new LinkedBlockingQueue(){},不设置大小默认采用Integer.MAX_VALUE,而这样做就会导致队列很难加满,最终可能打爆内存,所以如果要使用尽量指定大小。

案例2 指定阻塞队列大小及设置每个线程的名字,便于控制及排错

ThreadFactory threadFactory = new NamedThreadFactory("BizProcessor-"+bizName);
ThreadPoolExecutor threadPoolExecutor=new  ThreadPoolExecutor(5,10,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10),threadFactory);

//或者采用SynchronousQueue队列
ThreadPoolExecutor threadPoolExecutor=new  ThreadPoolExecutor(5,10,0,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),threadFactory);

说明:通过指定LinkedBlockingQueue队列的大小,或者采用SynchronousQueue队列来保证队列大小可控;同时指定创建的线程有一定的业务含义,便于排错

案例3 重写 CallerRunsPolicy 拒绝策略,当线程池满时打印JVM的堆栈信息,便于事后分析问题

    private final RejectedExecutionHandler handler = new JvmCallerRunsPolicy();
    
 /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class JvmCallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            jstack();
            throw new RejectedExecutionException();
        }
        
        /**
         * 获取堆栈信息,当发生异常时触发
         * @throws Exception
         */
        public void jstack() throws Exception {
            String logPath = "/home/admin/app/logs";
            File file = new File(logPath);
            if (!file.exists()) {
                file.mkdirs();
            }
            FileOutputStream jstackStream = null;
            try {
                jstackStream = new FileOutputStream(new File(logPath, "app_jstack.log"));
                //获取所有堆栈信息
                Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
                Iterator<Map.Entry<Thread, StackTraceElement[]>> ite = map.entrySet().iterator();
                while (ite.hasNext()) {
                    Map.Entry<Thread, StackTraceElement[]> entry = ite.next();
                    StackTraceElement[] elements = entry.getValue();
                    if (elements != null && elements.length > 0) {
                        String threadName = entry.getKey().getName();
                        //记录线程名称
                        jstackStream.write(("Thread Name :[" + threadName + "]\n").getBytes());
                        for (StackTraceElement el : elements) {
                            String stack = el.toString() + "\n";
                            jstackStream.write(stack.getBytes());
                        }
                        jstackStream.write("\n".getBytes());
                    }
                }
            } catch (Exception e) {
                //只是提示错误,不抛异常
                log.error("JVM Collect ERROR:"+e);
            } finally {
                if (jstackStream != null) {
                    try {
                        jstackStream.close();
                    } catch (IOException e) {
                    }
                }
            }
        }

    }
    

使用方式

ThreadPoolExecutor threadPoolExecutor=new  ThreadPoolExecutor(5,10,0,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),threadFactory, handler);

© 著作权归作者所有

共有 人打赏支持
ifree613

ifree613

粉丝 28
博文 27
码字总数 44789
作品 0
成都
高级程序员
私信 提问
JVM 虚拟机(对象创建,类加载器,执行引擎等),

1.揭开 Java 对象创建的奥秘? 2.class 文件结构详解? 3.详解 Java 类的加载过程? > Java 对象创建,class 文件结构 Java对象模型 。Java对象保存在堆内存中。在内存中,一个Java对象包含三...

desaco
08/29
0
0
一份关于 Java、Kotlin 与 Android 的学习笔记

JavaKotlinAndroidLearn 这是一份关于 Java 、Kotlin 、Android 的学习笔记,既包含对基础知识点的介绍,也包含对一些重要知识点的源码解析,笔记的大纲如下所示: Java 重拾Java(0)-基础知...

叶应是叶
08/08
0
0
Java程序员进化为架构师需要掌握的知识

Java程序员进化为架构师掌握的知识: 一:Java知识 1、进制转换 2、Java基本数据类型 面向对象相关知识 3、类、接口、抽象类 this关键字、static关键字、final关键字 方法的参数传递机制 Ja...

andogo
2014/05/16
1K
2
InheritableThreadLocal详解

1、简介 在上一篇 ThreadLocal详解 中,我们详细介绍了ThreadLocal原理及设计,从源码层面上分析了ThreadLocal。但由于ThreadLocal设计之初就是为了绑定当前线程,如果希望当前线程的ThreadL...

沈渊
04/12
0
0
连接池详解,c3p0与dbcp的区别!

连接池: 连接池是创建和管理一个连接的缓冲池的技术,这些连接准备好被任何需要它们的线程使用。这项技术能明显提高对数据库操作的性能。 连接池的好处: (1)对于大多数应用程序,当它们正...

IT_laobai
06/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

IC-CAD Methodology知识图谱

CAD (Computer Aided Design),计算机辅助设计,指利用计算机及其图形设备帮助设计人员进行设计工作,这个定义同样可以用来近似描述IC公司CAD工程师这个岗位的工作。 早期IC公司的CAD岗位最初...

李艳青1987
7分钟前
0
0
《今日简史:人类命运大议题》的读后感范文3400字

《今日简史:人类命运大议题》的读后感范文3400字: 文:余祥。尤瓦尔.赫拉利,耶路撒冷希伯来大学教授,全球瞩目的新锐历史学家。今年已经拜读其著《人类简史:从动物到上帝》《未来简史:从...

原创小博客
16分钟前
0
0
Eos测试框架EosFactory

EOS Factory包含一个完整的EOS测试框架,可以进行智能合约的开发和测试。由Tokenika于创建于2017年的这个基于Python的EOS测试框架可以轻松地完成智能合约的开发、部署与测试。 如果你希望马上...

汇智网教程
22分钟前
5
0
CompletableFuture get方法一直阻塞或抛出TimeoutException

问题描述 最近刚刚上线的服务突然抛出大量的TimeoutException,查询后发现是使用了CompletableFuture,并且在执行future.get(5, TimeUnit.SECONDS);时抛出了TimeoutException异常,导致接口响...

xiaolyuh
48分钟前
2
0
dubbo 搭建与使用

官网:http://dubbo.apache.org/en-us/ 一,安装监控中心(可以不安装) admin管理控制台,monitor监控中心 下载 bubbo ops 这个是新版的,需要node.js环境,我没有就用老版的了...

小兵胖胖
51分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部