文档章节

聊聊rocketmq的adjustThreadPoolNumsThreshold

go4it
 go4it
发布于 11/17 11:16
字数 511
阅读 17
收藏 0

【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>

本文主要研究一下rocketmq的adjustThreadPoolNumsThreshold

DefaultMQPushConsumer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Threshold for dynamic adjustment of the number of thread pool
     */
    private long adjustThreadPoolNumsThreshold = 100000;

    public long getAdjustThreadPoolNumsThreshold() {
        return adjustThreadPoolNumsThreshold;
    }

    public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
        this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
    }

    //......
}
  • DefaultMQPushConsumer定义了adjustThreadPoolNumsThreshold属性,默认为100000

DefaultMQPushConsumerImpl

ocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

	//......

    public void adjustThreadPool() {
        long computeAccTotal = this.computeAccumulationTotal();
        long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();

        long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);

        long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);

        if (computeAccTotal >= incThreshold) {
            this.consumeMessageService.incCorePoolSize();
        }

        if (computeAccTotal < decThreshold) {
            this.consumeMessageService.decCorePoolSize();
        }
    }

    private long computeAccumulationTotal() {
        long msgAccTotal = 0;
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
        Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            ProcessQueue value = next.getValue();
            msgAccTotal += value.getMsgAccCnt();
        }

        return msgAccTotal;
    }

    //......
}
  • adjustThreadPool方法会计算computeAccTotal,然后使用adjustThreadPoolNumsThreshold * 1.0作为incThreshold,使用adjustThreadPoolNumsThreshold * 0.8作为decThreshold;对于computeAccTotal大于等于incThreshold的,执行consumeMessageService.incCorePoolSize();对于computeAccTotal小于decThreshold的执行consumeMessageService.decCorePoolSize()

MQClientInstance

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
	//......

    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

    private void startScheduledTask() {

        //......

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    public void adjustThreadPool() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    if (impl instanceof DefaultMQPushConsumerImpl) {
                        DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl;
                        dmq.adjustThreadPool();
                    }
                } catch (Exception e) {
                }
            }
        }
    }

	//......
}
  • MQClientInstance的start方法对于CREATE_JUST状态会执行startScheduledTask()方法,后者会注册一个定时任务,每隔1分钟执行一次adjustThreadPool方法;adjustThreadPool方法则遍历consumerTable的MQConsumerInner,对于DefaultMQPushConsumerImpl类型的MQConsumerInner执行adjustThreadPool方法

小结

DefaultMQPushConsumer定义了adjustThreadPoolNumsThreshold属性,默认为100000;MQClientInstance的start方法对于CREATE_JUST状态会执行startScheduledTask()方法,后者会注册一个定时任务,每隔1分钟执行一次adjustThreadPool方法;adjustThreadPool方法则遍历consumerTable的MQConsumerInner,对于DefaultMQPushConsumerImpl类型的MQConsumerInner执行adjustThreadPool方法

doc

© 著作权归作者所有

go4it
粉丝 89
博文 1179
码字总数 1103596
作品 0
深圳
私信 提问
聊聊rocketmq的RemotingException

序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/RemotingException.java 继承自checked exception,底下有RemotingCommandExce......

go4it
2018/08/08
288
0
聊聊rocketmq的NettyEncoder及NettyDecoder

序 本文主要研究一下rocketmq的NettyEncoder及NettyDecoder NettyEncoder org/apache/rocketmq/remoting/netty/NettyEncoder.java 这里继承MessageToByteEncoder,类型是RemotingCommand,先......

go4it
2018/08/07
25
0
聊聊rocketmq的ProducerImpl

序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java 发送消息的方法主要是代理给rocketmqProducer 另外调用OMSUtil.msgConver......

go4it
2018/07/28
33
0
聊聊rocketmq的NettyClientConfig

序 本文主要研究一下rocketmq的NettyClientConfig NettyClientConfig org/apache/rocketmq/remoting/netty/NettyClientConfig.java 这里主要有几个参数: clientWorkerThreads,默认为4 cli......

go4it
2018/08/04
192
0
聊聊rocketmq的SequenceProducerImpl

序 本文主要研究一下rocketmq的SequenceProducerImpl SequenceProducerImpl io/openmessaging/rocketmq/producer/SequenceProducerImpl.java 采用的是LinkedBlockingQueue,send方法实际调用......

go4it
2018/07/30
15
0

没有更多内容

加载失败,请刷新页面

加载更多

阿里巴巴的 Kubernetes 应用管理实践经验与教训

作者 | 孙健波(天元) 阿里巴巴技术专家 导读:本文整理自孙健波在 ArchSummit 大会 2019 北京站演讲稿记录。首先介绍了阿里巴巴基于 Kubernetes 项目进行大规模应用实践过程中遇到的问题;...

阿里巴巴云原生
10分钟前
2
0
pinpoint采样原理分析

使用pinpoint进行全链路监控时,支持对请求的采样,某条请求是否被采样,取决于整个链路开始的机器。该机器使用特定的采样算法。采样的标志会一直在链路中透传。比如在http里面,会在header里...

xiaomin0322
14分钟前
2
0
在IDEA开发工具中使用lombok

1. 首先我们需要安装IntelliJ IDEA中的lombok插件,打开IntelliJ IDEA后点击菜单栏中的File-->Settings,或者使用快捷键Ctrl+Alt+S进入到设置页面 我们点击设置中的Plugins进行插件的安装,在...

欧阳飘
16分钟前
2
0
爱码仕 5G生活畅想 (五) 每个人每个家庭都有一朵私有的云

30年前,微软让每个家庭都有一台电脑的理念成为了现实;而今云计算的观念已为老百姓们所熟识。数据就是能源;数据就是财富;谁生产了数据,这数据的所有权就归谁所有。随着原生云基础设施的完...

LitStone
17分钟前
3
0
嵌入式入门:嵌入式领域的职业发展方向是什么?

嵌入式入门:嵌入式领域的职业发展方向是什么? 在如今的IT市场上看,嵌入式的发展的应用都是广受欢迎的,在嵌入式入门学习中,我们可以发现嵌入式的发展方向有很多,门槛高低不一样。下面就...

xyd118
18分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部