文档章节

源码分析Dubbo网络通讯篇之NettyServer网络事件之线程池

中间件兴趣圈
 中间件兴趣圈
发布于 03/19 21:29
字数 2018
阅读 2.3K
收藏 2

本文主要分析Dubbo线程池的构建过程,主要介绍官方文档中有关于ThreadPool的种类:

  • fixed 固定大小线程池,启动时建立线程,不关闭,一致持有。(缺省)
  • cached :缓存线程池,空闲一分钟,线程会消费,需要时重新创建新线程。
  • limited :可伸缩线程池,但池中的线程数只会增长不会收缩。
  • eager :优先使用线程来执行新提交任务。(渴望立即执行,而不是进入队列排队执行)。配置标签:< dubbo:protocol threadpool = "fixed" ../>

各种类型的线程池,内部就是根据规则创建不同的ThreadPoolExecutor对象,那我们先简单回顾一下线程池的基本知识,其构造方法如下所示:

 public ThreadPoolExecutor(
         int corePoolSize, // 线程池核心线程数、常驻线程数。
         int maximumPoolSize,  // 线程池中最大线程数量
        long keepAliveTime, // 线程保持活跃时间,(如果线程创建,并空闲
                 //指定值后,线程会被回收,0表示不开启该特性,其范围针对   // corePoolSize的线程)
        TimeUnit unit,  // keepAliveTime的时间单位。
        BlockingQueue&lt; Runnable&gt; workQueue,// 任务队列
        ThreadFactory threadFactory, // 线程工厂类,一般通过该线程工厂,为线程命名,以便区分线程。
        RejectedExecutionHandler handler) // 拒绝策略。

提交任务流程(线程创建流程)

  1. 如果线程池中线程数量小于corePoolSize,则创建一个线程来执行该任务。
  2. 如果线程池中的线程大于等于corePoolSize,则尝试将任务放入队列中。
  3. 如果成功将任务放入队列,则本次提交任务正常结束,如果放入任务队列失败则继续下一步。
  4. 如果线程池中的线程数量小于最大线程数,则创建先的线程,否则执行拒绝策略。 更多有关线程池,请参考作者的另外一篇博文:源码分析JDK线程池

1、fixed 固定大小线程池

public class FixedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<runnable>() :
                        (queues &lt; 0 ? new LinkedBlockingQueue<runnable>()
                                : new LinkedBlockingQueue<runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

实现要点:

  1. 首先获取可配置参数threadname、threads、queues三个参数,分别代表线程池中线程名前缀、线程中最大线程数量、任务队列长度。
  2. 要实现fixed固定大小线程池,故名思议,就是线程池自创建以来,线程数量始终保持一致。其实现要点是,corePoolSize、maximumPoolSize相等,并且其值等于threads(默认200),并且keepAliveTime=0,表示线程始终活跃。
  3. 任务队列,如果queues 为0,则使用SynchronousQueue,如果小于0,则使用无界队列,如果大于0,则创建容量为LinkedBlockingQueue的队列,超过容量,则拒绝入队。
  4. 线程工厂,NamedThreadFactory,主要设置线程名称,默认为Dubbo-thread-序号。
  5. 拒绝策略AbortPolicyWithReport,其主要是如果拒绝任务,首先会打印出详细日志,包含线程池的核心参数,并且会dump jstack 日志,日志文件默认存储在 user.home/Dubbo_JStack.log.timestamp,可以通过 dump.directory 属性配置,可通过< dubbo:protocol> < dubbo:parameter key =“” value = ""/> < /dubbo:protocol>。 >这里再简单介绍如果队列长度为0(默认),为什么是选用SynchronousQueue队列。 SynchronousQueue的一个简单理解:调用offer、put之前,必须先调用take,也就是先调用take方法的线程阻塞,然后当别的线程调用offer之后,调用take的线程被唤醒,如果没有线程调用take方法,一个线程调用offer方法,则会返回false,并不会将元素添加到SynchronousQueue队列中,因为SynchronousQueue内部的队列长度为0。 与该线程池相关的配置属性:threadname、theadpool、threads、queues。

2、cached 缓存线程池,线程空闲后会被回收

public class CachedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<runnable>() :
                        (queues &lt; 0 ? new LinkedBlockingQueue<runnable>()
                                : new LinkedBlockingQueue<runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

实现要点:既然要实现线程可以被回收,则必然要设置 keepAliveTime。 故对应线程池核心参数设置,对应如下:

  • corePoolSize:通过参数corethreads设置,默认为0
  • maximumPoolSize:通过参数threads设置,默认200
  • keepAliveTime:通过参数alive设置,默认为60 * 1000
  • workQueue :通过queues参数设置,默认为0
  • 其他与fixed相同,则不重复介绍

3、limited 可伸缩线程池,其特征:线程数只增不减

public class LimitedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<runnable>() :
                        (queues &lt; 0 ? new LinkedBlockingQueue<runnable>()
                                : new LinkedBlockingQueue<runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

就不回收,与cached不同的就是 keepAliveTime 的取值不同,limited 取值为:Long.MAX_VALUE,其他与 cached 相同。

4、eager

其核心实现主要由 TaskQueue、EagerThreadPoolExecutor 共同完成。 首先,我们关注一下 TaskQueued 的 offer方法。

public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();     // [@1](https://my.oschina.net/u/1198)
        // have free worker. put task into queue to let the worker deal with task.
        if (executor.getSubmittedTaskCount() &lt; currentPoolThreadSize) {   // @2
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize &lt; executor.getMaximumPoolSize()) {    // [@3](https://my.oschina.net/u/2648711)
            return false;
        }

        // currentPoolThreadSize &gt;= max     // @4
        return super.offer(runnable); 
    }

代码@1:获取当前线程池中线程的数量。

代码@2:如果当前已提交到线程池中的任务数量小于当前存在在的线程数,则走默认的提交流程。

代码@3:如果当前已提交到线程中的数量大于当前的线程池,并线程池中数量并未达到线程池允许创建的最大线程数时,则返回false,并不入队,其效果是会创建新的线程来执行。

代码@4:如果当前线程池中的线程已达到允许创建的最大线程数后,走默认的提交任务逻辑。 EagerThreadPoolExecutor#execute

public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();       // @1 
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();   // @2
        }
    }

其核心实现逻辑:如果提交任务失败,则再走一次默认的任务提交流程。 最总后结一下Eager的核心特性。

public class EagerThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

        // init queue and executor
        TaskQueue<runnable> taskQueue = new TaskQueue<runnable>(queues &lt;= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}

其核心特性如下:

  1. 首先,其配置参数与cached类型的线程池相同,说明eager也是基于缓存的。
  2. eager与cached类型线程池不同的一点是,提交任务后,线程优先于队列,默认的提交流程是如果线程数达到核心线程数后,新提交的任务是首先进入队列,但eager是优先创建线程来执行,这有点与公平锁,非公平锁一样的概念了。

>作者介绍:丁威,《RocketMQ技术内幕》作者,RocketMQ 社区优秀布道师、CSDN2019博客之星TOP10,维护公众号:中间件兴趣圈目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。可以点击链接加入中间件知识星球 ,一起探讨高并发、分布式服务架构,交流源码。 在这里插入图片描述</runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable>

© 著作权归作者所有

中间件兴趣圈

中间件兴趣圈

粉丝 50
博文 47
码字总数 128914
作品 0
青浦
私信 提问
加载中

评论(0)

源码分析Dubbo服务提供者启动流程-下篇

本文继续上文Dubbo服务提供者启动流程,在上篇文章中详细梳理了基于dubbo spring文件的配置方式,Dubbo是如何加载配置文件,服务提供者dubbo:service标签服务暴露全流程,本节重点关注Regis...

丁威
2019/10/21
0
0
Dubbo线程模型与线程池策略

一、Dubbo的线程模型概述 Dubbo 默认的底层网络通讯使用的是 Netty ,服务提供方 NettyServer 使用两级线程池,其中 EventLoopGroup(boss) 主要用来接受客户端的链接请求,并把接受的请求分发...

阿里加多
2019/12/17
0
0
buddo源码分析-transport组件之Netty(一)

dubbo 2.5.10 版本,netty仍然使用的是netty的3.10.5版本,我们从下面的代码可以看出,SPI默认使用的是“netty”,而不是“netty4”。 package com.alibaba.dubbo.remoting; import com.ali...

FrankYou
2019/08/27
0
0
dubbo+zk通信数据时的注意事项

config,配置层,对外配置接口,以ServiceConfig, ReferenceConfig为中心,可以直接new配置类,也可以通过spring解析配置生成配置类 proxy,服务代理层,服务接口透明代理,生成服务的客户端...

语落心生
2019/07/18
0
0
dubbo请求处理线程模型实现分析

问题的由来: 如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识, 则直接在 IO 线程上处理更快,因为减少了线程池调度。 但如果事件处理逻辑较慢,或者需...

wannshan
2018/03/30
267
0

没有更多内容

加载失败,请刷新页面

加载更多

基于 rsync 和 ln 实现“写时复制”的快照备份功能

一、基本原理 这里“写时复制”加了一个引号,因为这是专门针对使用rsync备份时的写时复制效果,而不是事实上的写时复制(copy-on-write),其达到的目的如下: 使用 rsync 备份数据后,立即...

Inpool
22分钟前
17
0
郑州哪哪里可以开工程款发票-郑州_新闻网

【电薇同步;1.3.8 - 2.7.4.1 - 5.2.9.7.】张生、诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridge,是Android手机通用...

yyqqvip
今天
30
0
Nginx 反向代理访问

在Nginx 配置 server { listen 80; server_name www.xiaocx.org www.xiaocx.org www.xiaocx.org; root /Users/maison/work/xiaocx/dist; index i......

韩庚庚
今天
33
0
python笔记:环境变量已设置CMD中一直报错"python"不是内部命令,也不是可运行的程序或批处理文件

这些天虽然也写了几个小工具,但是打包都是在anaconda prompt中完成的,因为CMD中一直报错"python"不是内部命令,也不是可运行的程序或批处理文件,各种查度,千篇一律的是环境变量配置的问题...

小玲_001
今天
13
0
AI+BI服务模式

术语与缩写解释 缩写、术语 解 释 BI 商业智能(Business Intelligence,简称:BI),又称商业智慧或商务智能,指用现代数据仓库技术、线上分析处理技术、数据挖掘和数据展现技术进行数据分析...

zoegu228
今天
28
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部