文档章节

rabbitmq——prefetch count

hncscwc
 hncscwc
发布于 2014/01/24 15:32
字数 1071
阅读 5877
收藏 13

消费者在开启acknowledge的情况下,对接收到的消息可以根据业务的需要异步对消息进行确认。

然而在实际使用过程中,由于消费者自身处理能力有限,从rabbitmq获取一定数量的消息后,希望rabbitmq不再将队列中的消息推送过来,当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接收来自队列的消息。在这种场景下,我们可以通过设置basic.qos信令中的prefetch_count来达到这种效果。

先直观的看看设置了prefetch_count的效果,:

1) 对比测试:两个消费者都订阅同一队列,no_ack均设置为false即开启acknowledge机制,且均未设置prefetch_count,向队列发布5条消息

结果:不管消息是否被ack,rabbitmq会轮流向两个消费者投递消息,第一个消费者收到"1","3","5"三条消息, 第二个消费者收到"2","4"两条消息。

2)prefetch_count设置测试:两个消费者都订阅同一队列,开启acknowledge机制,第一个消费者prefetch_count设置为1,另一个消费者未设置prefetch_count,同样向队列发布5条消息

结果:rabbitmq向第一个消费者投递了一条消息后,消费者未对该消息进行ack,rabbitmq不会再向该消费者投递消息,剩下的四条消息均投递给了第二个消费者

看完效果后,再来看看rabbitmq里的一些实现。

1. rabbitmq对basic.qos信令的处理

首先,basic.qos是针对channel进行设置的,也就是说只有在channel建立之后才能发送basic.qos信令。

在rabbitmq的实现中,每个channel都对应会有一个rabbit_limiter进程,当收到basic.qos信令后,在rabbit_limiter进程中记录信令中prefetch_count的值,同时记录的还有该channel未ack的消息个数。

注:其实basic.qos里还有另外两个参数可进行设置(global和prefetch_size),但rabbitmq没有相应的实现。

2. 队列中的消息投递给消费者时的处理

当rabbitmq要将队列中的一条消息投递给消费者时,会遍历该队列上的消费者列表,选一个合适的消费者,然后将消息投递出去。其中挑选消费者的一个依据就是看消费者对应的channel上未ack的消息数是否达到设置的prefetch_count个数,如果未ack的消息数达到了prefetch_count的个数,则不符合要求。当挑选到合适的消费者后,中断后续的遍历。

rabbit_amqqueue_process.erl

deliver_msgs_to_consumers(_DeliverFun, true, State) ->
    {true, State};
deliver_msgs_to_consumers(DeliverFun, false,
                          State = #q{active_consumers =
                                     ActiveConsumers}) ->
    case priority_queue:out_p(ActiveConsumers) of
        {empty, _} ->
            {false, State};
        {{value, QEntry, Priority}, Tail} ->
            {Stop, State1} =
                deliver_msg_to_consumer(DeliverFun, QEntry,
                                        Priority,
                                        State#q{active_consumers =
                                                Tail}),
            %%如果处理结果为false,遍历下一个消费者
            deliver_msgs_to_consumers(DeliverFun, Stop, State1)
    end.

deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer},
                        Priority, State) ->
    ...
    %%判断是否可以将消息投递给该消费者
    case rabbit_limiter:can_send(C#cr.limiter,
                                 Consumer#consumer.ack_required,
                                 Consumer#consumer.tag) of
        %%可以投递,再将该消费者放到队列的尾部
        {continue, Limiter} ->
            AC1 = priority_queue:in(E, Priority,
                                    State#q.active_consumers),
            %%将消息投递给消费者
            deliver_msg_to_consumer0(DeliverFun, Consumer,
                                     C#cr{limiter = Limiter},
                                     State#q{active_consumers = AC1})
    ...

rabbit_limiter.erl

handle_call({can_send, QPid, AckRequired}, _From,
            State = #lim{volume = Volume}) ->
    case prefetch_limit_reached(State) of
        %%未ack的消息数达到prefetch_count设置的个数
        true  -> {reply, false, limit_queue(QPid, State)};
        false -> {reply, true,
                  %%消息需要被ACK, volume加1
                  State#lim{volume = if AckRequired -> Volume + 1;
                                        true        -> Volume
                                     end}}
    end

prefetch_limit_reached(#lim{prefetch_count = Limit, 
                            volume = Volume}) ->
    Limit =/= 0 andalso Volume >= Limit.



3. 消费者对消息ack后的处理

当消费者对消息进行ack后,最终会修改该消费者对应channel中未ack的消息数,这样队列又可以将消息投递给该消费者。

rabbit_limiter.erl

handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
    NewVolume = if Volume == 0 -> 0;
                   true        -> Volume - Count
                end,
    {noreply, maybe_notify(State, State#lim{volume = NewVolume})};



4. 扩展

在AMQP协议(0-9-1)中,有这么一段话

对于rabbitmq来说,最后一句话其实说的就是使用了acknowledge机制情况下,使用prefetch_count进行流量控制。另外在实际研究过程中发现还有channel.flow以及basic.credit(应该属于AMQP 1.0协议)可以进行一些控制,这里没有展开,有时间会研究下相应的机制以及可能使用的场景。

© 著作权归作者所有

共有 人打赏支持
hncscwc
粉丝 66
博文 70
码字总数 76137
作品 0
杭州
程序员
加载中

评论(1)

chaun
chaun
79
RabbitMQ安装和使用详解(转载+实践)

1.解压缩.tar.gz文件: tar -zxvf 文件名称 mv 目标文件 目的地址 --移动文件 mv 原名称 新名称 --重命名 2.配置环境变量 # vi profile --编辑配置文件 在文档的最后添加: export PATH=$PAT...

hanzhankang
2014/02/20
0
0
python使用rabbitMQ介绍二(工作队列模式)

一模式介绍 第一章节的生产-消费者模式,是非常简单的模式,一发一收。在实际的应用中,消费者有的时候需要工作较长的时间,则需要增加消费者。 这时mq实现了一下几个功能: rabbitmq循环调度...

MyStitch
07/30
0
0
Rabbitmq-server 分析

一、 发送端: 接收端: 验证: 在多个shell窗口中运行consumer,然后运行procedure,发现每个consumer 轮流接收相同队列中的消息。 二、 发送端: 接收端: (1) rabbitmq循环调度,将消息循...

meteor_hy
05/31
0
0
RabbitMQ-C客户端使用说明

rabbitmq-c是一个用于C语言的,与AMQP server进行交互的client库,AMQP协议为版本0-9-1。rabbitmq-c与server进行交互前需要首先进行login操作,在操作后,可以根据AMQP协议规范,执行一系列操...

龙鸟
2012/09/20
0
0
RabbitMQ 入门【精+转】

rabbitmq可以用一本书取讲,这里只是介绍一些使用过程中,常用到的基本的知识点。 官方文档覆盖的内容,非常全面:http://www.rabbitmq.com/documentation.html 。 1. 介绍 RabbitMQ,即消息...

sunsky303
05/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周一乱弹 —— 鱼生不值得

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @瘟神灬念:分享新裤子的单曲《没有理想的人不伤心 (Remix版)》: 《没有理想的人不伤心 (Remix版)》- 新裤子 手机党少年们想听歌,请使劲儿戳...

小小编辑
14分钟前
3
3
arts-week10

Algorithm 905. Sort Array By Parity - LeetCode Review Who’s Afraid of the Big Bad Preloader? 一文读懂前端缓存 一个网络请求3个步骤:请求,处理,响应,而前端缓存主要在请求处响应这两步...

yysue
今天
4
0
00.编译OpenJDK-8u40的整个过程

前言 历经2天的折腾总算把OpenJDK给编译成功了,要说为啥搞这个,还得从面试说起,最近出去面试经常被问到JVM的相关东西,总感觉自己以前学的太浅薄,所以回来就打算深入学习,目标把《深入理...

凌晨一点
今天
5
0
python: 一些关于元组的碎碎念

初始化元组的时候,尤其是元组里面只有一个元素的时候,会出现一些很蛋疼的情况: def checkContentAndType(obj): print(obj) print(type(obj))if __name__=="__main__": tu...

Oh_really
昨天
6
2
jvm crash分析工具

介绍一款非常好用的jvm crash分析工具,当jvm挂掉时,会产生hs_err_pid.log。里面记录了jvm当时的运行状态以及错误信息,但是内容量比较庞大,不好分析。所以我们要借助工具来帮我们。 Cras...

xpbob
昨天
176
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部