A look at RocketMQ's consumeConcurrentlyMaxSpan

go4it
Published 2019/11/18 22:46

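This article looks at RocketMQ's consumeConcurrentlyMaxSpan setting: where it is defined, how it is validated, and how the push consumer uses it for flow control.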

consumeConcurrentlyMaxSpan

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Concurrently max span offset.it has no effect on sequential consumption
     */
    private int consumeConcurrentlyMaxSpan = 2000;

    public int getConsumeConcurrentlyMaxSpan() {
        return consumeConcurrentlyMaxSpan;
    }

    public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
        this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
    }

    //......
}
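  • DefaultMQPushConsumer defines the consumeConcurrentlyMaxSpan property, with a default value of 2000; per its Javadoc, it has no effect on sequential (orderly) consumption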

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    //......

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

        //......

        // consumeConcurrentlyMaxSpan
        if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
            || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
            throw new MQClientException(
                "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        //......
    }

    //......
}
  • The checkConfig method requires defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() to be at least 1 and at most 65535; otherwise it throws an MQClientException
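
The range rule can be tried out in isolation. The following is a minimal stand-alone sketch, not RocketMQ code: the class MaxSpanRangeCheck and its methods are hypothetical names, and it throws IllegalArgumentException instead of MQClientException so that it runs without the rocketmq-client dependency.

```java
// Hypothetical stand-alone sketch; none of these names exist in rocketmq-client.
class MaxSpanRangeCheck {
    // Bounds taken from the checkConfig excerpt above.
    static final int MIN_SPAN = 1;
    static final int MAX_SPAN = 65535;

    /** Returns true when the value lies inside the inclusive range [1, 65535]. */
    static boolean isValidMaxSpan(int span) {
        return span >= MIN_SPAN && span <= MAX_SPAN;
    }

    /** Returns the value unchanged, or throws when it is out of range. */
    static int requireValidMaxSpan(int span) {
        if (!isValidMaxSpan(span)) {
            // Assumption: a plain IllegalArgumentException stands in for MQClientException.
            throw new IllegalArgumentException(
                "consumeConcurrentlyMaxSpan Out of range [1, 65535]: " + span);
        }
        return span;
    }

    public static void main(String[] args) {
        System.out.println(isValidMaxSpan(2000));  // true: the default value
        System.out.println(isValidMaxSpan(0));     // false: below the lower bound
        System.out.println(isValidMaxSpan(65536)); // false: above the upper bound
    }
}
```

Both bounds are inclusive, so 1 and 65535 are accepted.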

pullMessage

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    /**
     * Delay some time when exception occur
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
    /**
     * Flow control interval
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;

    //......

    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }

        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        //......

        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }

                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        //......        

    }

    private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
    }

    //......
}
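  • When the consumer is not consumeOrderly, pullMessage checks whether processQueue.getMaxSpan() is greater than this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan(); if it is, the method applies flow control by calling executePullRequestLater and returns without pulling. The delay constant PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL is 50 (milliseconds), and a warning is logged on every 1000th flow-control hit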
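
The decision made in the non-orderly branch can be reduced to a small function. This is a simplified sketch under stated assumptions, not the RocketMQ implementation: FlowControlSketch and delayBeforePull are hypothetical names, the orderly branch (which deals with queue locking) is collapsed to "pull now", and the log line is replaced by System.out.

```java
// Hypothetical sketch of the span-based flow-control decision; not RocketMQ code.
class FlowControlSketch {
    // Same value as PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL in the excerpt above.
    static final long FLOW_CONTROL_DELAY_MILLIS = 50;

    private final int maxSpan;
    private long flowControlTimes = 0;

    FlowControlSketch(int maxSpan) {
        this.maxSpan = maxSpan;
    }

    /**
     * Returns how many milliseconds to wait before retrying the pull,
     * or 0 when the pull may proceed immediately.
     */
    long delayBeforePull(boolean consumeOrderly, long currentSpan) {
        if (!consumeOrderly && currentSpan > maxSpan) {
            // Log only on every 1000th hit, as the original code does.
            if ((flowControlTimes++ % 1000) == 0) {
                System.out.println("span too long, flow control: span=" + currentSpan
                    + ", flowControlTimes=" + flowControlTimes);
            }
            return FLOW_CONTROL_DELAY_MILLIS;
        }
        // Assumption: the orderly branch is not modeled here and simply pulls now.
        return 0;
    }

    public static void main(String[] args) {
        FlowControlSketch sketch = new FlowControlSketch(2000);
        System.out.println(sketch.delayBeforePull(false, 2000)); // span equal to the limit: no delay
        System.out.println(sketch.delayBeforePull(false, 2001)); // span above the limit: delayed
        System.out.println(sketch.delayBeforePull(true, 5000));  // orderly: span is ignored
    }
}
```

The comparison is strictly greater-than, so a span exactly equal to consumeConcurrentlyMaxSpan does not trigger flow control.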

ProcessQueue

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java

public class ProcessQueue {
    public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
        Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
    public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
    private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
    private final InternalLogger log = ClientLogger.getLog();
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

    //......

    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }

    //......
}
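  • ProcessQueue's getMaxSpan returns msgTreeMap.lastKey() - msgTreeMap.firstKey(), that is, the distance between the largest and smallest offsets currently held in msgTreeMap; it returns 0 when the map is empty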
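
Because the span is an offset difference rather than a message count, a queue holding very few messages can still exceed the limit. The sketch below illustrates this with a plain TreeMap. SpanSketch is a hypothetical class, the read lock is omitted, String values stand in for MessageExt, and it assumes (this is not shown in the excerpt above) that a message leaves the map once it has been consumed.

```java
import java.util.TreeMap;

// Hypothetical sketch; mirrors only the arithmetic of ProcessQueue.getMaxSpan.
class SpanSketch {
    /** Largest key minus smallest key, or 0 for an empty map. */
    static long maxSpan(TreeMap<Long, String> pending) {
        if (pending.isEmpty()) {
            return 0;
        }
        return pending.lastKey() - pending.firstKey();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> pending = new TreeMap<>();
        pending.put(100L, "slow message, still being consumed");
        pending.put(2101L, "most recently pulled message");

        // Only two messages are pending, yet the span is 2101 - 100 = 2001,
        // which is greater than the default limit of 2000.
        System.out.println(maxSpan(pending)); // 2001

        // Assumption: once the slow message finishes, it is removed from the map.
        pending.remove(100L);
        System.out.println(maxSpan(pending)); // 0 (a single entry has no span)
    }
}
```

Under that assumption, one message that stays unconsumed at a low offset is enough to keep the span above the limit and hold back further pulls for the queue.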

PullMessageService

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/PullMessageService.java

public class PullMessageService extends ServiceThread {
    private final InternalLogger log = ClientLogger.getLog();
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
    private final MQClientInstance mQClientFactory;
    private final ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "PullMessageServiceScheduledThread");
            }
        });

    public PullMessageService(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

    //......
}
  • executePullRequestLater schedules a delayed task on scheduledExecutorService; the task calls executePullRequestImmediately(pullRequest), which puts the pullRequest into pullRequestQueue. The run method takes pull requests from pullRequestQueue and calls pullMessage for each one; pullMessage first looks up the consumer via mQClientFactory.selectConsumer and then calls that consumer's pullMessage method
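
The "delay, then re-enqueue" pattern described here uses only JDK classes, so it can be reproduced outside RocketMQ. The following is a minimal sketch under that framing: DelayedRequeueSketch and its method names are hypothetical, a String stands in for PullRequest, and the consuming loop is reduced to a single timed take.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the scheduling pattern; not the PullMessageService class.
class DelayedRequeueSketch {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    /** Enqueues the request right away (the queue is unbounded, so offer succeeds). */
    void submitNow(String request) {
        queue.offer(request);
    }

    /** Enqueues the request after the given delay, via the scheduler thread. */
    void submitLater(String request, long delayMillis) {
        scheduler.schedule(() -> submitNow(request), delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Waits up to timeoutMillis for a request; returns null on timeout or interrupt. */
    String take(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Stops the scheduler thread so the JVM can exit. */
    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        DelayedRequeueSketch sketch = new DelayedRequeueSketch();
        sketch.submitLater("pullRequest-1", 50); // same delay as the flow-control constant
        System.out.println(sketch.take(2000));   // pullRequest-1, once the delay has elapsed
        sketch.shutdown();
    }
}
```

In this arrangement a flow-controlled request is never dropped; it simply re-enters the same queue after the delay and goes through the same checks again.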

Summary

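DefaultMQPushConsumer defines the consumeConcurrentlyMaxSpan property, with a default value of 2000 and an allowed range of [1, 65535]. When the consumer is not consumeOrderly, DefaultMQPushConsumerImpl's pullMessage method checks whether processQueue.getMaxSpan() is greater than this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan(); if it is, the method applies flow control by calling executePullRequestLater, which re-enqueues the pull request after PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, a constant of 50 milliseconds.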
