文档章节

【原创】RabbitMQ之PublisherConfirm实战问题总结

摩云飞
 摩云飞
发布于 2015/03/30 19:12
字数 1501
阅读 2554
收藏 57

如何理解Publisher Confirm机制

      Publisher Confirm机制(又称为ConfirmsPublisher Acknowledgements)是作为解决事务机制性能开销大(导致吞吐量下降)而提出的另外一种保证消息不会丢失的方式。

Publisher Confirm的协议交互过程



(补充说明:上图中未显示出info信息的两条交互对应的就是 Confirm.Select 和 Confirm.Select-ok ,显示不出来是因为 wireshark 没有对该扩展进行支持


(手绘图一张,字迹潦草了些)


      为了使能
Confirm机制,client 首先要发送 confirm.select 方法帧。取决于是否设置了 no-wait属性broker 会相应的判定是否以 confirm.select-ok 进行应答。一旦在 channel 上使用 confirm.select 方法,channel 就将处于 confirm 模式。处于 transactional 模式 channel 不能再被设置成 confirm 模式,反之亦然。

      一旦 channel 处于 confirm 模式,broker client (注:client 侧计数需要自行实现)都将启动消息计数(以 confirm.select 为基础从 1 开始计数)。broker 会在处理完消息后,在当前 channel 上通过发送 basic.ack 的方式对其进行 confirm delivery-tag 的值标识了被 confirm 消息的序列号。broker 也可以通过设置 basic.ack 中的 multiple 来表明到指定序列号为止的所有消息都已被 broker 正确的处理了。

      在发生异常情况发生时,broker 将无法成功处理相应的消息,此时 broker 将发送 basic.nack 来代替 basic.ack 。在这个情形下,basic.nack 中各域值的含义与 basic.ack 中相应各域含义是相同的,同时 requeue 域的值应该被忽略。通过 nack 一条或多条消息,broker 表明自身无法对相应消息完成处理,并拒绝为这些消息的处理负责。在这种情况下,client 可以选择将消息 re-publish

      channel 被设置成 confirm 模式之后,所有被 publish 的消息都将被 confirm(即 ack) 或者被 nack 一次。但是没有针对消息被 confirm 的快慢做任何保证,但是限制了同一条消息不会既被 confirm 又被 nack

目前情况下,basic.nack 信令只会在负责 queue 功能的 Erlang 进程发生内部错误时被发送


消息在何时被 Confirm

      这个部分是该机制的核心,也是最容易被理解错误的地方。首先看下一下我之前翻译过的官网针对该问题的说明。可以针对该问题,官网的说法有了一些微妙的调整。

如果将上述说明拆开后进行对应,则为下面的部分:

对应了

(关于mandatory属性的说明,可以参考《RabbitMQ 之 mandatory》)

对应了

两者对比后,可以看到

这条约束被移除了!!其背后真正的原因可以参考《RabbitMQ publish方法中的immediate和mandatory属性》里针对immediate属性的详细解释。

简要的讲,就是

Support for "immediate" made many parts of the codebase more complex, particularly around mirrored queues. It also stood in the way of our being able to deliver substantial performance improvements in mirrored queues.

好了,到此总结一下上面内容中比较隐晦的点:

第一,mandatory属性的使用和Publisher Confirm机制没有必然关系(其实immediate属性也是这样,但因为该属性后续不被支持了,所以此处不进行讨论了)。

上图展示的就是,单独使用mandatory属性测试exchange绑定了queue和未绑定queue时的两种情况。

第二,只有将mandatory属性和Publisher Confirm机制结合使用,才能真正实现消息的可靠投递(消息到queue中)

      上面给出的是调用Basic.Publish方法时可以设置的属性。下面这张图展示的是启用了Publisher Confirm机制但是没有设置mandatory属性的情况,同时所投递的目标exchange也没有进行queue绑定。

而下面这个是绑定了queue的情况。

      可以看到,两种情况下,服务器的回应是没有差别的,都是对消息应答Basic.Ack。而事实上,第一种情况下,消息已经被blackholed了。很诡异吧,消息丢失了,却仍旧可以收到来自服务器的Ack。这也是实际使用中容易犯的错误。

再次将官方说明写在这里:

“对于无法路由的消息,broker 会在确认了通过 exchange 无法将消息路由到任何 queue 后,发送回客户端 basic.ack 进行确认(其中包含空的 queue 列表)。如果客户端发送消息时使用了 mandatory 属性,则会发送回客户端 basic.return + basic.ack 信息。”

貌似其中说,发回的basic.ack中会包含一个空的queue列表。但是确实没看到。既然如此还是乖乖的使用mandatory属性吧,使用后效果如下

      可以看到消息被通过Basic.Return发送回来,且告知原因是NO_ROUTE。同时通过Basic.Ack告知当前消息的Delivery-Tag1。于是乎,客户端侧可以根据回应进行相应的处理了(比如重新publish等)。

第三,在immediate属性被移除后,若想通过Publisher Confirm机制实现类似immediate对应的功能(即消息立即被Consumer消费,若未被消费,则Producer要知道),则需要结合DLXTTL设置。

一种可能的方案是:

  • 为目标queue设置DLX;
  • 为DLX绑定回收消息的queue,并对其进行consume;
  • 在通过 basic.publish 发送消息时针对每一条消息设置 expiration 字段为0;

关于DLXTTL的说明可以参考:

RabbitMQ 之 Dead Letter Exchanges(翻译)》和《RabbitMQ 之 TTL 详解(翻译) 

总结

本文针对RabbitMQ中的Publisher Confirm机制展开讨论,针对常见属性进行说明,并对可能遇到的问题给出方案建议。



© 著作权归作者所有

摩云飞
粉丝 375
博文 534
码字总数 952694
作品 0
徐汇
程序员
私信 提问
加载中

评论(12)

leeyi
leeyi
我的代码如下:
$attrs = array();
$attrs['uuid'] = uuid();
$attrs['content_type'] = 'application/json';
$attrs['content_encoding'] = 'utf-8';
$attrs['timestamp'] = time();
$attrs['delivery_mode'] = '2'; // 持久化消息
// 消息发布
// $flag=AMQP_MANDATORY 在发布消息时,消息必须被路由到一个有效的队列中。如果不是,将返回一个错误。
$channel->startTransaction();
    $exchange->publish($message, $routingkey_name, $flag=AMQP_MANDATORY, $attrs);
    $channel->commitTransaction();
    return true;
leeyi
leeyi
请问,安装的是 PHP扩展 amqp.so ;是使用 “发送方确认模式”??
摩云飞
摩云飞 博主

引用来自“znoodl”的评论

看楼主研究的那么深,也是用的rabbitmq-c,有几个问题想问一下
1、no_ack中,开启ack,如果ack一个不存在的delivery_tag会断开连接吗?我试的是会断开,go客户端也试过。后台有多个线程执行长时间任务(一个接收端consumer),如果无意间端开了连接,再ack 消息会导致连接断开,重连(代码中加入了自动重连)之后,另外一个线程完成任务,ack了一个上一个连接的delivery_tag又导致连接断开,无线循环。
虽然只是猜想,但是为了可靠要做好这些措施。
2、你做过测试连接断开的情况吗?测试中amqp_get_rpc_reply()并不可靠,只有靠正常调用的返回值确定。
3、暂时就2个0

引用来自“摩云飞”的评论

不好意思,这两天忙,才看到你提出的问题。我的看法是这样的: 1.设置no_ack=0后,即要求业务自己进行ack回复,也就要求需要记录basic.deliver中的delivery_tag,并在回复的时候设置到basic.ack中。在此种情况下,两种信令中的delivery_tag肯定是一致的,以上说的是正常情况。 2.在异常情况下,一般来说也就是Connection断开(这里不仔细区分channel断开的情况了),上述行为就需要做调整了。因为RabbitMQ服务器侧会在Connection断开后,重置delivery_tag的值,即重新开始从1计数,所以,如果你的业务还按照老的计数值进行回复,服务器当然会认为你是有问题的,所以必然会断开当前Connection。 3.结论:RabbitMQ文档中提到了一个“Assume Nothing”原则,大概的意思就是,一旦连接断开,你要假定服务器测的东西都是不可用的,你要按照一切都不存在的情况来处理你的代码。就你提出的问题而言,肯定你的处理代码中没有重置delivery_tag的值,所以才会不断断开、重连。 希望能解决你的问题~

引用来自“znoodl”的评论

感谢回复 第一个问题:我抓包测试是服务端接收到未知delivery_tag,然后给客户端发送channel-close,然后客户端回应channel-close,连接就断开了,感觉这样很不合理。根据你说的,要自己在业务上判断连接断开进行处理,则就是我说的第二个问题了。 因为我对rabbitmq-c封装,将重连也封装,而在外部使用上是不知道连接是否断开的,好麻烦啊
"服务端接收到未知delivery_tag"就是因为Connection断开后,服务器会在Connection再次建立时重新开始计数导致的。
nobodywho
nobodywho

引用来自“znoodl”的评论

看楼主研究的那么深,也是用的rabbitmq-c,有几个问题想问一下
1、no_ack中,开启ack,如果ack一个不存在的delivery_tag会断开连接吗?我试的是会断开,go客户端也试过。后台有多个线程执行长时间任务(一个接收端consumer),如果无意间端开了连接,再ack 消息会导致连接断开,重连(代码中加入了自动重连)之后,另外一个线程完成任务,ack了一个上一个连接的delivery_tag又导致连接断开,无线循环。
虽然只是猜想,但是为了可靠要做好这些措施。
2、你做过测试连接断开的情况吗?测试中amqp_get_rpc_reply()并不可靠,只有靠正常调用的返回值确定。
3、暂时就2个0

引用来自“摩云飞”的评论

不好意思,这两天忙,才看到你提出的问题。我的看法是这样的: 1.设置no_ack=0后,即要求业务自己进行ack回复,也就要求需要记录basic.deliver中的delivery_tag,并在回复的时候设置到basic.ack中。在此种情况下,两种信令中的delivery_tag肯定是一致的,以上说的是正常情况。 2.在异常情况下,一般来说也就是Connection断开(这里不仔细区分channel断开的情况了),上述行为就需要做调整了。因为RabbitMQ服务器侧会在Connection断开后,重置delivery_tag的值,即重新开始从1计数,所以,如果你的业务还按照老的计数值进行回复,服务器当然会认为你是有问题的,所以必然会断开当前Connection。 3.结论:RabbitMQ文档中提到了一个“Assume Nothing”原则,大概的意思就是,一旦连接断开,你要假定服务器测的东西都是不可用的,你要按照一切都不存在的情况来处理你的代码。就你提出的问题而言,肯定你的处理代码中没有重置delivery_tag的值,所以才会不断断开、重连。 希望能解决你的问题~
感谢回复 第一个问题:我抓包测试是服务端接收到未知delivery_tag,然后给客户端发送channel-close,然后客户端回应channel-close,连接就断开了,感觉这样很不合理。根据你说的,要自己在业务上判断连接断开进行处理,则就是我说的第二个问题了。 因为我对rabbitmq-c封装,将重连也封装,而在外部使用上是不知道连接是否断开的,好麻烦啊
摩云飞
摩云飞 博主

引用来自“znoodl”的评论

看楼主研究的那么深,也是用的rabbitmq-c,有几个问题想问一下
1、no_ack中,开启ack,如果ack一个不存在的delivery_tag会断开连接吗?我试的是会断开,go客户端也试过。后台有多个线程执行长时间任务(一个接收端consumer),如果无意间端开了连接,再ack 消息会导致连接断开,重连(代码中加入了自动重连)之后,另外一个线程完成任务,ack了一个上一个连接的delivery_tag又导致连接断开,无线循环。
虽然只是猜想,但是为了可靠要做好这些措施。
2、你做过测试连接断开的情况吗?测试中amqp_get_rpc_reply()并不可靠,只有靠正常调用的返回值确定。
3、暂时就2个0
不好意思,这两天忙,才看到你提出的问题。我的看法是这样的: 1.设置no_ack=0后,即要求业务自己进行ack回复,也就要求需要记录basic.deliver中的delivery_tag,并在回复的时候设置到basic.ack中。在此种情况下,两种信令中的delivery_tag肯定是一致的,以上说的是正常情况。 2.在异常情况下,一般来说也就是Connection断开(这里不仔细区分channel断开的情况了),上述行为就需要做调整了。因为RabbitMQ服务器侧会在Connection断开后,重置delivery_tag的值,即重新开始从1计数,所以,如果你的业务还按照老的计数值进行回复,服务器当然会认为你是有问题的,所以必然会断开当前Connection。 3.结论:RabbitMQ文档中提到了一个“Assume Nothing”原则,大概的意思就是,一旦连接断开,你要假定服务器测的东西都是不可用的,你要按照一切都不存在的情况来处理你的代码。就你提出的问题而言,肯定你的处理代码中没有重置delivery_tag的值,所以才会不断断开、重连。 希望能解决你的问题~
nobodywho
nobodywho
看楼主研究的那么深,也是用的rabbitmq-c,有几个问题想问一下
1、no_ack中,开启ack,如果ack一个不存在的delivery_tag会断开连接吗?我试的是会断开,go客户端也试过。后台有多个线程执行长时间任务(一个接收端consumer),如果无意间端开了连接,再ack 消息会导致连接断开,重连(代码中加入了自动重连)之后,另外一个线程完成任务,ack了一个上一个连接的delivery_tag又导致连接断开,无线循环。
虽然只是猜想,但是为了可靠要做好这些措施。
2、你做过测试连接断开的情况吗?测试中amqp_get_rpc_reply()并不可靠,只有靠正常调用的返回值确定。
3、暂时就2个0
摩云飞
摩云飞 博主

引用来自“Code_skylei”的评论

太有才了,手工打造
Code_skylei
Code_skylei
太有才了,手工打造
摩云飞
摩云飞 博主

引用来自“kobe_gino”的评论

顶......
我们是不认识?!
摩云飞
摩云飞 博主

引用来自“hhqqnu”的评论

问下截图上的工具是啥
wireshark
RabbitMQ 实战教程 文集

RabbitMQ 实战教程 文集 此系列博客经梁总(梁桂钊的博客)授权录入本站点,下面推荐下梁总的技术公众号【服务端的思维】,公众号不定时更新技术文章,干货满满!! RabbitMQ 实战教程(一) He...

chenssy
2018/10/06
0
0
Docker下RabbitMQ四部曲之四:高可用实战

本章是《Docker下RabbitMQ四部曲》系列的终篇,今天的我们一起来体验Rabbit’MQ集群的高可用能力,看看RabbitMQ集群中的部分节点宕机时,是否还能生产和消费消息; 原文地址:https://blog....

boling_cavalry
2018/05/19
0
0
php| 初探 rabbitmq

date: 2018-09-03 21:30:23 title: php| 初探 rabbitmq description: 零零散散折腾了 rabbitmq 几次, 归纳总结一下先 经常看到消息队列( MQ ), 实战中比较少, 说说我的一些粗线的理解: 引入消...

daydaygo
2018/09/05
0
0
Docker下RabbitMQ三部曲之一:极速体验(单机和集群)

从本章开始,我们一起在Docker环境实战RabbitMQ环境部署和对应的Java开发,当前是《Docker下RabbitMQ三部曲》系列的第一篇,整个三部曲由以下三篇文章组成: 1. 第一篇,即本章,我们用最快的...

boling_cavalry
2018/05/12
0
0
RabbitMQ实战2.消息轮询、响应、持久化

继上篇 RabbitMQ实战1.消息代理 消息轮询分配 如果生产者投递的消息需要运行相当长的时间,且有多个消费者在处理消息,那么RabbitMQ是怎么分配消息的? 新建 new_task.py 新建 worker.py 在三...

章鱼喵_
2018/08/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

java通过ServerSocket与Socket实现通信

首先说一下ServerSocket与Socket. 1.ServerSocket ServerSocket是用来监听客户端Socket连接的类,如果没有连接会一直处于等待状态. ServetSocket有三个构造方法: (1) ServerSocket(int port);...

Blueeeeeee
今天
6
0
用 Sphinx 搭建博客时,如何自定义插件?

之前有不少同学看过我的个人博客(http://python-online.cn),也根据我写的教程完成了自己个人站点的搭建。 点此:使用 Python 30分钟 教你快速搭建一个博客 为防有的同学不清楚 Sphinx ,这...

王炳明
昨天
5
0
黑客之道-40本书籍助你快速入门黑客技术免费下载

场景 黑客是一个中文词语,皆源自英文hacker,随着灰鸽子的出现,灰鸽子成为了很多假借黑客名义控制他人电脑的黑客技术,于是出现了“骇客”与"黑客"分家。2012年电影频道节目中心出品的电影...

badaoliumang
昨天
15
0
很遗憾,没有一篇文章能讲清楚线程的生命周期!

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本。 简介 大家都知道线程是有生命周期,但是彤哥可以认真负责地告诉你网上几乎没有一篇文章讲得是完全正确的。 ...

彤哥读源码
昨天
15
0
jquery--DOM操作基础

本文转载于:专业的前端网站➭jquery--DOM操作基础 元素的访问 元素属性操作 获取:attr(name);$("#my").attr("src"); 设置:attr(name,value);$("#myImg").attr("src","images/1.jpg"); ......

前端老手
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部