文档章节

RocketMQ为什么要保证订阅关系的一致性?

后端进阶
 后端进阶
发布于 07/30 13:45
字数 2895
阅读 2876
收藏 44

微信公众号「后端进阶」,专注后端技术分享:Java、Golang、WEB框架、分布式中间件、服务治理等等。

前段时间有个朋友向我提了一个问题,他说在搭建 RocketMQ 集群过程中遇到了关于消费订阅的问题,具体问题如下:

然后他发了报错的日志给我看:

the consumer's subscription not exist

我第一时间在源码里找到了报错的位置:

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:

subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
  log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
  response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
  response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
  return response;
}

此处源码是将该 Topic 的订阅信息找出来,然而这里却没找到,所以报了消费订阅不存在的错误。

朋友还跟我讲了他的消费集群中,每个消费者订阅了自己的 Topic,他的消费组中 有 c1 和 c2 消费者,c1 订阅了 topicA,而 c2 订阅了 topicB。

这时我已经知道什么原因了,我先说一下消费者的订阅信息在 broker 中是以 group 来分组的,数据结构如下:

org.apache.rocketmq.broker.client.ConsumerManager:

private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
  new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

这意味着集群中的每个消费者在向 broker 注册订阅信息的时候相互覆盖掉对方的订阅信息了,这也是为什么同一个消费组应该拥有完全一样的订阅关系的原因,而朋友在同一个消费组的每个消费者订阅关系都不一样,就出现了订阅信息相互覆盖的问题。

可是朋友这时又有疑惑了,他觉得每个消费者订阅自己的主题,貌似没问题啊,逻辑上也行的通,他不明白为什么 RocketMQ 不允许这样做,于是秉承着老司机的职业素养,下面我会从源码的角度深度分析 RocketMQ 消费订阅注册,消息拉取,消息队列负载与重新分布机制,让大家彻底弄清 RocketMQ 消费订阅机制。

消费者订阅信息注册

消费者在启动时会向所有 broker 注册订阅信息,并启动心跳机制,定时更新订阅信息,每个消费者都有一个 MQClientInstance,消费者启动时会启动这个类,启动方法中会启动一些列定时任务,其中:

org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
    try {
      MQClientInstance.this.cleanOfflineBroker();
      MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
    } catch (Exception e) {
      log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
    }
  }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

上面是向集群内所有 broker 发送订阅心跳信息的定时任务,源码继续跟进去,发现会给集群中的每个 broker 都发送自己的 HeartbeatData,HeartbeatData 即是每个客户端的心跳数据,它包含了如下数据:

// 客户端ID
private String clientID;
// 生产者信息
private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
// 消费者信息
private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();

其中消费者信息包含了客户端订阅的主题信息。

我们继续看看 broker 如何处理 HeartbeatData 数据,客户端发送 HeartbeatData 时的请求类型为 HEART_BEAT,我们直接找到 broker 处理 HEART_BEAT 请求类型的逻辑:

org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat:

public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
  RemotingCommand response = RemotingCommand.createResponseCommand(null);
  // 解码,获取 HeartbeatData
  HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
  ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
    ctx.channel(),
    heartbeatData.getClientID(),
    request.getLanguage(),
    request.getVersion()
  );

  // 循环注册消费者订阅信息
  for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
    // 按消费组获取订阅配置信息
    SubscriptionGroupConfig subscriptionGroupConfig =
      this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
      data.getGroupName());
    boolean isNotifyConsumerIdsChangedEnable = true;
    if (null != subscriptionGroupConfig) {
      isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
      int topicSysFlag = 0;
      if (data.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
      }
      String newTopic = MixAll.getRetryTopic(data.getGroupName());
      this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    }

    // 注册消费者订阅信息
    boolean changed = this.brokerController.getConsumerManager().registerConsumer(
      data.getGroupName(),
      clientChannelInfo,
      data.getConsumeType(),
      data.getMessageModel(),
      data.getConsumeFromWhere(),
      data.getSubscriptionDataSet(),
      isNotifyConsumerIdsChangedEnable
    );
    // ...
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
  }

在这里我们可以看到,broker 收到 HEART_BEAT 请求后,将请求数据解压获取 HeartbeatData,根据 HeartbeatData 里面的消费订阅信息,循环进行注册:

org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer:

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
                                ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
                                final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

  // 获取消费组内的消费者信息
  ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
  // 如果消费组的消费者信息为空,则新建一个
  if (null == consumerGroupInfo) {
    ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
    ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
    consumerGroupInfo = prev != null ? prev : tmp;
  }

  boolean r1 =
    consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                                    consumeFromWhere);
  // 更新订阅信息,订阅信息是按照消费组存放的,因此这步骤就会导致同一个消费组内的各个消费者客户端的订阅信息相互被覆盖
  boolean r2 = consumerGroupInfo.updateSubscription(subList);

  if (r1 || r2) {
    if (isNotifyConsumerIdsChangedEnable) {
      this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    }
  }

  this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

  return r1 || r2;
}

这步骤是 broker 更新消费者订阅信息的核心方法,如果消费组的消费者信息 ConsumerGroupInfo 为空,则新建一个,从名字可知道,订阅信息是按照消费组进行存放的,因此在更新订阅信息时,订阅信息是按照消费组存放的,这步骤就会导致同一个消费组内的各个消费者客户端的订阅信息相互被覆盖

消息拉取

在 MQClientInstance 启动时,会启动一条线程来处理消息拉取任务:

org.apache.rocketmq.client.impl.factory.MQClientInstance#start:

// Start pull service
this.pullMessageService.start();

pullMessageService 继承了 ServiceThread,而 ServiceThread 实现了 Runnable 接口,它的 run 方法实现如下:

org.apache.rocketmq.client.impl.consumer.PullMessageService#run:

@Override
public void run() {
  while (!this.isStopped()) {
    try {
      // 从 pullRequestQueue 中获取拉取消息请求对象
      PullRequest pullRequest = this.pullRequestQueue.take();
      // 执行消息拉取
      this.pullMessage(pullRequest);
    } catch (InterruptedException ignored) {
    } catch (Exception e) {
      log.error("Pull Message Service Run Method exception", e);
    }
  }
}

消费端拿到 PullRequest 对象进行拉取消息,pullRequestQueue 是一个阻塞队列,如果 pullRequest 数据为空,执行 take() 方法会一直阻塞,直到有新的 pullRequest 拉取任务进来,这里是一个很关键的步骤,你可能会想,pullRequest 什么时候被创建然后放入 pullRequestQueue?pullRequest 它是在RebalanceImpl 中创建,它是 RocketMQ 消息队列负载与重新分布机制的实现

消息队列负载与重新分布

从上面消息拉取源码分析可知,pullMessageService 启动时由于 pullRequestQueue 中没有 pullRequest 对象,会一直阻塞,而在 MQClientInstance 启动时,同样会启动一条线程来处理消息队列负载与重新分布任务:

org.apache.rocketmq.client.impl.factory.MQClientInstance#start:

// Start rebalance service
this.rebalanceService.start();

rebalanceService 同样继承了 ServiceThread,它的 run 方法如下:

@Override
public void run() {
  while (!this.isStopped()) {
    this.waitForRunning(waitInterval);
    this.mqClientFactory.doRebalance();
  }
}

继续跟进去:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance:

public void doRebalance(final boolean isOrder) {
  // 获取消费者所有订阅信息
  Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  if (subTable != null) {
    for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
      final String topic = entry.getKey();
      try {
        // 消息队列负载与重新分布
        this.rebalanceByTopic(topic, isOrder);
      } catch (Throwable e) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("rebalanceByTopic Exception", e);
        }
      }
    }
  }
  this.truncateMessageQueueNotMyTopic();
}

这里主要是获取客户端订阅的主题,并根据主题进行消息队列负载与重新分布,subTable 存储了消费者的订阅信息,消费者进行消息订阅时会填充到里面,我们接着往下:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

rebalanceByTopic 方法是实现 Consumer 端负载均衡的核心,我们这里以集群模式的消息队列负载与重新分布,首先从 topicSubscribeInfoTable 中获取订阅主题的队列信息,接着随机从集群中的一个 broker 中获取消费组内某个 topic 的订阅客户端 ID 列表,这里需要注意的是,为什么从集群内任意一个 broker 就可以获取订阅客户端信息呢?前面的分析也说了,消费者客户端启动时会启动一个线程,向所有 broker 发送心跳包。

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

// 如果 主题订阅信息mqSet和主题订阅客户端不为空,就执行消息队列负载与重新分布
if (mqSet != null && cidAll != null) {
  List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
  mqAll.addAll(mqSet);

  // 排序,确保每个消息队列只分配一个消费者
  Collections.sort(mqAll);
  Collections.sort(cidAll);

  // 消息队列分配算法
  AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

  // 执行算法,并得到队列重新分配后的结果对象allocateResult
  List<MessageQueue> allocateResult = null;
  try {
    allocateResult = strategy.allocate(
      this.consumerGroup,
      this.mQClientFactory.getClientId(),
      mqAll,
      cidAll);
  } catch (Throwable e) {
    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
              e);
    return;
  }
  // ...
}

以上是消息负载均衡的核心逻辑,RocketMQ 本身提供了 5 种负载算法,默认使用 AllocateMessageQueueAveragely 平均分配算法,它分配算法特点如下:

假设有消费组 g1,有消费者 c1 和 c2,c1 订阅了 topicA,c2 订阅了 topicB,集群内有 broker1 和broker2,假设 topicA 有 8 个消息队列,broker_a(q0/q1/q2/q3) 和 broker_b(q0/q1/q2/q3),前面我们知道 findConsumerIdList 方法会获取消费组内所有消费者客户端 ID,topicA 经过平均分配算法进行分配之后的消费情况如下:

c1:broker_a(q0/q1/q2/q3)

c2:broker_b(q0/q1/q2/q3)

问题就出现在这里,c2 根本没有订阅 topicA,但根据分配算法,却要加上 c2 进行分配,这样就会导致这种情况有一半的消息被分配到 c2 进行消费,被分配到 c2 的消息队列会延迟十几秒甚至更久才会被消费,topicB 同理

下面我用图表示 topicA 和 topicB 经过 rebalance 之后的消费情况:

至于为什么会报 the consumer's subscription not exist,我们继续往下撸:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

if (mqSet != null && cidAll != null) {
  // ...
  Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
  if (allocateResult != null) {
    allocateResultSet.addAll(allocateResult);
  }
  // 用户重新分配后的结果allocateResult来更新当前消费者负载的消息队列缓存表processQueueTable,并生成 pullRequestList 放入 pullRequestQueue 阻塞队列中
  boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
  if (changed) {
    log.info(
      "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
      strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
      allocateResultSet.size(), allocateResultSet);
    this.messageQueueChanged(topic, mqSet, allocateResultSet);
  }
}

以上代码逻辑主要是拿 mqSet 和 cidAll 进行消息队列负载与重新分布,得到结果 allocateResult,它是一个 MessageQueue 列表,接着用 allocateResult 更新消费者负载的消息队列缓存表 processQueueTable,生成 pullRequestList 放入 pullRequestQueue 阻塞队列中:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 循环执行,将mqSet订阅数据封装成PullRequest对象,并添加到pullRequestList中
for (MessageQueue mq : mqSet) {
  // 如果缓存列表不存在该订阅信息,说明这次消息队列重新分配后新增加的消息队列
  if (!this.processQueueTable.containsKey(mq)) {
    if (isOrder && !this.lock(mq)) {
      log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
      continue;
    }
    this.removeDirtyOffset(mq);
    ProcessQueue pq = new ProcessQueue();
    long nextOffset = this.computePullFromWhere(mq);
    if (nextOffset >= 0) {
      ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
      if (pre != null) {
        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
      } else {
        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
        PullRequest pullRequest = new PullRequest();
        pullRequest.setConsumerGroup(consumerGroup);
        pullRequest.setNextOffset(nextOffset);
        pullRequest.setMessageQueue(mq);
        pullRequest.setProcessQueue(pq);
        pullRequestList.add(pullRequest);
        changed = true;
      }
    } else {
      log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
    }
  }
}
// 将pullRequestList添加到PullMessageService中的pullRequestQueue阻塞队列中,以唤醒PullMessageService线程执行消息拉取
this.dispatchPullRequest(pullRequestList);

前面我们讲到消息拉取是从 pullRequestQueue 阻塞队列中拿 pullRequest 执行拉取的,以上方法就是创建 pullRequest 的地方。

源码分析到这里,就可以弄清楚为什么会报 the consumer's subscription not exist 这个错误了:

假设有消费者组 g1,g1下有消费者 c1 和消费者 c2,c1 订阅了 topicA,c2 订阅了 topicB,此时c2 先启动,将 g1 的订阅信息更新为 topicB,c1 随后启动,将 g1 的订阅信息覆盖为 topicA,c1 的 Rebalance 负载将 topicA 的 pullRequest 添加到 pullRequestQueue 中,而恰好此时 c2 心跳包又将 g1 的订阅信息更新为 topicB,那么此时 c1 的 PullMessageService 线程拿到 pullRequestQueue 中 topicA 的 pullRequest 进行消息拉取,然而在 broker 端找不到消费者组 g1 下 topicA 的订阅信息(因为此时恰好被 c2 心跳包给覆盖了),就会报消费者订阅信息不存在的错误了

公众号「后端进阶」,专注后端技术分享!

© 著作权归作者所有

后端进阶
粉丝 75
博文 24
码字总数 51855
作品 0
广州
程序员
私信 提问
加载中

评论(7)

木九天
木九天
不喜欢最后加二维码的任何一个博客!
后端进阶
后端进阶 博主
走好不送.
OSC_uCMeOv
OSC_uCMeOv
放到不同的消费组是不是就没这个问题了
Skqing
Skqing
是的,人家官方文档都已经说明了,一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("TagA")。
https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
后端进阶
后端进阶 博主
tag确实是个法宝, 可以大大减少topic的数量, 提高性能
淘淘我的小宝宝
淘淘我的小宝宝
有tag为何不用
后端进阶
后端进阶 博主
文章的问题是朋友碰到的消费组topic不一致的问题, 他的topic导致不一致是因为原来旧服务只有topicA, 但是服务代码更新了, 集群有些服务还没更新到新代码, 出现这个问题.
Kafka vs RocketMQ—— Topic数量对单机性能的影响

引言 上一期我们对比了三类消息产品(Kafka、RabbitMQ、RocketMQ)单纯发送小消息的性能,受到了程序猿们的广泛关注,其中大家对这种单纯的发送场景感到并不过瘾,因为没有任何一个网站的业务只...

a137268431
2018/04/15
0
0
让你rocketmq用得比预期要好的 1 种方法

匠心零度 转载请注明原创出处,谢谢! 方法 让你rocketmq用得比预期要好的 1 种方法:就是认真思考下面的几个问题: 使用rocketmq能解决那些问题?那些问题是不能解决的? 我们什么时候该添加...

匠心零度
2018/04/19
0
0
分布式消息系列:详解RocketMQ的简介与演进、架构设计、关键特性与应用场景

终身学习是程序员的必备能力,一群人在一起走得更远,一起学习,共抗惰性。今天,我们来重点了解RocketMQ的简介与演进、架构设计、关键特性及应用场景等内容。 本文内容大纲: RocketMQ的简介...

mikechen优知
01/11
163
0
说说MQ之RocketMQ(一)

原文出处:Valleylord RocketMQ 是出自 A 公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上比 Kafka 更好,目前,RocketMQ 的文档仍然不够丰...

Valleylord
2018/10/12
0
0
消息中间件 - Apache RocketMQ

RocketMQ是什么? 2016.11,阿里巴巴宣布捐赠RocketMQ到Apache软件基金会孵化项目 【Apache RocketMQ】RocketMQ捐赠给Apache那些鲜为人知的故事 阿里开源消息中间件RocketMQ的前世今生 Apac...

vintagewan
2014/02/18
98.6K
22

没有更多内容

加载失败,请刷新页面

加载更多

计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
今天
5
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
今天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
今天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
今天
10
0
再见 Spring Boot 1.X,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维...

Java技术剑
今天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部