RabbitMQ——镜像队列问题(一)

原创
2019/10/23 23:48
阅读数 290

最近在使用镜像队列的过程中遇到了一些坑,通过阅读相关源码,大量的测试,不敢说对其中的原理掌握得非常透彻, 但基本能分析定位问题的原因,并且能自圆其说。这里整理总结下, 方便后续的回溯。欢迎大家交流指正。


【问题现象】

在镜像队列模式下,镜像队列所在的节点全部停止然后同时启动,启动后可能会出现一些奇怪的现象,比如:

  • WEB上部分队列为stopped状态

  • 部分队列并没有slave

  • 队列看着是存在的,但消息无法投递到该队列中


部分现象如下图所示:




其实所有这些现象最终本质是同一个问题,下面重点讲述镜像队列的相关原理并对该问题进行分析。


【准备知识】

在分析问题前,先讲解镜像队列相关的信息进行铺垫。

1、队列进程

懂一点erlang知识的都知道,erlang应用程序内部由成千上万个进程组成,这些进程大体可以分为两类,一类是工作者进程;一类是监督者进程。工作进程负责处理业务逻辑;监督者进程负责启动工作者进程,并对其进行监控,在必要的时候重启工作者进程,比如工作者进程异常退出时。


在rabbitmq中,队列对应的进程(rabbit_amqqueue_process)就属于工作者进程,每个这样的进程负责一个队列消息的处理;每个工作者进程也都有一个自己的监督者进程(rabbit_amqqueue_sup);每个监督者进程又共同有一个监督者进程(rabbit_amqqueue_sup_sup),这些进程构成了逻辑上的父子关系,rabbitmq在启动时会按照其父子关系依次将进程创建启动。具体层级关系如下图所示:



对于镜像队列模式,除了队列进程外,还有用于队列master/slave协调选主的coordinator进程,以及用于广播消息生产消费操作的gm进程。



生产者发送的消息、消费者消费的消息都由队列的master进程处理,master进程对消息的处理通过gm广播给其他节点的gm进程,其他节点的gm进程收到消息后再转发给对应的slave进程,slave进程收到消息后进行相应的处理保证与master的同步。(这里的master进程,slave进程就是前面图中的rabbit_amqqueue_process)


2、队列对应的内存数据表

在rabbitmq内部,维护了一个队列信息的表,记录了队列名称,队列master进程PID,slave进程PID等等信息,具体的表结构为:



这里主要关注的字段有

  • name:队列的名称。

  • pid:队列master进程的PID,生产者发送的消息,通过routing-key匹配找到对应的队列后,在查找该表找到队列master进程的PID,然后将消息发送给master进程。

  • slave_pids:队列slave进程PID集合,按照加入先后顺序进行排序

  • sync_slave_pids:已完成消息同步的slave进程PID集合

  • gm_pids:gm进程PID集合

  • state:队列的状态,live、crashed、stopped其中一个


队列进程在启动过程中会动态更新这几个字段的值,并在集群中实时同步。


注:实际上,在rabbitmq内部为队列维护了两张表,一个是记录持久化队列信息的rabbit_durable_queue表,该表中的数据会定期刷到磁盘中,便于重启后的恢复;一个是rabbit_queue表,这个表是内存态的,记录了所有队列的全部信息。

rabbit_durable_queue表会记录PID这个可能动态变化的字段信息,用于重启后判断创建持久化队列的master进程。而其他可能动态变化的字段信息例如队列的slave进程pid集合,gm进程pid集合则不会记录;rabbit_queue表则记录队列的全部信息,运行过程中也都是通过查找该表找队列的master进程、slave进程,gm进程。


【启动流程】

1、队列master的启动流程

1)完成自身初始化,并在rabbit_queue表中插入记录,填充相关字段,例如pid字段。

2)启动coordinator、gm进程,并增量更新rabbit_queue表数据中的gm_pids字段信息。

3)根据镜像配置规则,在合适的节点上创建队列的镜像,即执行队列slave创建启动的相关流程。

4)如果slave创建成功,则进行消息同步,然后处理生产者发送消息,消费者消费消息。


2、队列slave启动流程

1)完成自身初始化,创建gm进程,并在rabbit_queue表对应记录中增量更新slave_pids、gm_pids字段

2)与master进行消息的同步

3)同步完成后,在rabbit_queue表对应记录中增量更新sync_slave_pids字段。


这里有几点需要说明:

1)两个节点同时启动,怎么判断谁将是队列的master,谁是队列的slave?

答案是查rabbit_queue表。

rabbitmq启动后,首先读取rabbit_durable_queue表中的数据,确定有哪些持久化队列,然后读取rabbit_queue,确定哪些队列是没有对应的记录的,或者记录的队列PID(队列的master进程)实际并不存在。对于这些队列,会创建队列进程并指定为master,该进程启动运行过程中会将自身的pid写入数据表中,以标识自己就是队列的master进程。

当然,整个过程对数据库的操作是上锁的,并且会在分布式多节点中保持一致性。


2)怎么判断是否有合适的节点?

集群内的节点启动后,首先会进行数据库的同步,一旦同步完成,队列master在启动过程中,就可以根据镜像配置规则选择该节点作为队列的镜像,并在该节点上创建队列的slave进程。

即在队列master进程启动运行过程中,通过rpc在其他节点上创建队列的slave进程。但这个过程仅在队列master启动过程中进行,如果master启动过程中并没有合适的节点,启动完成后,即便有节点启动可以成为成为队列的slave,这个时候队列的master也不会在主动触发创建slave的。

因此,后启动的节点在完成数据库同步后,也会根据镜像配置规则主动创建队列的slave进程。这样,不管怎样的先后启动方式,镜像队列都会是有slave的。


3)队列的监督者进程

创建队列master/slave进程前,都会先创建rabbit_amqqueue_sup_sup这个监督者进程,然后按层级依次创建相关队列进程。

队列master进程rpc调用其他节点创建slave进程时,如果该节点上的rabbit_amqqueue_sup_sup进程还未创建,那么会报一个模式匹配失败的错误导致队列master进程异常退出,随后被自身的监督者进程捕获并重新创建。即master队列异常后,会被重新创建出来,重新创建出来的进程会重新走一遍原来的处理流程。


【问题分析】

有了上面的铺垫,其实已经可以发现一些可疑的地方了,我们用一张图来说明下存在问题的地方。



按时间轴顺序,我们来依次看两个节点的运行流程

时间1:节点A启动。


时间2:节点B启动,并完成与节点A的数据库同步。


时间3:节点A对持久化的队列创建master进程。


时间4:节点A上,队列的master进程启动后,在rabbit_queue表中插入相关记录,然后发现节点B可用,通过rpc在节点B上创建队列的slave。但此时,节点B上rabbit_amqqueue_sup_sup进程未启动,导致master进程crash退出。


时间5:节点B上,rabbit_amqqqueue_sup_sup进程启动,并主动创建队列进程,并指定为slave。


时间6:slave进程查数据库表,发现队列的master进程当前并非存活状态,因此将自身提升为master。


时间7:节点A上, 之前crash的master进程被重新创建出来了。重新创建的master进程,在处理过程中并不会去查看数据库表该队列的相关信息,即仍旧认为自己是master,并且在更新数据库记录时覆盖更新了pid字段,并再次通过rpc在节点B上创建队列的slave。


时间8:在节点B上,队列已经有master进程的情况下,又再次创建了slave进程(节点A远程调用创建的)。


时间9:由于gm_pids字段是增量更新的,master与slave在同步消息的过程中,master会根据该字段将消息同步给所有记录的gm进程。gm进程收到消息后,转发给对应的slave、coordinator进程。也就是说,此时B节点上同时存在两个gm进程,并且都会收到来自节点A的同步消息,一个转发给slave进程,一个转发给coordinator进程。而coordinator进程对这些消息认定为是非法的(因为自己是master,不应该收到这些消息),其处理方式是进程直接退出,进而master进程也跟着退出。


时间10:节点B上的master进程在退出(结束)过程中,会广播停止所有的slave,这样节点B上的slave进程也就跟着停止了。这样也就出现了启动后,队列没有slave的情况了。


附上对应的日志信息(部分信息是修改源码新增的打印,方便问题分析)

A节点日志:

%% master启动2019-10-21 15:26:19.763 [debug] <0.6502.0> Supervisor {<0.6502.0>,rabbit_amqqueue_sup} started rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"celtics_5">>},true,false,none,[],<0.7744.0>,[],[],[],[{vhost,...},...],...}, recovery, <0.6501.0>) at pid <0.6503.0>2019-10-21 15:26:19.793 [debug] <0.7017.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<0.6503.0>, slave_pids [], gm_pids []2019-10-21 15:26:19.943 [debug] <0.7914.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<0.6503.0>, slave_pids [], gm_pids [{<0.7275.0>,<0.6503.0>}]%% 对端节点mnesia数据库已经完成同步, 但rabbit_amqqueue_sup_sup进程未启动, 因此这里添加镜像时会匹配失败进程退出%% 进程异常退出,被监督者捕获,然后重新创建master进程2019-10-21 15:26:19.980 [error] <0.7978.0> Restarting crashed queue 'celtics_5' in vhost '/'.** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"celtics_5">>},true,false,none,[],<0.6503.0>,[],[],[],[{vhost,<<"/">>},{name,<<"ha-all">>},{pattern,<<"^">>},{'apply-to',<<"all">>},{definition,[{<<"ha-mode">>,<<"all">>},{<<"ha-sync-mode">>,<<"automatic">>},{<<"max-length">>,60000000},{<<"message-ttl">>,172800000}]},{priority,0}],undefined,[],undefined,live,0,[],<<"/">>,#{user => <<"root">>}},none,false,undefined,undefined,{state,{queue,[],[],0},{active,-576460695583028,1.0}},undefined,undefined,undefined,undefined,{state,fine,20000,undefined},{0,nil},undefined,undefined,undefined,{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},delegate},undefined,undefined,undefined,undefined,'drop-head',0,0,running}2019-10-21 15:26:20.070 [debug] <0.8385.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<0.7978.0>, slave_pids [], gm_pids [{<0.7275.0>,<0.6503.0>}]2019-10-21 15:26:20.229 [error] <0.6502.0> Supervisor {<0.6502.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"celtics_5">>},true,false,none,[],<0.7744.0>,[],[],[],[{vhost,...},...],...}, recovery, <0.6501.0>) at <0.6503.0> exit with reason no match of right hand value {error,{queue_supervisor_not_found,[]}} in rabbit_amqqueue_sup_sup:start_queue_process/3 line 45 in context child_terminated2019-10-21 15:26:20.391 [debug] <0.6502.0> Supervisor {<0.6502.0>,rabbit_amqqueue_sup} started rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"celtics_5">>},true,false,none,[],<0.7744.0>,[],[],[],[{vhost,...},...],...}, recovery, <0.6501.0>) at pid <0.7978.0>2019-10-21 15:26:20.398 [debug] <0.9599.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<0.7978.0>, slave_pids [], gm_pids [{<0.8601.0>,<0.7978.0>},{<0.7275.0>,<0.6503.0>}]%% 同上,再次重启创建master进程2019-10-21 15:26:20.439 [error] <0.9635.0> Restarting crashed queue 'celtics_5' in vhost '/'.2019-10-21 15:26:21.255 [debug] <0.10455.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<0.9635.0>, slave_pids [], gm_pids [{<0.8601.0>,<0.7978.0>},{<0.7275.0>,<0.6503.0>}]2019-10-21 15:26:21.383 [debug] <0.11055.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<0.9635.0>, slave_pids [], gm_pids [{<0.10536.0>,<0.9635.0>},{<0.8601.0>,<0.7978.0>},{<0.7275.0>,<0.6503.0>}]2019-10-21 15:26:21.390 [info] <0.9635.0> Mirrored queue 'celtics_5' in vhost '/': Adding mirror on node cba@node1: <19334.12919.0>2019-10-21 15:26:22.321 [info] <0.9635.0> Mirrored queue 'celtics_5' in vhost '/': Synchronising: 0 messages to synchronise2019-10-21 15:26:22.321 [info] <0.9635.0> Mirrored queue 'celtics_5' in vhost '/': Synchronising: batch size: 163842019-10-21 15:26:22.322 [info] <0.12522.0> Mirrored queue 'celtics_5' in vhost '/': Synchronising: all slaves already synced** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"celtics_5">>},true,false,none,[],<0.7978.0>,[],[],[],[{vhost,<<"/">>},{name,<<"ha-all">>},{pattern,<<"^">>},{'apply-to',<<"all">>},{definition,[{<<"ha-mode">>,<<"all">>},{<<"ha-sync-mode">>,<<"automatic">>},{<<"max-length">>,60000000},{<<"message-ttl">>,172800000}]},{priority,0}],undefined,[{<0.7275.0>,<0.6503.0>}],[],live,0,[],<<"/">>,#{user => <<"root">>}},none,false,undefined,undefined,{state,{queue,[],[],0},{active,-576460695356014,1.0}},undefined,undefined,undefined,undefined,{state,fine,20000,undefined},{0,nil},undefined,undefined,undefined,{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},delegate},undefined,undefined,undefined,undefined,'drop-head',0,0,running}2019-10-21 15:26:22.384 [error] <0.6502.0> Supervisor {<0.6502.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"celtics_5">>},true,false,none,[],<0.7744.0>,[],[],[],[{vhost,...},...],...}, recovery, <0.6501.0>) at <0.7978.0> exit with reason no match of right hand value {error,{queue_supervisor_not_found,[]}} in rabbit_amqqueue_sup_sup:start_queue_process/3 line 45 in context child_terminated2019-10-21 15:26:22.386 [debug] <0.6502.0> Supervisor {<0.6502.0>,rabbit_amqqueue_sup} started rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"celtics_5">>},true,false,none,[],<0.7744.0>,[],[],[],[{vhost,...},...],...}, recovery, <0.6501.0>) at pid <0.9635.0>


B节点日志:

%% 主动添加镜像2019-10-21 15:26:20.976 [info] <0.6171.0> Mirrored queue 'celtics_5' in vhost '/': Adding mirror on node cba@node1: <0.11086.0>2019-10-21 15:26:20.976 [debug] <0.11085.0> Supervisor {<0.11085.0>,rabbit_amqqueue_sup} started rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"celtics_5">>},true,false,none,[],<23290.7978.0>,[],[],[],[{...},...],...}, slave, <0.11084.0>) at pid <0.11086.0>2019-10-21 15:26:20.982 [debug] <0.11154.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<23290.7978.0>, slave_pids [<0.11086.0>], gm_pids [{<0.11089.0>,<0.11086.0>},{<23290.8601.0>,<23290.7978.0>},{<23290.7275.0>,<23290.6503.0>}]%% 发现当前master挂了, 被提升为master2019-10-21 15:26:20.985 [debug] <0.11212.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<0.11086.0>, slave_pids [], gm_pids [{<0.11089.0>,<0.11086.0>}]2019-10-21 15:26:20.987 [info] <0.11086.0> Mirrored queue 'celtics_5' in vhost '/': Slave <cba@node1.3.11086.0> saw deaths of mirrors <cba@node2.3.7978.0> <cba@node2.3.6503.0>2019-10-21 15:26:20.987 [info] <0.11086.0> Mirrored queue 'celtics_5' in vhost '/': Promoting slave <cba@node1.3.11086.0> to master%% 进行消息同步2019-10-21 15:26:20.987 [info] <0.11086.0> Mirrored queue 'celtics_5' in vhost '/': Synchronising: 0 messages to synchronise2019-10-21 15:26:20.987 [info] <0.11086.0> Mirrored queue 'celtics_5' in vhost '/': Synchronising: batch size: 163842019-10-21 15:26:20.987 [info] <0.11231.0> Mirrored queue 'celtics_5' in vhost '/': Synchronising: all slaves already synced%% 对端master 添加 slave 创建的进程%% 注 数据库信息已经被覆盖2019-10-21 15:26:21.365 [debug] <0.12916.0> Supervisor {<0.12916.0>,rabbit_amqqueue_sup} started rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"celtics_5">>},true,false,none,[],<23290.9635.0>,[],[],[],[{...},...],...}, slave, <0.12915.0>) at pid <0.12919.0>2019-10-21 15:26:21.446 [debug] <0.14640.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<23290.9635.0>, slave_pids [<0.12919.0>], gm_pids [{<0.12944.0>,<0.12919.0>},{<23290.10536.0>,<23290.9635.0>},{<23290.8601.0>,<23290.7978.0>},{<23290.7275.0>,<23290.6503.0>}]%% 发现对端的master挂了2019-10-21 15:26:21.486 [info] <0.12919.0> Mirrored queue 'celtics_5' in vhost '/': Slave <cba@node1.3.12919.0> saw deaths of mirrors <cba@node2.3.7978.0> <cba@node2.3.6503.0>2019-10-21 15:26:21.533 [debug] <0.15157.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<23290.9635.0>, slave_pids [<0.12919.0>], gm_pids [{<0.12944.0>,<0.12919.0>},{<23290.10536.0>,<23290.9635.0>},{<23290.8601.0>,<23290.7978.0>},{<23290.7275.0>,<23290.6503.0>}]%% 本节点master进程退出2019-10-21 15:26:21.535 [warning] <0.11086.0> Mirrored queue 'celtics_5' in vhost '/': Stopping all nodes on master shutdown since no synchronised slave is available2019-10-21 15:26:21.535 [debug] <0.11086.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} stop all slaves with reason: shutdown2019-10-21 15:26:21.543 [debug] <0.15159.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<23290.9635.0>, slave_pids [<0.12919.0>], gm_pids [{<0.12944.0>,<0.12919.0>},{<23290.10536.0>,<23290.9635.0>},{<23290.8601.0>,<23290.7978.0>},{<23290.7275.0>,<23290.6503.0>}]2019-10-21 15:26:36.536 [warning] <0.11086.0> Mirrored queue 'celtics_5' in vhost '/': Missing 'DOWN' message from <0.12919.0> in node cba@node12019-10-21 15:26:39.557 [debug] <0.16110.0> hncscwc {resource,<<"/">>,queue,<<"celtics_5">>} pid:<23290.9635.0>, slave_pids [], gm_pids []


从上面的分析中可以总结出问题的本质其实有两个方面

1、master进程启动添加slave时会有概率出现crash。这个问题github上有人反馈过(https://github.com/rabbitmq/rabbitmq-server/issues/2009),但代码维护者一方面推荐使用3.8.0的新特性quorum队列,一方面表示不解决此问题,因为同时停止启动的场景不在考虑范围内。

2、节点间通信的时序问题,在master进程crash还未被重新创建时,slave进程恰好启动并感知master进程不在,进而将自己提升为master;而crash后重新创建的master进程,不会读取数据库中的信息判断后再做处理,而是把自己当成是master,按照master的逻辑走对应的流程。导致同时出现两个master进程,并最终导致其中一个退出,顺带也停止了所有的slave。


其他几个现象,本质上也是因为这两个方面引发的,不同的时序导致了不同的现象。


【解决办法】

对于该问题没有好的解决办法,因为队列master进程启动概率出现crash暂时无法解决,时序问题就会导致有一定概率出现上面的现象。因此避免同时启停是一种手段。另外一种规避手段:将ha-promote-on-failure设置为when-synced,即master异常后,如果slave还未完成同步的情况下,不会提升为master(详细描述见这里)。实测了一段时间,暂时未出现上述现象。


本文分享自微信公众号 - hncscwc(gh_383bc7486c1a)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部