文档章节

【原创】RabbitMQ 之 Publisher Acknowledgement(翻译)

摩云飞
 摩云飞
发布于 2013/07/03 10:34
字数 1926
阅读 2553
收藏 3

Confirms (aka Publisher Acknowledgements)

      Using standard AMQP, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional, publish the message, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced.

      如果采用标准的 AMQP 协议,则唯一能够保证消息不会丢失的方式是利用事务机制 -- 令 channel 处于 transactional 模式、向其 publish 消息、执行 commit 动作。在这种方式下,事务机制会带来大量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。

      To enable confirms, a client sends the confirm.select method. Depending on whether no-wait was set or not, the broker may respond with a confirm.select-ok. Once the confirm.select method is used on a channel, it is said to be in confirm mode. A transactional channel cannot be put into confirm mode and once a channel is in confirm mode, it cannot be made transactional.

      为了使能 confirm 机制,client 首先要发送 confirm.select 方法帧。取决于是否设置了 no-wait 属性,broker 会相应的判定是否以 confirm.select-ok 进行应答。一旦在 channel 上使用 confirm.select 方法,channel 就将处于 confirm 模式处于 transactional 模式的 channel 不能再被设置成 confirm 模式,反之亦然。

      Once a channel is in confirm mode, both the broker and the client count messages (counting starts at 1 on the first confirm.select). The broker then confirms messages as it handles them by sending a basic.ack on the same channel. The delivery-tag field contains the sequence number of the confirmed message. The broker may also set the multiple field in basic.ack to indicate that all messages up to and including the one with the sequence number have been handled.

      一旦 channel 处于 confirm 模式,broker 和 client (译者注:client 的计数自行实现)都将启动消息计数(以 confirm.select 为基础从 1 开始计数)。broker 会在处理完消息后(译者注:这里的说法会让人产生错误理解,何为处理完消息?后续还有涉及),在当前 channel 上通过发送 basic.ack 的方式对其(消息)进行 confirm 。delivery-tag的值标识了被 confirm 消息的序列号。broker 也可以通过设置 basic.ack 中的 multiple来表明到指定序列号为止的所有消息都已被 broker 正确的处理了。

      An example in Java that publishes a large number of messages to a channel in confirm mode and waits for the acknowledgements can be found here.

      一个 Java 示例展现了 publish 大量消息到一个处于 confirm 模式的 channel 并等待获取 acknowledgement 的情况,示例在这里

Negative Acknowledgment

      In exceptional cases when the broker is unable to handle messages successfully, instead of a basic.ack, the broker will send a basic.nack. In this context, fields of the basic.nack have the same meaning as the corresponding ones in basic.ack and the requeue field should be ignored. By nack'ing one or more messages, the broker indicates that it was unable to process the messages and refuses responsibility for them; at that point, the client may choose to re-publish the messages.

      在异常情况发生时,broker 将无法成功处理相应的消息,此时 broker 将发送  basic.nack 来代替 basic.ack 。在这个情形下,basic.nack 中各域值的含义与  basic.ack 中相应各域含义是相同的,同时 requeue的值应该被忽略。 通过  nack 一条或多条消息 , broker 表明自身无法对相应消息完成处理,并拒绝为这些消息的处理负责。在这种情况下,client 可以选择将消息 re-publish 。

      After a channel is put into confirm mode, all subsequently published messages will be confirmed or nack'd once. No guarantees are made as to how soon a message is confirmed. No message will be both confirmed and nack'd.

      在 channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被 nack 一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm 又被 nack 。

      basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue.

      basic.nack 只会在负责 queue 功能的 Erlang 进程发生内部错误时被发送。

When will messages be confirmed?

The broker will confirm messages once:
broker 将在下面的情况中对消息进行 confirm :

  • it decides a message will not be routed to queues
    (if the mandatory flag is set then the basic.return is sent first) or
    broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)

  • a transient message has reached all its queues (and mirrors) or
    非持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中)

  • a persistent message has reached all its queues (and mirrors) and been persisted to disk (and fsynced) or
    持久消息到达了其所应该到达的所有 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync)

  • a persistent message has been consumed (and if necessary acknowledged) from all its queues
    持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 acknowledge)

updated in 2015-03-02

      For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack.

      对于无法路由的消息,broker 会在确认了通过 exchange 无法将消息路由到任何 queue 后,发送回客户端 basic.ack 进行确认(其中包含空的 queue 列表)。如果客户端发送消息时使用了 mandatory 属性,则会发送回客户端 basic.return + basic.ack 信息。

      For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.

      对于能够进行路由的消息,broker 会在消息被所有 queue “接受”后,发送回客户端 basic.ack 进行确认。对于具有 persistent 属性且路由到 durable queue 的消息而言,意味着持久化到硬盘动作的完成。对于镜像队列而言,意味着所有镜像队列都“接受”了该消息之后。

Ack Latency for Persistent Messages

basic.ack for a persistent message routed to a durable queue will be sent after persisting the message to disk. The RabbitMQ message store persists messages to disk in batches after an interval (a few hundred milliseconds) to minimise the number of fsync(2) calls, or when a queue is idle. This means that under a constant load, latency for basic.ack can reach a few hundred milliseconds. To improve throughput, applications are strongly advised to process acknowledgements asynchronously (as a stream) or publish batches of messages and wait for outstanding confirms. The exact API for this varies between client libraries.

Confirms and Guaranteed Delivery

The broker loses persistent messages if it crashes before said messages are written to disk. Under certain conditions, this causes the broker to behave in surprising ways.

broker 会丢失持久化消息,如果 broker 在将上述消息写入磁盘前异常。在一定条件下,这种情况会导致 broker 以一种奇怪的方式运行。

For instance, consider this scenario:

例如,考虑下述情景:

  1. a client publishes a persistent message to a durable queue
    一个 client 将持久消息 publish 到持久 queue 中
  2. a client consumes the message from the queue (noting that the message is persistent and the queue durable), but doesn't yet ack it,
    另一个 client 从 queue 中 consume 消息(注意:该消息具有持久属性,并且 queue 是持久化的),当尚未对其进行 ack 
  3. the broker dies and is restarted, and
    broker 异常重启
  4. the client reconnects and starts consuming messages.
    client 重连并开始 consume 消息

      At this point, the client could reasonably assume that the message will be delivered again. This is not the case: the restart has caused the broker to lose the message. In order to guarantee persistence, a client should use confirms. If the publisher's channel had been in confirm mode, the publisher would  not  have received an ack for the lost message (since the consumer message hadn't ack'd it and it hadn't been written to disk yet).

      在上述情景下,client 有理由认为消息需要被(broker)重新 deliver 。但这并非事实:重启(有可能)会令 broker 丢失消息。为了确保持久性,client 应该使用 confirm 机制。如果 publisher 使用的 channel 被设置为 confirm 模式,publisher 将不会收到已丢失消息的 ack(这是因为 consumer 没有对消息进行 ack ,同时该消息尚未被写入磁盘)。

 

 

© 著作权归作者所有

共有 人打赏支持
摩云飞
粉丝 368
博文 534
码字总数 952694
作品 0
徐汇
程序员
私信 提问
加载中

评论(8)

摩云飞
摩云飞

引用来自“superchris”的评论

文档里写了 If the client has not configured a return listener for a particular channel, then the associated returned messages will be silently dropped.
哪个文档,请贴个链接
摩云飞
摩云飞

引用来自“superchris”的评论

客户端publish一个持久消息后就退出了,如果此时交换机和队列都存在,但是没有消费者,这个消息会怎么处理呢?
如果publish的时候使用的都是默认属性(没有设置其它东东),如果消息能被正确路由到某个queue中,那么消息就会保存到queue里;如果没有路由能匹配,则丢弃消息
superchris
superchris
文档里写了 If the client has not configured a return listener for a particular channel, then the associated returned messages will be silently dropped.
superchris
superchris
客户端publish一个持久消息后就退出了,如果此时交换机和队列都存在,但是没有消费者,这个消息会怎么处理呢?
笑天居士
笑天居士

引用来自“摩云飞”的评论

引用来自“笑天居士”的评论

你好,向你请教一个问题:
publish端,confirm.select后,怎样获取返回的确认信息呢,我是用C客户端库的,没有找到相应的API
JAVA里就有ConfirmListen

这里首先需要明白的是(请允许我啰嗦一下):rabbitmq-c中其实是在两个地方使用了 Basic.Ack 信令的。一种是客户端做为 consumer 时手动回复 rabbitmq 服务器 ack 的情况;另外一种就是启用 Publisher comfirm 机制的情况。前者是客户端程序主动向外发送 Basic.Ack 信令,后者是客户端程序接收来自服务器的 Basic.Ack 信令。至于你提的问题,在启用了 Publisher comfirm 机制后,协议流程变为
--> Basic.Publish
--> Content-Header
--> Content-Body
Basic.Ack <--
而 rabbitmq-c 中 amqp_basic_publish() api 不是针对该模式的,所以你要做的是通过 amqp_simple_rpc() 方式将上述两个事情统一起来。

谢谢你,我找到方法了,也不知道对不对,测试了一下,可以读回来,不过可能还需要进一步理解和完善下,谢谢你的回复
{
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */

amqp_confirm_select(conn, 1); //在通道上打开Publish确认
die_on_error(amqp_basic_publish(conn,
1,
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey),
0, //mandatory标志位,消息不能到达队列则返回basic.return
1, //immediate标志位,消息不能到达消费者返回basic.return
&props,
amqp_cstring_bytes(messagebody)),
"Publishing");
}

{
/* Publish消息后需要在当前通道上监听返回的信息,来判断消息是否成功投递
* 这里要息根据投递消息的方式来过滤判断几个方法
*/
amqp_frame_t frame;
amqp_rpc_reply_t ret;

if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
return;
}

if (AMQP_FRAME_METHOD == frame.frame_type) {
amqp_method_t method = frame.payload.method;
fprintf(stdout, "method.id=%08X,method.name=%s\n",
method.id, amqp_method_name(method.id));
switch (method.id) {
case AMQP_BASIC_ACK_METHOD:{
/* if we've turned publisher confirms on, and we've published a message
* here is a message being confirmed
*/
{
amqp_basic_ack_t *s;
s = (amqp_basic_ack_t *) method.decoded;
fprintf(stdout, "Ack.delivery_tag=%d\n", s->delivery_tag);
fprintf(stdout, "Ack.multiple=%d\n", s->multiple);
}

break;

case AMQP_BASIC_NACK_METHOD:
/* if we've turned publisher confirms on, and we've published a message
* here is a message not being confirmed
*/
{
amqp_basic_nack_t *s;
s = (amqp_basic_nack_t *) method.decoded;
fprintf(stdout, "NAck.delivery_tag=%d\n", s->delivery_tag);
fprintf(stdout, "NAck.multiple=%d\n", s->multiple);
fprintf(stdout, "NAck.requeue=%d\n", s->requeue);
}

break;

case AMQP_BASIC_RETURN_METHOD:
/* if a published message couldn't be routed and the mandatory flag was set
* this is what would be returned. The message then needs to be read.
*/
{
amqp_message_t message;
amqp_basic_return_t *s;
char str[1024];
s = (amqp_basic_return_t *) method.decoded;
fprintf(stdout, "Return.reply_code=%d\n", s->reply_code);
strncpy(str, s->reply_text.bytes, s->reply_text.len); str[s->reply_text.len] = 0;
fprintf(stdout, "Return.reply_text=%s\n", str);

ret = amqp_read_message(conn, frame.channel, &message, 0);
if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
return;
}
strncpy(str, message.body.bytes, message.body.len); str[message.body.len] = 0;
fprintf(stdout, "Return.message=%s\n", str);

amqp_destroy_message(&message);
}

break;

摩云飞
摩云飞

引用来自“笑天居士”的评论

你好,向你请教一个问题:
publish端,confirm.select后,怎样获取返回的确认信息呢,我是用C客户端库的,没有找到相应的API
JAVA里就有ConfirmListen

这里首先需要明白的是(请允许我啰嗦一下):rabbitmq-c中其实是在两个地方使用了 Basic.Ack 信令的。一种是客户端做为 consumer 时手动回复 rabbitmq 服务器 ack 的情况;另外一种就是启用 Publisher comfirm 机制的情况。前者是客户端程序主动向外发送 Basic.Ack 信令,后者是客户端程序接收来自服务器的 Basic.Ack 信令。至于你提的问题,在启用了 Publisher comfirm 机制后,协议流程变为
--> Basic.Publish
--> Content-Header
--> Content-Body
Basic.Ack <--
而 rabbitmq-c 中 amqp_basic_publish() api 不是针对该模式的,所以你要做的是通过 amqp_simple_rpc() 方式将上述两个事情统一起来。
笑天居士
笑天居士
你好,向你请教一个问题:
publish端,confirm.select后,怎样获取返回的确认信息呢,我是用C客户端库的,没有找到相应的API
JAVA里就有ConfirmListen
笑天居士
笑天居士
你好,向你请教一个问题:
publish端,confirm.select后,怎样获取返回的确认信息呢,我是用C客户端库的,没有找到相应的API
JAVA里就有ConfirmListen
RabbitMQ的transaction、confirm、ack三个概念的解释,kafka ack

在使用RabbitMQ的过程中,肯定会遇到这样的几个概念:transaction、confirm、ack。本文介绍一下这几个概念,以及他们之间的关系。 RabbitMQ是采用的AMQP协议,AMQP协议定义了”确认”(ackno...

xiaomin0322
05/13
0
0
RabbitMQ 3.6.12 RC2 发布,AMQP 消息服务器

RabbitMQ 3.6.12 RC2 发布了。RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这...

淡漠悠然
2017/09/01
577
4
【原创】rabbitmq-server用户手册(翻译)

为了方便工作中使用,周末抽空对rabbitmq-server用户手册进行了翻译,鉴于自己水平有限,翻译中难免有纰漏产生,如果疑问,欢迎指出探讨。 官方原文:http://www.rabbitmq.com/man/rabbitmq-...

摩云飞
2012/11/12
0
0
【原创】rabbitmq-echopid用户手册(翻译)

为了方便工作中使用,周末抽空对 rabbitmq-echopid 用户手册进行了翻译,鉴于自己水平有限,翻译中难免有纰漏产生,如果疑问,欢迎指出探讨。 官方原文:http://www.rabbitmq.com/man/rabbi...

摩云飞
2012/11/12
0
0
【原创】rabbitmq-plugins用户手册(翻译)

为了方便工作中使用,周末抽空对 rabbitmq-plugins 用户手册进行了翻译,鉴于自己水平有限,翻译中难免有纰漏产生,如果疑问,欢迎指出探讨。 官方原文:http://www.rabbitmq.com/man/rabbi...

摩云飞
2012/11/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Iris框架

1、安装iris: $ go get -u github.com/kataras/iris 2、golang iris web项目热重启 # 安装rizla包 $ go get -u github.com/kataras/rizla # 热重启方式启动iris项目 $ rizla main.go......

Liens
14分钟前
3
0
初探sentinel实践思考

简单说下, sentinel的优势: 友好的控制面板,支持实时监控 多种限流。支持QPS限流,线程数限流,多种限流策略,如:直接拒绝,匀速模式(漏斗),冷启动(如设置限制1000,延迟10秒,那第一...

爱吃大肉包
15分钟前
4
0
转:MongDB分页查询

找到了一篇关于MongDB分页查询的博客 https://www.cnblogs.com/wslook/p/9275861.html

_liucui_
16分钟前
1
0
《边缘云计算技术及标准化白皮书》

12月12日,第八届中国云计算标准和应用大会在北京隆重召开,工业和信息化部党组成员,总工程师张峰先生,中国工程院副院长陈左宁女士,中国工程院院士沈昌祥先生,中国电子技术标准化研究院院...

阿里云官方博客
22分钟前
1
0
网站安全公司对于网站逻辑漏洞的修复方案分享

在网站安全的日常安全检测当中,我们SINE安全公司发现网站的逻辑漏洞占比也是很高的,前段时间某酒店网站被爆出存在高危的逻辑漏洞,该漏洞导致酒店的几亿客户的信息遭泄露,包括手机号,姓名...

网站安全
26分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部