RabbitMQ 学习笔记(总结)

原创
2022/06/21 12:04
阅读数 146

一、RabbitMQ 的深入理解和最简单的用途说明

如何在新的系统中使用 RabbitMQ.
系统设计的两个重大问题.
第一条要满足未来的业务需求的不断变化和增加。也就是可扩展性.
第二条要满足性能的可伸缩性。也就是可集群性… 通过增加机器能处理更多的请求
第三条要解耦合.
如果不解耦合,未来业务增加或变更的时候你还在修改 3 年前写的代码。试问你有多大的把握保证升级好系统不出问题?如何可以写新的代码而不用修改老代码所带来的好处谁都知道…
第四条简单易懂.
以上 4 条在任何一个系统中都要遵循的原则。以前是无法做到的。自从有了 MQ 以后。这些都可以同时做到了.
以前的设计理念是把系统看作一个人,按照工作的指令从上到下的执行.
现在要建立的概念是,把系统的各个功能看作不同的人。人与人之间的沟通通过消息进行交流传递信息…
有了 MQ 以后把一个人的事情分给了不同的人,分工合作所带来的好处是专业化,并行化。当然也引入了一些麻烦,性能开销多一些,工作任务的完整性不能立即得到反馈。幸好我们可以通过最终一致性。来解决这个麻烦的问题…
下面进入正题.
第一个问题 RabbitMQ 是如何支持可扩展性的.

 

bc80d87dd4b413a21b2397964005d3df.png


如上图,寄件人 P 是系统的一个功能模块。用来发送消息。一般是在某些重要的业务状态变更时发送消息。例如:新订单产生时,订单已打包时,订单已出库时,订单已发出时.
那么当事件 新订单产生时,我们需要把这个信息告诉谁呢?给财务?还是给仓库发货?
这个地方最大的重点是。当事件产生时。根本不关心。该投递给谁.
我只要把我的重要的信息投到这个乱七八糟的 MQ 系统即可。其它人你该干嘛干嘛。反正我的任务完成了. (有没有甩手掌柜的感觉..)
我只要告诉系统,我的事件属于那一类.
例如: “某某省。某某市。某某公司。产生新订单”
那么这个地址就属于 投递地址.. 至于这个地址具体投到哪个邮箱那是邮局的事情.
当然还有一些具体的订单内容也属于要告诉系统的内容.
那么下一个问题来了,邮局怎么知道 你的这个消息应该投递给谁?
参考我们现实世界中的邮寄系统。是默认的省市县这么投递的。这是固定思维.
但是我们的 MQ 系统中不是这样的。是先有收件人的邮箱. (队列 Queue). MQ 才能投递。否则就丢弃这个信息…
所以 MQ 系统应该先有收件人的邮箱 Queue 也就是队列。才能接收到信息.
再有邮局
再有发信息的人.
RabbitMQ 能实现系统扩展的一个重要功能在于,可以两个邮箱收同一个地址的信.
翻译成专业的话 RabbitMQ 可以 两个队列 Queue 订阅同一个 RoutingKey 的信息..
RabbitMQ 在投递的时候,会把一份信息,投递到多个队列邮箱中 Queue…
这是系统可扩展性的基础.
第二个问题 RabbitMQ 如何满足性能的可伸缩性。也就是可集群性
先上图

 

08b5c0b5f024ed49df6ff0cdaee35836.png




从上图,可以看到。性能扩展的关键点就在于 订阅人 C1, 订阅人 C2 轮流收到邮箱队列里面的信息,订阅人 C1 和订阅人 C2 收到的信息内容不同,但都属于同一类….
所以。订阅人 C1 和订阅人 C2 是干同一种工作的客户端。用来提高处理能力.
上面说完了,如何使用。下面再分析一下几个关注点.
如果订阅人的 down 机了。信息会丢失吗?
事实上是不会的。只要有邮箱 (队列 Queue) 存在。信息就一直存在,除非订阅人去取走.
如果订阅人一直 down 机,邮箱队列能存多少信息?会不会爆掉?
理论上和实际上都是有上限的不可能无限多。具体多少看硬盘吧.. 我没测到过上限.
我这篇文章并不打算讲解邮局的 4 种投递模式。有其它文章讲的很好。我只打算使用 topic 这种模式。因为它更灵活一些.
再说一下我的另外两个观点.
不要在业务程序中用代码定义创建 邮局 ExChange. 和邮箱 Queue 队列 这属于系统设计者要构架的事情。要有专门独立的程序和规则去创建。这样可以统一管理事件类型。避免过多的乱七八糟的 RoutingKey 混乱.
我的理解认为
消息系统的分布式可扩展的实现在于消息广播,集群性的实现在于邮箱队列.
RabbitMQ 是先广播后队列的.
Exchange: 就是邮局的概念等同于 中国邮政和顺丰快递、
routingkey: 就是邮件地址的概念.
queue: 就是邮箱接收软件,但是可以接收多个地址的邮件,通过 bind 实现。
producer: 消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel 代表一个会话任务。

 

二、为什么要使用 RabbitMQ?RabbitMQ 有什么优点?

原因有三:

1. 解耦

 

 

9832ccf55e8d07a2416aa3f1e9f3073a.png

 

2. 异步

 

 

3bdf5475e483f034ed091188406c4cbd.png


3. 削峰

 

 

 

652b98ee01c5e56590de17f438790453.png

 

三、rabbitmq 的 routingkey 的作用

对于消息发布者而言它只负责把消息发布出去,甚至它也不知道消息是发到哪个 queue,消息通过 exchange 到达 queue,exchange 的职责非常简单,就是一边接收发布者的消息一边把这些消息推到 queue 中。

而 exchange 是怎么知道消息应该推到哪个 queue 呢,这就要通过绑定 queue 与 exchange 时的 routingkey 了,通过代码进行绑定并且指定 routingkey,下面有一张关系图,p (发布者) —> x (exchange) bindding (绑定关系也就是我们的 routingkey) 红色代表着 queue

我们来看代码:

在消息的生产者端:

@Component
public class RabbitOrderSender {
//自动注入RabbitTemplate模板类
@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private BrokerMessageLogMapper brokerMessageLogMapper;

//回调函数: confirm确认
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
String messageId = correlationData.getId();
if(ack){
//如果confirm返回成功 则进行更新
brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
} else {
//失败则进行具体的后续操作:重试 或者补偿等手段
System.err.println("异常处理...");
}
}
};

//发送消息方法调用: 构建自定义对象消息
public void sendOrder(Order order) throws Exception {
// 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
rabbitTemplate.setConfirmCallback(confirmCallback);
//消息唯一ID
CorrelationData correlationData = new CorrelationData(order.getMessageId());
rabbitTemplate.convertAndSend("order-exchange", "order.ABC", order, correlationData);
}
}

利用rabbitTemplate(import org.springframework.amqp.rabbit.core.RabbitTemplate;需要在pom.xml中导入amqp的依赖)的convertAndSend方法就可以发送,这里order-exchange为交换机exchange,order.ABC为routingKey,并没有指定对应消息需要发往哪个队列,还有指定消息回调。

在消息的消费者端:

@Component
public class OrderReceiver {
//配置监听的哪一个队列,同时在没有queue和exchange的情况下会去创建并建立绑定关系
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true"),
exchange = @Exchange(name="order-exchange",durable = "true",type = "topic"),
key = "order.*"
)
)
@RabbitHandler//如果有消息过来,在消费的时候调用这个方法
public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) throws IOException {
//消费者操作
System.out.println("---------收到消息,开始消费---------");
System.out.println("订单ID:"+order.getId());

/**
* Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
* 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
* RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
*/
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

/**
* multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认
* 如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
*/
boolean multiple = false;

//ACK,确认一条消息已经被消费。不然的话,在rabbitmq首页会有Unacked显示为未处理数1.
channel.basicAck(deliveryTag,multiple);
}
}

更多编程知识学习和资源尽在公众号【优派编程】

感谢你的在看和 “点赞”,祝大家事业有成,学业进步,心想事成!

遇到问题不懂可以互相交流!

 

原文地址:

http://wp.fang1688.cn/java/460.html

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部