文档章节

三、RabbitMQ如何实现AMQ协议(读书笔记)

XuePeng77
 XuePeng77
发布于 03/12 23:34
字数 2617
阅读 89
收藏 0

AMQP作为一种RPC传输机制

RabbitMQ作为一种AMPQ代理服务器,提供了一套严格的通信方式,核心部分的通信几乎都使用了RPC(远程过程调用)模式。

启动会话

AMQP协议定义,当客户端要与RabbitMQ交互时,首先要向RabbitMQ发送一个协议头(protocol header):

要完全连接到RabbitMQ,会经过由三个同步RPC请求所组成请求序列,这三个RPC请求分别是启动、调整和打开连接。对于应用RabbitMQ而言(不准备开发RabbitMQ Client),启动会话这个阶段并不是非常重要,稍作理解就好。

设置信道

AMQP协议定义,信道要使用AMQP连接作为互相传输信息的渠道,而且要将传输过程与其他正在进行中的会话隔离开。一个AMQP连接可以有多个信道,允许客户端和RabbitMQ之间进行多次会话,技术上称之为多路复用(multiplexing)。

不要使用过多的信道! 在传输过程中,信道只是分配给客户端和RabbitMQ之间所传递消息的一个整数值。但在客户端和RabbitMQ中,会为每个信道设置内存结构和对象,连接中的信道越多,用于管理该连接的消息流所需的内存也就越多。

AMQP RPC帧

AMQP使用类和方法在客户端和RabbitMQ之间创建公共语言,这些类和方法被称为AMQP命令(AMQP Commands)。AMQP中的类定义了一个功能范围,每个类都包含执行不同任务的方法。

例如:Connection.Start命令,由两部分组成:AMQP类(Class)和方法(Method)。

当使用命令与RabbitMQ进行交互时,执行这些命令,和所需要的所有参数,都被封装在一种数据结构中,并且对这个数据结构进行编码,以便传输。

帧由五个部分组成:

  1. 帧类型;
  2. 信道编号;
  3. 帧大小(以字节为单位);
  4. 有效载荷;
  5. 结束字节标记(ASCII值206);

帧类型

帧类型分为五种:

  1. 协议头帧:用于链接到RabbitMQ,仅在链接时使用一次;
  2. 方法帧:用于携带发送给RabbitMQ或从RabbitMQ接收到的RPC请求和响应;
  3. 内容头帧:包含一条消息的大小和属性;
  4. 消息体帧:包含消息的内容;
  5. 心跳帧:客户端与RabbitMQ之间进行传递,作为一种校验机制,确保连接的两端都可用;

可以在rabbitmq.config配置文件中设置心跳间隔,设置为0则代表关闭心跳检测机制(如果你的程序在一定程度上阻塞了通信,使得心跳检测难以正常运作)。

帧的编组

如果要向RabbitMQ发送消息,则会对方法帧、内容头帧和消息帧进行编组。

发送的第一个帧是方法帧,它携带了命令和执行它所需的参数(如交换器和路由键)。

在方法帧之后的是内容头帧,它包含了消息属性以及消息体大小,AMQP的帧大小是有上限的,默认为131KB,如果消息体超过了这个上限,消息内容将被拆分成多个消息体帧。

接着是消息帧,它携带了具体的消息内容。

为了更高效的处理这些帧,方法帧和内容头帧会被打包成二进制数据,消息帧则么有进行任何打包或编码。

虽然帧默认大小是131KB,但客户端在链接过程中可以设置更大或更小的帧,其最大可达一个32位值。

方法帧结构

方法帧携带构建RPC请求所需的类、方法以及相关参数。

这些属性告知RabbitMQ如何路由消息。Mandatory标识则告知RabbitMQ消息必须投递成功,否则发布消息的过程就应该是失败的。

通常,使用Basic.Publish RPC请求发送消息是一个单向会话。但是,如果你在发布消息时使用了mandatory标志,则应用程序应该监听从RabbitMQ发送回来的Basic.Return命令。如果RabbitMQ不能满足mandatory标志设置的要求,它将在同一个信道上发送一个Basic.Return命令到客户端。

内容头帧结构

内容头帧除了告知RabbitMQ该消息的大小之外,还包含消息的各种属性。这些属性存储在Basic.Properties映射表中,可包含描述消息内容的数据,也可完全是空白的。大多数客户端会预先填充一小部分字段,比如内容类型和投递模式。

属性是编写消息的强大工具,他们可以用来在发布者和消费则之间就消息的内容创建契约,从而允许对消息进行大量的定制操作。

消息体帧结构

消息题帧包含了实际消息数据的结构,可以是图片的二进制数据、序列化后的JSON、XML或字符串等等。

使用协议

在将消息发布到队列前,有几个与配置相关的步骤,至少需要设置交换器和队列,然后将他们绑定到一起,下面会从协议级别来看看进行这么步骤所发生的内部过程。

声明交换器

协议中,创建交换器会使用Exchange.Declare命令,该命令提供了定义交换器名称和类型的参数,以及用于消息处理的其他元数据。

一旦命令被发出,RabbitMQ在创建了交换器之后将发送一个Exchange.DeclareOk的方法帧作为相应。如果处于某些原因创建失败了,则RabbitMQ将使用Channel.Close命令关闭发送Channel.Declare命令的信道。该响应包含一个数字编码和文本值,用于说明Exchange.Declare失败并关闭信道的原因。

声明队列

协议中,创建队列会使用Queue.Declare命令,和创建交换器的过程是一样的。

在声明队列时,多次发送同一个Queue.Declare命令并不会有任何副作用。

当尝试声明一个与现有队列同名的新队列时,如果新队列的属性与现有队列不一样,那么RabbitMQ将关闭发出RPC请求的通道。 要正确的处理错误,一般是由客户端的实现来监听来自RabbitMQ的Channel.Close命令,以便能做出正确响应。 某些客户端实现可以让你返回异常,让你的应用程序去捕获并处理。还有的客户端提供回调风格,通过注册一个回调方法来处理。

绑定队列到交换器

协议中,绑定交换器与队列的命令是Queue.Bind,每次只能指定一个队列。这一步和创建交换器或创建队列是一样的。

发布消息

协议中,当发布消息到RabbitMQ时,多个帧封装了发送到服务器的消息数据。

在实际的消息内容到达RabbitMQ之前,客户端应用程序发送一个Basic.Publish方法帧,一个内容头帧和至少一个消息体帧。

当RabbitMQ接收到一个消息的所有帧并确定下一步操作之前,它将检查方法帧以获取它所需要的信息。Basic.Publish方法帧携带消息的交换器名称和路由键。在评估这些数据时,RabbitMQ会尝试将Basic.Publish帧中的交换器名称与配置交换器的数据库进行匹配。

默认情况下,如果使用RabbitMQ配置中不存在交换器进行消息发布,RabbitMQ将自动丢弃该消息。

如果交换器存在,但是路由不到队列,那么,想要确保消息成功投递,需要在发布时将mandatory标识设置为true,或者使用投递确认机制。

当RabbitMQ发现某一个交换器与Basic.Properties方法帧中的交换器名称相匹配时,它将判断该交换器中的绑定信息,并通过路由键寻找匹配的队列。当消息与任一绑定的队列符合匹配标准时,RabbitMQ服务器将以FIFO的顺序将消息放入队列中。

放入队列数据结构中的并不是实际消息,而是消息的引用。当RabbitMQ准备投递消息时,它将使用这个引用来编组消息并通过网络进行发送。这为发布到多个队列的消息提供了实质性的优化。

一旦不再需要这个消息,实际消息数据将会从RabbitMQ的内存中移除。

默认情况下,只要没有消费者正在监听队列,消息就会被存储在队列中。RabbitMQ可以将这些消息保存到内存或磁盘,具体取决于Basic.Properties中指定的delivery-mode属性。

消费消息

要消费RabbitMQ队列中的消息,消费者应用程序发出Basic.Consume命令来订阅RabbitMQ中的队列。服务器将返回Basic.ConsumerOk来响应。然后开始向消费者投递消息。

如果要让消费者停止接收消息,则可以发送Basic.Cancel命令。这个命令是异步发出的,而RabbitMQ可能仍然在发送消息,所以消费者在接收到一个Bsaic.CancelOk响应之前,仍然可以接收到来自RabbitMQ的消息。

消费消息时,有几个设置可以让RabbitMQ知道你要如何接收它们。其中一个设置是Basic.Consume命令中的no_ack参数。当设置为ture时,RabbitMQ将连续发送消息直到Basic.Cancel或者连接断开。如果no_ack标记为false,则消费者必须通过发送Basic.Ack命令来确认收到的每条消息。

当发送Basic.Ack时,消费者必须在Basic.Deliver方法帧中传递一个名为delivery tag(投递标签)的参数。RabbitMQ使用投递标签和信道作为唯一标识符来实现消息确认、拒绝等操作。

© 著作权归作者所有

XuePeng77
粉丝 48
博文 146
码字总数 194285
作品 0
丰台
私信 提问
我的RabbitMQ的学习成果

背景 在研发分布式事务的最终一致性事务模式时,使用了RabbitMQ。 在这之前也接触过RabbitMQ,但没有特别深入的去了解它的特性与原理。这次决定系统的学习一次,所以业余时间阅读大神们的书籍...

XuePeng77
04/15
265
0
NET下RabbitMQ实践[配置篇]

这个系列目前计划写四篇,分别是配置,示例,WCF发布,实战。当然不排除加餐情况。 介绍: rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。他遵循Mozilla Public Licens...

长平狐
2012/11/06
366
0
【Python模块】rabbitMQ

RabbitMQ介绍: 父进程与子进程间,同一父继承可以用multiprocess的Manager模块来实现数据互访。 作用:RabbitMQ是为了实现相互独立的两个进程数据互访。 应用场景:不需要立即操作的数据。比...

等你的破船
2018/08/13
0
0
RabbitMQ学习之:(三)AMQP和RabbitMQ介绍

准备开始 高级消息队列协议(AMQP1)是一个异步消息传递所使用的应用层协议规范。作为线路层协议,而不是API(例如JMS2),AMQP客户端能够无视消息的来源任意发送和接受信息。现在,已经有相...

如风达
2015/01/07
348
0
RabbitMQ的transaction、confirm、ack三个概念的解释,kafka ack

在使用RabbitMQ的过程中,肯定会遇到这样的几个概念:transaction、confirm、ack。本文介绍一下这几个概念,以及他们之间的关系。 RabbitMQ是采用的AMQP协议,AMQP协议定义了”确认”(ackno...

xiaomin0322
2018/05/13
337
0

没有更多内容

加载失败,请刷新页面

加载更多

58. 静态工厂方法

参考:https://www.jianshu.com/p/ceb5ec8f1174 https://www.jianshu.com/p/fa15f63d399a 1.定义 用一个静态方法来对外提供自身实例的方法,即为我们所说的静态工厂方法(Static factory met...

20190513
9分钟前
4
0
Mybatis之StatementHandler

mybatis-3.4.6.release. 图1 StatementHandler是对CallableStatement、PreparedStatement、Statement的统一封装,在JDK中CallableStatement继承PreparedStatement,PreparedStatement继承Sta......

克虏伯
28分钟前
5
0
遇到API安全问题怎么办?F5 API加固解决方案怎么样?

  在各种APP泛滥的现在,背后都有同样泛滥的API接口在支撑,其中鱼龙混杂,直接裸奔的WEB API大量存在,安全性令人堪优在以前都采用自已定义的接口和结构,对于公开访问的接口,专业点的都会做...

梅丽莎好
51分钟前
10
0
迁Aliyun Redis踩坑之路 - 实践总结

背景: 从自建 redis(CacheCloud)到迁移到 aliyun redis 1. 踩“坑”一 问题: 平常小伙伴们在项目中可能用到redis key expire nofity的场景(比如:处理延时任务等),但是发现迁移后 ke...

少年与海
今天
3
0
干货:Kibana 可视化ElasticSearch数据展示分析

当你把数据存入ES中后,怎么更方面的查看这些数据,还想用不同的维度去看这些数据,是不是纠结,不能再专门搞个后台显示把。这里有神器 Kibana 专门干这个事情的,可以帮你把ES中的数据,通过...

枕邊書
今天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部