轻松一刻,图解RocketMQ原理

原创
2021/12/30 11:24
阅读数 832

从故事开始

一个小公司XX,开发团队需要2个服务器互相通信。最开始消息量不大,使用3台物理服务器,通过固定IP地址方式,互相通信。 消息服务简单到可以通过RMI(Remote Method Invocation,JAVA原生远程方法调用)互相直联通信,开发团队出于未来扩展需要,使用 独立的 192.168.2.2服务来承载消息通信。

image.png

随着公司的逐渐壮大,1个消息服务不够用了,需要扩容到2台服务。那么消息者与生产者怎样知道在哪台消息服务发送与接收消息呢? 

image.png

通过添加一个消息注册中心,消息注册中心持有所有消息服务的: topic、服务地址,解决了Topic不知道在哪个服务器上的问题。 生产者、消息者 发送或接收前,主动询问“消息注册中心”就好了。

image.png

我们看下今天的主角“RocketMQ”的架构图。是不是和上面的很像呢?

image.png

RocketMQ实际的Topic组织方式是怎样的?

image.png

到这里你已经了解了RoekctMQ的大致原理,下面我们继续了解每个部分。

 

RokectMQ

1. RokectMQ-生产者 Producer

1) 发给谁?

生产者采集轮询的方式发送消息(选择逻辑可通过MessageQueueSelector自定义)。下面是筛选逻辑(位置:TopicPublishInfo#selectOneMessageQueue(java.lang.String)):

image.png

如果消息发送再失败的话,下次进行消息队列选择时,参数会传递lastBrokerName以规避上次MesageQueue所在的Broker,否则还是很有可能再次失败。

 

2) 怎样发送?

RocketMQ支持3种消息发送方式:同步(sync)、异步(async)、单向(oneway)。

  • 同步:发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果。
  • 异步:发送者向MQ执行发送消息API时,指定消息发送成功后的回掉函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
  • 单向:消息发送者向MQ执行发送消息API时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。

image.png

 

2. 注册中心 NameServer

1) 注册流程

Broker消息服务器在启动时向所有Name Server注册,Producer在发送消息之前先从 Name Server获取 Broker 服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。NameServer与每台Broker服务器保持长连接,并间隔 30s 检测Broker是否存活,如果检测到Broker右机 ,则从路由注册表中将其移除 。但是路由变化不会马上通知消 息生产者,这是为了降低NameServer实现的复杂性 ,在消息发送端提供容错机制来保证消息发送的高可用性。

image.png

2) 注册了哪些信息?

image.png

3) 怎样维护注册信息?

image.png

 

3. 消息服务 Broker

1) 消息处理流程与消息存储

(1)消息的存储流转路径

图中蓝色线条代表处理流程,灰色代表文件夹中的内容展开。

image.png

文件

描述

特点

commitlog

单个文件大小默认1G ,文件名长度为20位,以文件中第一条消息的偏移量命名

顺序写入,随机读取

consumequeue

存储路径consumequeue/{topic}/{queueId}/{fileName},文件采取定长设计,20个字节/条(8字节的commitlog物理偏移量、4字节的消息长度、8字节tagHashcode),单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M

顺序写入,顺序读取

(2)消息的逻辑流转路径

image.png

2) 定时消息的处理逻辑

image.png

 

4. 消费者 consumer

image.png

1) 多个消费者怎样协调?

image.png

2) 消费者内部的消息拉取流程

image.png

 

要点回顾

集群模式下,一个Queue只能由一个消费者消费,消费者进度保存在服务器端;

生产者

  • 发送方式:同步(sync)、异步(async)、单向(oneway);
  • 发送时,默认轮询发送到ConsumerQueue中;可通过自行实现MessageQueueSelector发送指定的队列中;
  • 发送时,消息重试1次;

消费者

1) 拉取消息的控制参数:

  • pullBatchSize: 每次从MetaQ服务器拉取的消息数
  • pullInterval: 两次拉取之间的时间间隔ms(默认不间隔)

2) 本地消息堆积控制参数:

  • pullThresholdForQueue: 本地堆积的消息数量
  • pullThresholdSizeForQueue: 本地堆积的消息大小
  • consumeConcurrentlyMaxSpan: 本地堆积的拉取次数

3) 消费者参数

  • consumeThreadMin: 消费线程最小数量
  • consumeThreadMax: 消费线程最大数量
  • consumeMessageBatchMaxSize: 消费线程一次可以消费的消息数量

 

--------------------当你读到这里,代表这篇文章对你是有价值的,鼓励下作者,点个赞吧!!!--------------------

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部