RocketMQ: The Consumer Consumption Flow

Original
08/09 18:29




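A source-level look at the RocketMQ message middleware, covering the consumer consumption flow and the storage architecture. Starting from RocketMQ's message storage design, this article explains how CommitLog and ConsumeQueue work together to provide efficient storage and fast consumption, and aims to give readers a way into RocketMQ's core principles.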

My questions

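Before reading the source, I had the following questions about how RocketMQ is implemented:

  1. What does the message storage architecture look like?

  2. How does a consumer pull messages?

  3. How does RocketMQ implement its load-balancing strategy?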


Reading the source


  RocketMQ storage architecture



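RocketMQ's storage architecture is built on two core files: the CommitLog file and the ConsumeQueue file. CommitLog stores the message bodies. Messages sent by producers are appended to the CommitLog sequentially, and sequential writes keep the write path efficient.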


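To speed up consumption, RocketMQ introduces the ConsumeQueue file, located at $HOME/store/consumequeue/{topic}/{queueId}/{fileName}. Each entry consists of an 8-byte CommitLog offset, a 4-byte message size, and an 8-byte tag hashcode. ConsumeQueue therefore acts as an index: write efficiency is preserved while consumption gets faster.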
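To make the 20-byte entry layout concrete, here is a minimal sketch that encodes and decodes one entry with `java.nio.ByteBuffer`. The class and method names (`ConsumeQueueEntry`, `encode`, `decode`, `position`) are made up for illustration and are not RocketMQ's own API; only the field widths come from the description above.

```java
import java.nio.ByteBuffer;

/** Illustrative model of one ConsumeQueue entry; names are hypothetical, not RocketMQ classes. */
final class ConsumeQueueEntry {
    // 8-byte CommitLog offset + 4-byte message size + 8-byte tag hashcode
    static final int ENTRY_SIZE = 8 + 4 + 8;

    final long commitLogOffset; // where the message starts in the CommitLog
    final int messageSize;      // length of the message in bytes
    final long tagHashCode;     // hash of the message tag

    ConsumeQueueEntry(long commitLogOffset, int messageSize, long tagHashCode) {
        this.commitLogOffset = commitLogOffset;
        this.messageSize = messageSize;
        this.tagHashCode = tagHashCode;
    }

    /** Serializes the entry into exactly ENTRY_SIZE bytes. */
    ByteBuffer encode() {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_SIZE);
        buf.putLong(commitLogOffset).putInt(messageSize).putLong(tagHashCode);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    /** Reads the three fields back in the order they were written. */
    static ConsumeQueueEntry decode(ByteBuffer buf) {
        return new ConsumeQueueEntry(buf.getLong(), buf.getInt(), buf.getLong());
    }

    /** With fixed-size entries, the byte position of the n-th entry is a multiplication. */
    static long position(long entryIndex) {
        return entryIndex * ENTRY_SIZE;
    }
}
```

Because every entry has the same size, a consumer that knows its logical offset in a queue can jump straight to the matching entry and from there to the message in the CommitLog, which is what makes the file usable as an index.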


  How the consumer consumes messages


  • Diagram of the consumer flow



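The diagram above shows the consumer flow. Its core consists of the load-balancing module and the message-processing module.

The load-balancing thread runs a while loop that iterates over every topic the current client subscribes to and rebalances per topic. Rebalancing determines which queues to consume and from which offset, and the result is put into pullRequestQueue. If the set of consumers subscribed to a topic has not changed (none joined or left), the load-balancing thread does not put new requests into pullRequestQueue; it does so only when the subscribers change.

The message-processing thread also runs a while loop. It keeps taking pull requests from pullRequestQueue, fetches the messages at the specified offset of the specified queue from the Broker, and then triggers a callback to consume them.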
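The hand-off between the two threads is a classic producer/consumer arrangement over a blocking queue. The sketch below only illustrates that shape and is not RocketMQ code: `PullLoopSketch`, `Request` and `run` are hypothetical names, the "pull" is replaced by recording a string, and the loop is bounded so that the example terminates.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class PullLoopSketch {
    /** Hypothetical stand-in for a pull request: which queue, from which offset. */
    static final class Request {
        final int queueId;
        final long offset;

        Request(int queueId, long offset) {
            this.queueId = queueId;
            this.offset = offset;
        }
    }

    static List<String> run(List<Request> assigned) {
        BlockingQueue<Request> pullRequestQueue = new LinkedBlockingQueue<>();
        List<String> handled = new CopyOnWriteArrayList<>();
        int expected = assigned.size();

        // "Message-processing" thread: take() blocks until a request is available.
        // A real client loops until it is stopped; this loop is bounded for the demo.
        Thread puller = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    Request r = pullRequestQueue.take();
                    handled.add("queue " + r.queueId + " @ " + r.offset);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        puller.start();

        // "Load-balancing" side: publish one request per assigned queue.
        for (Request r : assigned) {
            pullRequestQueue.add(r);
        }

        try {
            puller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the puller", e);
        }
        return handled;
    }
}
```

The queue decouples the two concerns: the rebalancing side only decides what should be pulled, and the pulling side never needs to know why a request appeared.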

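  • Implementation details

Initializing the load-balancing thread (RebalanceService)

While the consumer (DefaultMQPushConsumerImpl) is being initialized, it creates a singleton MQClientInstance object and then calls that class's start method to initialize it.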

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            // ... remaining cases omitted in this excerpt
        }
    }
}

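The start method starts a load-balancing thread (this.rebalanceService.start()) and a message-processing thread (this.pullMessageService.start()).


Once started, the load-balancing thread runs a while loop that triggers the load-balancing strategy at a fixed interval and updates the consume queues.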


public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

mqClientFactory.doRebalance() leads to org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance.


This method iterates over all Consumer instances that the current client has registered locally and calls doRebalance on each of them.


public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}

impl.doRebalance() leads to org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance.


This method first iterates over all subscriptions and rebalances each subscribed topic.


public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    // ... rest of the method omitted in this excerpt
}

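this.rebalanceByTopic(topic, isOrder) leads to org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic().


This method obtains the set of all queues under the current topic (mqAll) and the set of consumer IDs in the current consumer group for that topic (cidAll).


It then calls:

allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);

Internally, strategy.allocate applies the allocateAveragely load-balancing strategy to decide which queues this consumer instance consumes.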


private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
                        strategy.getName(), e);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(
                        "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                        strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(),
                        mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet);
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

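The default is the average-allocation strategy, allocateAveragely. For example:

  1. The topic has 3 queues and 3 consumers subscribe to it: each consumer consumes one queue.

  2. The topic has 3 queues and 4 consumers subscribe to it: one consumer stays idle and consumes nothing.

  3. The topic has 3 queues and 2 consumers subscribe to it: one consumer consumes two of the queues.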


public List<T> allocateAveragely(String currentCID, List<String> cidAll, List<T> queueAll) {
    List<T> result = new ArrayList<T>();

    int index = cidAll.indexOf(currentCID);
    int mod = queueAll.size() % cidAll.size();
    int averageSize =
        queueAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? queueAll.size() / cidAll.size()
            + 1 : queueAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, queueAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(queueAll.get((startIndex + i) % queueAll.size()));
    }

    return result;
}
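To check the three cases listed earlier against the arithmetic of allocateAveragely, the following self-contained sketch reuses the same formula on plain queue indices. `AverageAllocationDemo` and its `allocate` signature are illustrative names, and consumers are identified by their position in the sorted consumer list rather than by client ID.

```java
import java.util.ArrayList;
import java.util.List;

final class AverageAllocationDemo {
    /** Same arithmetic as allocateAveragely, specialised to queue indices 0..queueCount-1. */
    static List<Integer> allocate(int consumerIndex, int consumerCount, int queueCount) {
        List<Integer> result = new ArrayList<>();
        int mod = queueCount % consumerCount;
        int averageSize = queueCount <= consumerCount
            ? 1
            : (mod > 0 && consumerIndex < mod
                ? queueCount / consumerCount + 1
                : queueCount / consumerCount);
        int startIndex = (mod > 0 && consumerIndex < mod)
            ? consumerIndex * averageSize
            : consumerIndex * averageSize + mod;
        // A negative range simply means this consumer receives no queue.
        int range = Math.min(averageSize, queueCount - startIndex);
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queueCount);
        }
        return result;
    }

    public static void main(String[] args) {
        int queues = 3;
        for (int consumers : new int[] {3, 4, 2}) {
            for (int c = 0; c < consumers; c++) {
                System.out.println(consumers + " consumers, consumer " + c
                    + " -> queues " + allocate(c, consumers, queues));
            }
        }
    }
}
```

With 3 queues, the fourth of four consumers gets an empty list, and with two consumers the first one takes queues 0 and 1 while the second takes queue 2, which matches the examples.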

Once the load-balancing strategy has run, the set of queues for the current consumer is known. The key next step is to construct the pullRequest.


PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);


After the pullRequest is constructed, org.apache.rocketmq.client.impl.consumer.RebalanceImpl#dispatchPullRequest is called to put the request into pullRequestQueue.
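The reason an unchanged consumer group produces no new pull requests can be pictured as a set difference between the queues already being pulled and the newly allocated ones. The sketch below is a simplified guess at that idea, not the actual updateProcessQueueTableInRebalance implementation; `RebalanceDiffSketch`, `apply`, and the use of plain integers as queue IDs are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class RebalanceDiffSketch {
    // Queues this consumer is currently pulling from (hypothetical stand-in for the process-queue table).
    private final Set<Integer> current = new TreeSet<>();

    /** Applies a new allocation and returns the queue IDs that need a fresh pull request. */
    List<Integer> apply(Set<Integer> allocated) {
        // Stop tracking queues that are no longer assigned to this consumer.
        current.retainAll(allocated);

        // Only queues that were not tracked before produce a new pull request.
        List<Integer> newRequests = new ArrayList<>();
        for (Integer queueId : new TreeSet<>(allocated)) {
            if (current.add(queueId)) {
                newRequests.add(queueId);
            }
        }
        return newRequests;
    }
}
```

Applying the same allocation twice returns an empty list the second time, so nothing is enqueued; a changed allocation yields requests only for the queues that are new to this consumer.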


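Initializing the message-pull thread (PullMessageService)


Once started, the message-pull thread runs a while loop that keeps taking requests from pullRequestQueue, parses each request, and pulls messages from the specified ConsumeQueue at the specified offset. When messages arrive, a callback invokes the consumption logic written by the user.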


public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            if (pullRequest != null) {
                this.pullMessage(pullRequest);
            }
        } catch (InterruptedException e) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}


this.pullMessage(pullRequest) leads to org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage.


This method looks up the consumer instance by the ConsumerGroup carried in the request, then calls that consumer's impl.pullMessage(pullRequest) to pull messages.


private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}


impl.pullMessage(pullRequest) leads to org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage.

This method defines a callback that runs once the pull succeeds.


PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                        boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispathToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }

                    if (pullResult.getNextBeginOffset() < prevRequestOffset
                        || firstMsgOffset < prevRequestOffset) {
                        log.warn(
                            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                            pullResult.getNextBeginOffset(),
                            firstMsgOffset,
                            prevRequestOffset);
                    }

                    break;
                case NO_NEW_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case NO_MATCHED_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    log.warn("the pull request offset illegal, {} {}",
                        pullRequest.toString(), pullResult.toString());
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                    pullRequest.getNextOffset(), false);

                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                log.warn("fix the pull request offset, {}", pullRequest);
                            } catch (Throwable e) {
                                log.error("executeTaskLater Exception", e);
                            }
                        }
                    }, 10000);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("execute the pull request exception", e);
        }

        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
};
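One point worth drawing out of the callback is that a successful pull advances the request's offset and then puts the same request back, so a single PullRequest keeps the pull going for its queue. The sketch below imitates only that cycle against an in-memory list standing in for the Broker. Everything in it (`PullCallbackSketch`, `Request`, `drain`, the batch size) is hypothetical, and unlike the real client it stops once no new messages are found instead of continuing to poll.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class PullCallbackSketch {
    /** Hypothetical pull request that carries only the next offset to read. */
    static final class Request {
        long nextOffset;

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    /** Drains a fake in-memory log in batches, re-enqueuing the request after each successful pull. */
    static List<List<String>> drain(List<String> brokerLog, int batchSize) {
        Queue<Request> pullRequestQueue = new ArrayDeque<>();
        pullRequestQueue.add(new Request(0));
        List<List<String>> consumedBatches = new ArrayList<>();

        while (!pullRequestQueue.isEmpty()) {
            Request request = pullRequestQueue.poll();
            int from = (int) request.nextOffset;
            int to = Math.min(from + batchSize, brokerLog.size());
            if (from >= to) {
                break; // nothing new; the demo stops here so that it terminates
            }
            // Messages found: advance the offset and hand the batch to the consumer side...
            request.nextOffset = to;
            consumedBatches.add(new ArrayList<>(brokerLog.subList(from, to)));
            // ...then put the same request back so the next pull starts at the new offset.
            pullRequestQueue.add(request);
        }
        return consumedBatches;
    }
}
```

In the real callback the re-enqueue is either immediate or delayed by the configured pull interval, which is the branch on getPullInterval() in the FOUND case.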

The core of the consumption logic is DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(), which leads to org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest().

This method submits the initialized ConsumeRequest object to a thread pool for processing. ConsumeRequest implements the Runnable interface.


public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
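The else branch of submitConsumeRequest splits a pulled list into chunks of at most consumeBatchSize messages, each of which becomes one task for the thread pool. A minimal sketch of just that splitting step follows; `BatchSplitSketch` and `split` are illustrative names, and the thread pool and rejection handling are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSplitSketch {
    /** Splits msgs into consecutive batches of at most consumeBatchSize elements. */
    static <T> List<List<T>> split(List<T> msgs, int consumeBatchSize) {
        if (consumeBatchSize <= 0) {
            throw new IllegalArgumentException("consumeBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < msgs.size(); start += consumeBatchSize) {
            int end = Math.min(start + consumeBatchSize, msgs.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(msgs.subList(start, end)));
        }
        return batches;
    }
}
```

For a non-empty list no longer than the batch size this yields a single batch, which corresponds to the first branch of the method, where the whole list is submitted as one ConsumeRequest.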


Execution then reaches org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run. This method obtains the MessageListenerConcurrently object defined by the developer and calls consumeMessage to run the consumption logic.


public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;

    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    }
    // ... exception handling and result processing omitted in this excerpt
}


Conclusion


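This concludes an analysis and summary of the RocketMQ consumption flow and its storage design, based on the source code. Reading the source does deepen one's understanding of the middleware, and along the way I also came to appreciate how safe and efficient metaq is in message storage and consumption. The consumption path also involves many synchronization operations. For lack of time they are not covered here, and I plan to analyze them in depth later.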









This article is shared from the WeChat official account 大淘宝技术 (AlibabaMTT).