文档章节

rabbitmq——prefetch count

hncscwc
 hncscwc
发布于 2014/01/24 15:32
字数 1071
阅读 6074
收藏 13

消费者在开启acknowledge的情况下,对接收到的消息可以根据业务的需要异步对消息进行确认。

然而在实际使用过程中,由于消费者自身处理能力有限,从rabbitmq获取一定数量的消息后,希望rabbitmq不再将队列中的消息推送过来,当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接收来自队列的消息。在这种场景下,我们可以通过设置basic.qos信令中的prefetch_count来达到这种效果。

先直观的看看设置了prefetch_count的效果,:

1) 对比测试:两个消费者都订阅同一队列,no_ack均设置为false即开启acknowledge机制,且均未设置prefetch_count,向队列发布5条消息

结果:不管消息是否被ack,rabbitmq会轮流向两个消费者投递消息,第一个消费者收到"1","3","5"三条消息, 第二个消费者收到"2","4"两条消息。

2)prefetch_count设置测试:两个消费者都订阅同一队列,开启acknowledge机制,第一个消费者prefetch_count设置为1,另一个消费者未设置prefetch_count,同样向队列发布5条消息

结果:rabbitmq向第一个消费者投递了一条消息后,消费者未对该消息进行ack,rabbitmq不会再向该消费者投递消息,剩下的四条消息均投递给了第二个消费者

看完效果后,再来看看rabbitmq里的一些实现。

1. rabbitmq对basic.qos信令的处理

首先,basic.qos是针对channel进行设置的,也就是说只有在channel建立之后才能发送basic.qos信令。

在rabbitmq的实现中,每个channel都对应会有一个rabbit_limiter进程,当收到basic.qos信令后,在rabbit_limiter进程中记录信令中prefetch_count的值,同时记录的还有该channel未ack的消息个数。

注:其实basic.qos里还有另外两个参数可进行设置(global和prefetch_size),但rabbitmq没有相应的实现。

2. 队列中的消息投递给消费者时的处理

当rabbitmq要将队列中的一条消息投递给消费者时,会遍历该队列上的消费者列表,选一个合适的消费者,然后将消息投递出去。其中挑选消费者的一个依据就是看消费者对应的channel上未ack的消息数是否达到设置的prefetch_count个数,如果未ack的消息数达到了prefetch_count的个数,则不符合要求。当挑选到合适的消费者后,中断后续的遍历。

rabbit_amqqueue_process.erl

deliver_msgs_to_consumers(_DeliverFun, true, State) ->
    {true, State};
deliver_msgs_to_consumers(DeliverFun, false,
                          State = #q{active_consumers =
                                     ActiveConsumers}) ->
    case priority_queue:out_p(ActiveConsumers) of
        {empty, _} ->
            {false, State};
        {{value, QEntry, Priority}, Tail} ->
            {Stop, State1} =
                deliver_msg_to_consumer(DeliverFun, QEntry,
                                        Priority,
                                        State#q{active_consumers =
                                                Tail}),
            %%如果处理结果为false,遍历下一个消费者
            deliver_msgs_to_consumers(DeliverFun, Stop, State1)
    end.

deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer},
                        Priority, State) ->
    ...
    %%判断是否可以将消息投递给该消费者
    case rabbit_limiter:can_send(C#cr.limiter,
                                 Consumer#consumer.ack_required,
                                 Consumer#consumer.tag) of
        %%可以投递,再将该消费者放到队列的尾部
        {continue, Limiter} ->
            AC1 = priority_queue:in(E, Priority,
                                    State#q.active_consumers),
            %%将消息投递给消费者
            deliver_msg_to_consumer0(DeliverFun, Consumer,
                                     C#cr{limiter = Limiter},
                                     State#q{active_consumers = AC1})
    ...

rabbit_limiter.erl

handle_call({can_send, QPid, AckRequired}, _From,
            State = #lim{volume = Volume}) ->
    case prefetch_limit_reached(State) of
        %%未ack的消息数达到prefetch_count设置的个数
        true  -> {reply, false, limit_queue(QPid, State)};
        false -> {reply, true,
                  %%消息需要被ACK, volume加1
                  State#lim{volume = if AckRequired -> Volume + 1;
                                        true        -> Volume
                                     end}}
    end

prefetch_limit_reached(#lim{prefetch_count = Limit, 
                            volume = Volume}) ->
    Limit =/= 0 andalso Volume >= Limit.



3. 消费者对消息ack后的处理

当消费者对消息进行ack后,最终会修改该消费者对应channel中未ack的消息数,这样队列又可以将消息投递给该消费者。

rabbit_limiter.erl

handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
    NewVolume = if Volume == 0 -> 0;
                   true        -> Volume - Count
                end,
    {noreply, maybe_notify(State, State#lim{volume = NewVolume})};



4. 扩展

在AMQP协议(0-9-1)中,有这么一段话

对于rabbitmq来说,最后一句话其实说的就是使用了acknowledge机制情况下,使用prefetch_count进行流量控制。另外在实际研究过程中发现还有channel.flow以及basic.credit(应该属于AMQP 1.0协议)可以进行一些控制,这里没有展开,有时间会研究下相应的机制以及可能使用的场景。

© 著作权归作者所有

共有 人打赏支持
上一篇: haproxy——配置
hncscwc
粉丝 67
博文 70
码字总数 76137
作品 0
杭州
程序员
私信 提问
加载中

评论(1)

chaun
chaun
79
python使用rabbitMQ介绍二(工作队列模式)

一模式介绍 第一章节的生产-消费者模式,是非常简单的模式,一发一收。在实际的应用中,消费者有的时候需要工作较长的时间,则需要增加消费者。 这时mq实现了一下几个功能: rabbitmq循环调度...

MyStitch
07/30
0
0
RabbitMQ安装和使用详解(转载+实践)

1.解压缩.tar.gz文件: tar -zxvf 文件名称 mv 目标文件 目的地址 --移动文件 mv 原名称 新名称 --重命名 2.配置环境变量 # vi profile --编辑配置文件 在文档的最后添加: export PATH=$PAT...

hanzhankang
2014/02/20
0
0
Rabbitmq-server 分析

一、 发送端: 接收端: 验证: 在多个shell窗口中运行consumer,然后运行procedure,发现每个consumer 轮流接收相同队列中的消息。 二、 发送端: 接收端: (1) rabbitmq循环调度,将消息循...

meteor_hy
05/31
0
0
(六)RabbitMQ消息队列-消息任务分发与消息ACK确认机制(PHP版)

在前面一章介绍了在PHP中如何使用RabbitMQ,至此入门的的部分就完成了,我们内心中一定还有很多疑问:如果多个消费者消费同一个队列怎么办?如果这几个消费者分任务的权重不同怎么办?怎么把...

Super_RD
2017/04/26
0
0
RabbitMQ-C客户端使用说明

rabbitmq-c是一个用于C语言的,与AMQP server进行交互的client库,AMQP协议为版本0-9-1。rabbitmq-c与server进行交互前需要首先进行login操作,在操作后,可以根据AMQP协议规范,执行一系列操...

龙鸟
2012/09/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

高并发编程:解析HashMap

底层实现原理 在JDK1.8以前版本中,HashMap的实现是数组+链表,它的缺点是即使哈希函数选择的再好,也很难达到元素百分百均匀分布,而且当HashMap中有大量元素都存到同一个桶中时,这个桶会有...

小刀爱编程
35分钟前
0
0
程序员请不要假装很努力,因为结果不会陪你演戏

前言: 我一直相信这样一句话:真正的危机,来源于在正确的时间做不正确的事。没有在正确的时间,为下一步做出积累,这才是危机的根源。 比如,当你迈过了30岁这个坎,你的能力还局限于程序的...

Java干货分享
41分钟前
2
0
Fio随机读IOPS测试值可能偏大的原因分析

问题描述: 在使用fio进行虚拟机磁盘(Ceph的RBD,格式化为ext4文件系统)的IOPS测试时,发现randread比预估值高许多; 在使用相同参数进行randwrite测试之后,再进行randread时会出现此现象...

LastRitter
44分钟前
2
0
JavaScript引用类型Object常见用法实例分析

1、JavaScript数据类型 (1)基本类型 5种基本类型:Undefined、Null、Boolean、Number、String (2)引用类型 5种引用类型:Object、Array、Date、RepExp、Function (3)基本类型与引用类型的异同...

peakedness丶
51分钟前
1
0
教你理清SpringBoot与SpringMVC的关系

spring boot就是一个大框架里面包含了许许多多的东西,其中spring就是最核心的内容之一,当然就包含spring mvc。spring mvc 是只是spring 处理web层请求的一个模块。因此他们的关系大概就是这...

别打我会飞
56分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部