RocketMQ学习教程:07.RocketMQ消息查询【云图智联】

原创
2020/07/21 15:12
阅读数 151

在实际开发中,经常需要排查一条消息是否成功发送到底层MQ中,或者查看MQ中消息的内容,以及如何将消息发送给指定的/所有的消费者组重新消费。本文对RocketMQ提供到的查询机制和背后原理进行深入的介绍。文章主要包括4个部分:

  • 消息查询介绍:介绍消息查询中使用到的Message Key 、Unique Key、Message Id 的区别

  • 消息查询工具:分别介绍命令行工具、管理平台、客户端API这三种工具的详细用法,以及如何让消费者重新消费特定的消息。

  • 核心实现原理:介绍Message Key & Unique Key与Message Id的实现机制上区别,Unique Key在Exactly Once语义下的作用,以及为什么Message Id查询效率更高。

  • 索引机制:介绍Message Key & Unique Key底层使用的哈希索引机制

 

1 消息查询介绍

RocketMQ提供了3种消息查询方式:           

  • 按照Message Key 查询:消息的key是业务开发同学在发送消息之前自行指定的,通常会把具有业务含义,区分度高的字段作为消息的key,如用户id,订单id等。

  • 按照Unique Key查询:除了业务开发同学明确的指定消息中的key,RocketMQ生产者客户端在发送发送消息之前,会自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一代表一条消息。

  • 按照Message Id 查询:Message Id 是消息发送后,在Broker端生成的,其包含了Broker的地址,和在CommitLog中的偏移信息,并会将Message Id作为发送结果的一部分进行返回。Message Id中属于精确匹配,可以唯一定位一条消息,不需要使用哈希索引机制,查询效率更高。

RocketMQ有意弱化Unique Key与Message Id的区别,对外都称之为Message Id。在通过RocketMQ的命令行工具或管理平台进行查询时,二者可以通用。在根据Unique Key进行查询时,本身是有可能查询到多条消息的,但是查询工具会进行过滤,只会返回一条消息。种种情况导致很多RocketMQ的用户,并未能很好对二者进行区分。

业务开发同学在使用RocketMQ时,应该养成良好的习惯,在发送/消费消息时,将这些信息记录下来,通常是记录到日志文件中,以便在出现问题时进行排查。

以生产者在发送消息为例,通常由以下3步组成:

  1. //1 构建消息对象Message
  2. Message msg = new Message();
  3. msg.setTopic("TopicA");
  4. msg.setKeys("Key1");
  5. msg.setBody("message body".getBytes());
  6. try{
  7.     //2 发送消息
  8.     SendResult result = producer.send(msg);
  9.     
  10.     //3 打印发送结果
  11.     System.out.println(result);
  12. }catch (Exception e){
  13.     e.printStackTrace();
  14. }

第1步:构建消息

构建消息对象Message,在这里我们通过setKeys方法设置消息的key,如果有多个key可以使用空格" "进行分割

第2步:发送消息

发送消息,会返回一个SendResult对象表示消息发送结果。

第3步:打印发送结果

结果中包含Unique Key和Message Id,如下所示:

  1. SendResult [
  2. sendStatus=SEND_OK, 
  3. msgId=C0A801030D4B18B4AAC247DE4A0D0000,
  4. offsetMsgId=C0A8010300002A9F000000000007BEE9,
  5. messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0], 
  6. queueOffset=0]

其中:

  • sendStatus:表示消息发送结果的状态       

  • msgId:注意这里的命名虽然是msgId,但实际上其是Unique Key

  • offsetMsgId:Broker返回的Message ID 。在后文中,未进行特殊说明的情况下,Message ID总是表示offsetMsgId。

  • messageQueue:消息发送到了哪个的队列,如上图显示发送到broker-a的第0个的队列 

  • queueOffset:消息在队列中的偏移量,每次发送到一个队列时,offset+1 

事实上,用户主动设置的Key以及客户端自动生成的Unique Key,最终都会设置到Message对象的properties属性中,如下图所示:

下载 (1).png

 

其中:

  • KEYS:表示用户通过setKeys方法设置的消息key,

  • UNIQ_KEY:表示消息发送之前由RocketMQ客户端自动生成的Unique Key。细心的读者发现了其值与上述打印SendResult结果中的msgId字段的值是一样的,这验证了前面所说的msgId表示的实际上就是Unique Key的说法。

在了解如何主动设置Key,以及如何获取RocketMQ自动生成的Unique Key和Message Id后,就可以利用一些工具来进行查询。

 

2 消息查询工具

RocketMQ提供了3种方式来根据Message Key、Unique Key、Message Id来查询消息,包括:

  • 命令行工具:主要是运维同学使用

  • 管理平台:运维和开发同学都可以使用

  • 客户端API:主要是开发同学使用

这些工具除了可以查询某条消息的内容,还支持将查询到的历史消息让消费者重新进行消费,下面分别进行讲述。

2.1 命令行工具

RocketMQ自带的mqadmin命令行工具提供了一些子命令,用于查询消息,如下:

  1. $ sh bin/mqadmin 
  2. The most commonly used mqadmin commands are: 
  3. ...
  4.    queryMsgById         按照Message Id查询消息
  5.    queryMsgByKey        按照Key查询消息 
  6.    queryMsgByUniqueKey  按照UNIQ_KEY查询消息
  7. ...

此外,还有一个queryMsgByOffset子命令,不在本文讲述范畴内

2.1.1 按照Message Key查询

mqadmin工具的queryMsgByKey子命令提供了根据key进行查询消息的功能。注意,由于一个key可能对应多条消息,查询结果只会展示出这些消息对应的Unique Key,需要根据Unique Key再次进行查询。

queryMsgByKey子命令使用方法如下所示:

  1. $ sh bin/mqadmin queryMsgByKey -h
  2. usage: mqadmin queryMsgByKey [-h] -k <arg> [-n <arg>] -t <arg>
  3.  -h,--help                打印帮助信息
  4.  -k,--msgKey <arg>        指定消息的key,必须提供
  5.  -n,--namesrvAddr <arg>   指定nameserver地址
  6.  -t,--topic <arg>         指定topic,必须提供

例如,要查询在TopicA中,key为Key1的消息

  1. $ sh bin/mqadmin queryMsgByKey -k Key1 -t TopicA -n localhost:9876
  2. #Message ID                           #QID         #Offset
  3. C0A80103515618B4AAC2429A6E970000         0               0
  4. C0A80103511B18B4AAC24296D2CB0000         0               0
  5. C0A8010354C418B4AAC242A281360000         1               0
  6. C0A8010354C718B4AAC242A2B5340000         1               1

这里,我们看到输出结果中包含了4条记录。其中:

  • Message ID列:这里这一列的名字显示有问题,实际上其代表的是Unique Key

  • QID列:表示队列的ID,注意在RocketMQ中唯一地位一个队列需要topic+brokerName+queueId。这里只显示了queueId,其实并不能知道在哪个Broker上。

  • Offset:消息在在队列中的偏移量

在查询到Unique Key之后,我们就可以使用另外一个命令:queryMsgByUniqueKey,来查询消息的具体内容。

2.1.2 按照Unique Key查询

mqadmin工具的queryMsgByUniqueKey的子命令有2个功能:

  • 根据Unique Key查询消息,并展示结果

  • 让消费者重新消费Unique Key对应的消息 

我们将分别进行讲述。queryMsgByUniqueKey子命令的使用方式如下:

  1. $ sh bin/mqadmin queryMsgByUniqueKey -h
  2. usage: mqadmin queryMsgByUniqueKey [-d <arg>] [-g <arg>] [-h] -i <arg> [-n <arg>] -t <arg>
  3.  -d,--clientId <arg>        消费者 client id
  4.  -g,--consumerGroup <arg>   消费者组名称
  5.  -h,--help                  打印帮助信息
  6.  -i,--msgId <arg>           消息的Unique Key,或者Message Id
  7.  -n,--namesrvAddr <arg>     NameServer地址
  8.  -t,--topic <arg>           消息所属的Topic,必须提供

这里对-i 参数进行下特殊说明,其即可接受Unique Key,即SendResult中的msgId字段;也可以接受Message Id,即SendResult中的offsetMsgId字段。

根据Unique Key查询消息:

通过-i 参数指定Unique Key,通过-t 参数指定topic,如:

  1. $ sh bin/mqadmin queryMsgByUniqueKey -i C0A80103511B18B4AAC24296D2CB0000 -t TopicA -n localhost:9876
  2. Topic:               TopicA
  3. Tags:                [null]
  4. Keys:                [Key1]
  5. Queue ID:            0
  6. Queue Offset:        0
  7. CommitLog Offset:    507625
  8. Reconsume Times:     0
  9. Born Timestamp:      2019-12-13 22:19:40,619
  10. Store Timestamp:     2019-12-13 22:19:40,631
  11. Born Host:           192.168.1.3:53974
  12. Store Host:          192.168.1.3:10911
  13. System Flag:         0
  14. Properties:          {KEYS=Key1, UNIQ_KEY=C0A80103511B18B4AAC24296D2CB0000, WAIT=true}
  15. Message Body Path:   /tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000

对于消息体的内容,会存储到Message Body Path字段指定到的路径中。可通过cat命令查看(仅适用于消息体是字符串):

  1. $ cat /tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000
  2. message body

指定消费者重新消费:

queryMsgByUniqueKey子命令还接收另外两个参数:-g参数用于指定消费者组名称,-d参数指定消费者client id。指定了这两个参数之后,消息将由消费者直接消费,而不是打印在控制台上。

首先,通过consumerStatus命令,查询出消费者组下的client id信息,如:

  1. $ sh bin/mqadmin consumerStatus -g group_X -n localhost:9876
  2. 001  192.168.1.3@26868    V4_5_0    1576300822831/192.168.1.3@26868
  3. Same subscription in the same group of consumer
  4. Rebalance OK

这里显示了消费者组group_X下面只有一个消费者,client id为192.168.1.3@26868。

接着我们可以在queryMsgByUniqueKey子命令中,添加-g和-d参数,如下所示:

  1. $ sh bin/mqadmin queryMsgByUniqueKey \
  2. -g group_X \
  3. -d 192.168.1.3@26868 \
  4. -t TopicA \
  5. -i C0A80103511B18B4AAC24296D2CB0000 \
  6. -n localhost:9876
  7.  
  8. ConsumeMessageDirectlyResult [
  9. order=false, 
  10. autoCommit=true, 
  11. consumeResult=CR_SUCCESS, 
  12. remark=null, 
  13. spentTimeMills=1]

可以看到,这里并没有打印出消息内容,取而代之的是消息消费的结果。

在内部,主要是分为3个步骤来完成让指定消费者来消费这条消息,如下图所示:

Xnip2019-12-17_00-05-14.png

第1步:

命令行工具给所有Broker发起QUERY_MESSAGE请求查询消息,因为并不知道UNIQ_KEY这条消息在哪个Broker上,且最多只会返回一条消息,如果超过1条其他会过滤掉;如果查询不到就直接报错。

第2步:

根据消息中包含了Store Host信息,也就是消息存储在哪个Broker上,接来下命令行工具会直接给这个Broker发起CONSUME_MESSAGE_DIRECTLY请求,这个请求会携带msgId,group和client id的信息

第3步:

Broker接收到这个请求,查询出消息内容后,主动给消费者发送CONSUME_MESSAGE_DIRECTLY通知请求,注意虽然与第2步使用了同一个请求码,但不同的是这个请求中包含了消息体的内容,消费者可直接处理。注意:这里并不是将消息重新发送到Topic中,否则订阅这个Topic的所有消费者组,都会重新消费这条消息。

2.1.3 根据Message Id进行查询

前面讲解生产者发送消息后,返回的SendResult对象包含一个offsetMsgId字段,这也就是我们常规意义上所说的Message Id,我们也可以根据这个字段来查询消息。

根据Message Id查询使用queryMsgById子命令,这个命令有3个作用:

  • 根据Message Id查询消息

  • 通知指定消费者重新消费这条消息,与queryMsgByUniqueKey类似,这里不再介绍

  • 将消息重新发送到Topic中,所有消费者组都将重新消费 

queryMsgById子命令用法如下所示:

  1. $ sh bin/mqadmin queryMsgById -h
  2. usage: mqadmin queryMsgById [-d <arg>] [-g <arg>] [-h] -i <arg> [-n <arg>] [-s <arg>] [-u <arg>]
  3.  -d,--clientId <arg>        消费者id
  4.  -g,--consumerGroup <arg>   消费者组名称
  5.  -h,--help                  打印帮助信息
  6.  -i,--msgId <arg>           Message Id
  7.  -n,--namesrvAddr <arg>     Name server 地址
  8.  -s,--sendMessage <arg>     重新发送消息
  9.  -u,--unitName <arg>        unit name

参数说明如下:

-d和-g参数:类似于queryMsgById命令,用于将消息发送给某个消费者进行重新消费

-i 参数:指定Message Id,即SendResult对象的offsetMsgId字段,多个值使用逗号","分割。

-s参数:是否重新发送消息到Topic。如果同时指定了-d和-g参数,-s参数不生效。

根据Message Id查询消息:

下图根据SendResult的offsetMsgId字段,作为-i参数,来查询一条消息:

  1. $ sh bin/mqadmin queryMsgById -i C0A8010300002A9F000000000007BEE9 -n localhost:9876
  2. OffsetID:            C0A8010300002A9F000000000007BEE9
  3. OffsetID:            C0A8010300002A9F000000000007BEE9
  4. Topic:               TopicA
  5. Tags:                [null]
  6. Keys:                [Key1]
  7. Queue ID:            0
  8. Queue Offset:        0
  9. CommitLog Offset:    507625
  10. Reconsume Times:     0
  11. Born Timestamp:      2019-12-13 22:19:40,619
  12. Store Timestamp:     2019-12-13 22:19:40,631
  13. Born Host:           192.168.1.3:53974
  14. Store Host:          192.168.1.3:10911
  15. System Flag:         0
  16. Properties:          {KEYS=Key1, UNIQ_KEY=C0A80103511B18B4AAC24296D2CB0000, WAIT=true}
  17. Message Body Path:   /tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000

与queryMsgByUniqueKey子命令输出基本类似,主要是在输出开头多出了OffsetID字段,即offsetMsgId。需要注意的是,queryMsgById不能接受Unqiue Key作为查询参数。

重新发送消息到topic:

在指定-s参数后,消息将重新发送到topic,如下(输出进行了格式化):

  1. $ sh bin/mqadmin queryMsgById -i C0A8010300002A9F000000000007BEE9 -n localhost:9876 -s true
  2.  
  3. prepare resend msg. originalMsgId=C0A8010300002A9F000000000007BEE9
  4.  
  5. SendResult [
  6. sendStatus=SEND_OK, 
  7. msgId=C0A80103511B18B4AAC24296D2CB0000, 
  8. offsetMsgId=C0A80103000078BF000000000004D923, 
  9. messageQueue=MessageQueue [topic=TopicA, brokerName=broker-b, queueId=1], 
  10. queueOffset=1]

可以看到,这里因为消息是重新发送到了Topic中,因此与我们之前使用生产者发送消息一样,输出的是一个SendResult。在这种情况下,订阅这个Topic的所有消费者组都会重新消费到这条消息。

在实际开发中,如果多个消费者组订阅了某个Topic的消息,如果所有的消费者都希望重新消费,那么就应该使用-s参数。如果只是某个消费者希望重新消费,那么应该指定-g和-d参数。

另外,我们看到发送前打印的originalMsgId和发送后SendResult中的offsetMsgId值并不一样,这是因为消息发送到Topic重新进行了存储,因此值不相同。这也是为什么我们说Message Id可以唯一对应一条消息的原因。

而输出的SendResult结果中的msgId,即Unique Key,并没有发生变化,因此尽管名字是Unique Key,但是实际上还是有可能对应多条消息的。而前面根据queryMsgByUniqueKey查询之所以只有一条消息,实际上是进行了过滤。

2.2 管理平台

RocketMQ提供的命令行工具,虽然功能强大,一般是运维同学使用较多。通过RocketMQ提供的管理平台进来行消息查询,则对业务开发同学更加友好。在管理平台的消息一栏,有3个TAB,分别用于:根据Topic时间范围查询、Message Key查询、Message Id查询,下面分别进行介绍。

根据Topic时间范围查询:

按 Topic 查询属于范围查询,不推荐使用,因为时间范围内消息很多,不具备区分度。查询时,尽可能设置最为精确的时间区间,以便缩小查询范围,提高速度。最多返回2000条数据。

Xnip2019-12-17_00-09-41

根据Message Key查询:

按 Message Key 查询属于模糊查询,仅适用于没有记录 Message ID 但是设置了具有区分度的 Message Key的情况。 目前,根据Message Key查询,有一个很大局限性:不能指定时间范围,且最多返回64条数据。如果用户指定的key重复率比较高的话,就有可能搜不到。

Xnip2019-12-17_00-11-23.png

根据Message Id查询:

按 Message ID 查询属于精确查询,速度快,精确匹配,只会返回一条结果,推荐使用。在这里,传入Unique Key,offsetMsgId都可以。

查看消息详情:

在按照Topic 时间范围查询,按照Message Key查询,结果列表有一个Message Detail按钮,点击可以看到消息详情:包括消息key、tag、生成时间,消息体内容等。在详情页面,也可以将消息直接发送给某个消费者组进行重新消费。

需要注意的是,在消息体展示的时候,只能将消息体转换成字符串进行展示,如果消息的内容是protobuf、thrift、hessian编码的,那么将显示一堆乱码。

如果公司内部有统一的IDL/Schema管理平台,则可以解决这个问题,通过为每个Topic关联一个IDL,在消息展示时,可以根据IDL反序列化后在进行展示。

2.3 客户端API

除了通过命令行工具和管理平台,还可以通过客户端API的方式来进行查询,这其实是最本质的方式,命令行工具和管理平台的查询功能都是基于此实现。

  1. 在org.apache.rocketmq.client.MQAdmin接口中,定义了以下几个方法用于消息查询:
  2. //msgId参数:仅接收SendResult中的offsetMsgId,返回单条消息
  3. MessageExt viewMessage(final String msgId)
  4. //msgId参数:传入SendResult中的offsetMsgId、msgId都可以,返回单条消息
  5. MessageExt viewMessage(String topic,String msgId)
  6. //在指定topic下,根据key进行查询,并指定最大返回条数,以及开始和结束时间
  7. QueryResult queryMessage(final String topic, final String key, 
  8.                          final int maxNum, final long begin,final long end)

对于MQAdmin接口,可能部分同学比较陌生。不过我们常用的DefaultMQProducer、DefaultMQPushConsumer等,都实现了此接口,因此都具备消息查询的能力,如下所示:

对于命令行工具,底层实际上是基于MQAdminExt接口的实现来完成的。

细心的读者会问,相同的查询功能在在多处实现是不是太麻烦了?事实上,这只是对外暴露的接口,在内部,实际上都是基于MQAdminImpl这个类来完成的。

viewMessage方法:

两种viewMessage方法重载形式,都只会返回单条消息。下面以生产者搜索为例,讲解如何使用API进行查询:

  1. //初始化Producer
  2. DefaultMQProducer producer = new DefaultMQProducer();
  3. producer.setNamesrvAddr("127.0.0.1:9876");
  4. producer.start();
  5. //根据UniqueKey查询
  6. String uniqueKey = "C0A8010354C418B4AAC242A281360000";
  7. MessageExt msg = producer.viewMessage("TopicA", uniqueKey);
  8. //打印结果:这里仅输出Unique Key与offsetMsgId
  9. MessageClientExt msgExt= (MessageClientExt) msg;
  10. System.out.println("Unique Key:"+msgExt.getMsgId()//即UNIQUE_KEY
  11.         +"\noffsetMsgId:"+msgExt.getOffsetMsgId());

输出结果如下:

  1. Unique Key:C0A8010354C418B4AAC242A281360000
  2. offsetMsgId:C0A8010300002A9F000000000007BF94

如果我们把offsetMsgId当做方法参数传入,也可以查询到相同的结果。这是因为,在方法内部实际上是分两步进行查询的:

  1. 先把参数当做offsetMsgId,即Message Id进行查询

  1. 如果失败,再尝试当做Unique Key进行查询。

源码如下所示:

DefaultMQProducer#viewMessage(String,String)

  1. @Override
  2. public MessageExt viewMessage(String topic, String msgId) {//省略异常声明
  3.     try {
  4.         //1 尝试当做offsetMsgId进行查询
  5.         MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId);
  6.         return this.viewMessage(msgId);
  7.     } catch (Exception e) {
  8.         //查询失败直接忽略
  9.     }
  10.     //2 尝试当做UNIQ_KEY进行查询
  11.     return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
  12. }

前面提到,Unique Key只是从逻辑上代表一条消息,实际上在Broker端可能存储了多条,因此在当做Unique Key进行查询时,会进行过滤,只取其中一条。源码如下所示:

MQAdminImpl#queryMessageByUniqKey

  1. public MessageExt queryMessageByUniqKey(String topic,String uniqKey) {
  2.     //根据uniqKey进行查询
  3.     QueryResult qr = this.queryMessage(topic, uniqKey, 32,
  4.             MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000,
  5.             Long.MAX_VALUE,
  6.             true);
  7.     
  8.     //对查询结果进行过滤,最多只取一条
  9.     if (qr != null && qr.getMessageList() != null 
  10.                             && qr.getMessageList().size() > 0) {
  11.         return qr.getMessageList().get(0);
  12.     } else {
  13.         return null;
  14.     }
  15. }

我们也可以通过另外只接收一个参数的viewMessage方法进行查询,但是需要注意的是,参数只能是offsetMsgId,不能是Unique Key。

  1. String offsetMsgId = "C0A8010300002A9F000000000007BF94";
  2. producer.viewMessage(offsetMsgId);

queryMessage方法:

其是根据消息Key进行查询,这里不再介绍API如何使用。则与前面两种viewMessage方法重载不同,其返回的是一个QueryResult对象,包含了多条消息。

主要是注意这个方法接收时间范围参数,相比较于管理平台更加灵活。管理平台按照消息Key查询,默认最多返回64条消息,且不能支持指定时间范围,如果消息Key重复度较高,那么可能有些消息搜索不到。如果是在指定时间范围内返回64条消息,如果没有发现想找到的消息,再选择其他时间范围,则可以规避这个问题。

 

3 实现原理

    Unqiue Key & Message Key都需要利用RocketMQ的哈希索引机制来完成消息查询,由于建立索引有一定的开销,因此Broker端提供了相关配置项来控制是否开启索引。关于RocketMQ索引机制将在后面的文章进行详细的介绍。

    Message Id是在Broker端生成的,其包含了Broker地址和commit Log offset信息,可以精确匹配一条消息,查询消息更好。下面分别介绍 Unqiue Key & Message Id的生成和作用。

3.1 Unique Key的生成与作用

3.1.1 Unique Key生成

Unique Key是生产者发送消息之前,由RocketMQ 客户端自动生成的,具体来说,RocketMQ发送消息之前,最终都要通过以下方法:

DefaultMQProducerImpl#sendKernelImpl

  1. private SendResult sendKernelImpl(final Message msg,
  2.                                   final MessageQueue mq,
  3.                                   final CommunicationMode communicationMode,
  4.                                   final SendCallback sendCallback,
  5.                                   final TopicPublishInfo topicPublishInfo,
  6.                                   final long timeout)  {//省略异常声明
  7.      //...略
  8.      
  9.         try {
  10.             //如果不是批量消息,则生成Unique Key
  11.             if (!(msg instanceof MessageBatch)) {
  12.                 MessageClientIDSetter.setUniqID(msg);
  13.             }
  14.             
  15.      //...略

如上所示,如果不是批量消息,会通过MessageClientIDSettersetUniqID方法为消息设置Unique key,该方法实现如下所示:

MessageClientIDSetter#setUniqID

  1. public static void setUniqID(final Message msg) {
  2.     // Unique Key不为空的情况下,才进行设置
  3.     if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,) == null) {
  4.         msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,, createUniqID());
  5.     }
  6. }

 

如果消息的Unique Key属性为null,就通过createUniqID()方法为消息创建一个新的Unique Key,并设置到消息属性中。之所以要判断Unique Key是否为null与其作用有关。

3.1.2 Unique Key作用

了解Unique Key的作用对于我们理解消息重复的原因有很大的帮助。RocketMQ并不保证消息投递过程中的Exactly Once语义,即消息只会被精确消费一次,需要消费者自己做幂等。而通常导致消息重复消费的原因,主要包括:

  • 生产者发送时消息重复:RocketMQ对于无序消息发送失败,默认会重试2次。对于有序消息和普通有序消息为什么不进行重试,可参考:RocketMQ NameServer详解 

  • 消费者Rebalance时消息重复:这里不做介绍,可参考RocketMQ Rebalance机制详解 

导致生产者发送重复消息的原因可能是:一条消息已被成功发送到服务端并完成持久化,由于网络超时此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败,此时生产者将再次尝试发送消息。

在重试发送时,sendKernelImpl会被重复调用,意味着setUniqID方法会被重复调用,不过由于setUniqID方法实现中进行判空处理,因此重复设置Unique Key。在这种情况下,消费者后续会收到两条内容相同并且 Unique Key 也相同的消息(offsetMsgId不同,因为对Broker来说存储了多次)。

那么消费者如何判断,消费重复是因为重复发送还是Rebalance导致的重复消费呢?

消费者实现MessageListener接口监听到的消息类型是MessageExt,可以将其强制转换为MessageClientExt,之后调用getMsgId方法获取Unique Key,调用getOffsetMsgId获得Message Id。如果多消息的Unique Key相同,但是offsetMsgId不同,则有可能是因为重复发送导致。

3.1.3 批量发送模式下的Unique Key

DefaultMQProducer提供了批量发送消息的接口:

  1. public SendResult send(Collection<Message> msgs)

在内部,这批消息首先会被构建成一个MessageBatch对象。在前面sendKernelImpl方法中我们也看到了,对于MessageBatch对象,并不会设置Unique Key。这是因为在将批量消息转换成MessageBatch时,已经设置过了。

可能有一部分同学会误以为一个批量消息中每条消息Unique Key是相同的,其实不然,每条消息Unique Key都不同。

这里通过一个批量发送案例进行说明:

  1. //构建批量消息
  2. ArrayList<Message> msgs = new ArrayList<>();
  3. Message msg1 = new Message("Topic_S",("message3").getBytes());
  4. Message msg2 = new Message("Topic_S",("message4").getBytes());
  5. msgs.add(msg1);
  6. msgs.add(msg2);
  7. //发送
  8. SendResult result = producer.send(msgs);
  9. //打印
  10. System.out.println(result);

输出如下所示:

  1. SendResult [sendStatus=SEND_OK,
  2. msgId=C0A80103583618B4AAC24CDC29F10000,C0A80103583618B4AAC24CDC29F10001,
  3. offsetMsgId=C0A80103000051AF00000000000B05BD,C0A80103000051AF00000000000B065B,
  4. messageQueue=MessageQueue [topic=Topic_S, brokerName=broker-c, queueId=2],
  5. queueOffset=3]

可以看到,此时输出的msgId(即Unique Key)和offsetMsgId都会包含多个值。客户端给批量消息中每条消息设置不同的Unqiue Key,可以参考DefaultMQProducer#batch方法源码:

  1. private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
  2.     MessageBatch msgBatch;
  3.     try {
  4.         //1 将消息集合转换为MessageBatch
  5.         msgBatch = MessageBatch.generateFromList(msgs);
  6.         
  7.         //2 迭代每个消息,逐一设置Unique Key
  8.         for (Message message : msgBatch) {
  9.             Validators.checkMessage(message, this);
  10.             MessageClientIDSetter.setUniqID(message);
  11.         }
  12.         //3 设置批量消息的消息体
  13.         msgBatch.setBody(msgBatch.encode());
  14.     } catch (Exception e) {
  15.         throw new MQClientException("Failed to initiate the MessageBatch", e);
  16.     }
  17.     return msgBatch;
  18. }

 

3.2 Message Id生成

SendResult中的offsetMsgId,即常规意义上我们所说的Message Id是在Broker端生成的,用于唯一标识一条消息,在根据Message Id查询的情况下,最多只能查询到一条消息。Message Id总共 16 字节,包含消息存储主机地址,消息 Commit Log offset。如下图所示:

image.png

RocketMQ内部通过一个MessageId对象进行表示:

  1. public class MessageId {
  2.     private SocketAddress address; //broker地址
  3.     private long offset; //commit log offset

并提供了一个MessageDecoder对象来创建或者解码MessageId。

  1. public static String createMessageId(final ByteBuffer input,
  2.                                      final ByteBuffer addr, final long offset)
  3. public static MessageId decodeMessageId(final String msgId)

Broker端在顺序存储消息时,首先会通过createMessageId方法创建msgId。源码如下所示:

CommitLog.DefaultAppendMessageCallback#doAppend

  1. public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, 
  2.                       final int maxBlank,final MessageExtBrokerInner msgInner) {
  3.  
  4.     //1 PHY OFFSET:即Commit Log Offset 或者称之为msgOffsetId
  5.     long wroteOffset = fileFromOffset + byteBuffer.position();
  6.  
  7.     //2 hostHolder用于维护broker地址信息
  8.     this.resetByteBuffer(hostHolder, 8);
  9.  
  10.     //3 创建msgOffsetId
  11.     String msgId = MessageDecoder.createMessageId(this.msgIdMemory, 
  12.                         msgInner.getStoreHostBytes(hostHolder), wroteOffset);

而客户端在根据msgId向Broker查询消息时,首先会将通过MessageDecoder的decodeMessageId方法,之后直接向这个broker进行查询指定位置的消息。

参见:MQAdminImpl#viewMessage

  1. public MessageExt viewMessage(String msgId) {//省略异常声明
  2.  
  3.     //1 根据msgId解码成MessageId对象
  4.     MessageId messageId = null;
  5.     try {
  6.         messageId = MessageDecoder.decodeMessageId(msgId);
  7.     } catch (Exception e) {
  8.         throw new MQClientException(ResponseCode.NO_MESSAGE, 
  9.                       "query message by id finished, but no message.");
  10.     }
  11.     //2 根据MessageId中的Broker地址和commit log offset信息进行查询
  12.     return this.mQClientFactory.getMQClientAPIImpl().viewMessage(
  13.             RemotingUtil.socketAddress2String(messageId.getAddress()),
  14.             messageId.getOffset(), 
  15.             timeoutMillis);
  16. }

由于根据Message Id进行查询,实际上是直接从特定Broker的CommitLog中的指定位置进行查询的,属于精确匹配,并不像用户设置的key,或者Unique Key那么样,需要使用到哈希索引机制,因此效率很高。

 

4 总结

  • RocketMQ提供了3种消息查询方式:Message Key & Unique Key & Message Id

  • RocketMQ提供了3种消息查询工具:命令行、管理平台、客户端API,且支持将查询到让特定/所有消费者组重新消费

  • RocketMQ有意对用户屏蔽Unique Key & Message Id区别,很多地方二者可以通用

  • Message Key & Unique Key 需要使用到哈希索引机制,有额外的索引维护成本

  • Message Id由Broker和commit log offset组成,属于精确匹配,查询效率更好

免费学习视频欢迎关注云图智联:https://e.yuntuzhilian.com/ 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部