文档章节

RabbitMQ erlang "Routing"

nao
 nao
发布于 2015/08/01 14:25
字数 948
阅读 352
收藏 3

官方网址:http://www.rabbitmq.com/tutorials/tutorial-four-python.html

前面的例子中,我们构建了一个简单的日志系统。我们可以广播日志消息给所有的接收者。

在这个例子中,我们准备增加一个新特性。我们将能仅仅订阅消息的一部分。 例如:我们直接仅仅把 critical error 类型的消息写入日志文件(保存到磁盘空间),然而还能够打印所有的日志消息到控制台。


Bindings(绑定)

在前面的例子中,我们已经创建bindings. 如下:

amqp_channel:call(Channel, #'queue.bind'{exchange = <<"logs">>,
                                             queue = Queue}),

binding 把 exchange和queue联系了起来。这个可以简单的理解为:这个queue对这个exchage感兴趣。

Bindings 可以带上额外的routing_key 参数。 为了取消对 basic_publish 参数的迷惑,我们将会称它为 binding key. 以下是我们如何通过一个 key 创建一个绑定。

[amqp_channel:call(Channel, #'queue.bind'{exchange = <<"direct_logs">>,
                                              routing_key = list_to_binary(Severity),
                                              queue = Queue})
     || Severity <- Argv],

binding key 的意义依赖于exchange 的类型。前面我们使用的 fanout 类型的exchange 会忽略 binding key的值。


Direct exchange

    前面的日志系统例子会广播所有的消息给所有消费者。我们想要扩展它使其允许过滤信息。

    使用 fanout 类型的 exchange,  将不会有太多的灵活性,仅仅能做广播。

    使用 direct类型的 exchange , direct exchange 的算法很简单,消息的routing key 和队列的 binding key 相同的消息会被投递给这个队列。看下图:

    这个设置,我们可以看到有两个队列绑定到了一个 direct 类型的exchange。 Q1的 binding key 是orange,

Q2的binding key 是 black 和 green.

    在这个系统中,如果消息带的routing key 是orange, 则此消息会被投递给Q1, 如果消息的routing key是black 或者是green, 则这些消息会被投递给Q2,其他的消息则会被丢弃。


Multiple bindings

相同的binding 可以绑定多个 queues . 在上图中,Q1和Q2的binding key都是 black ,这种情况下,direct 类型的exchange会和 fanout类型一样,广播消息给所有匹配的queues.routing key是black 的消息会被发送到Q1和Q2。


Emitting logs

    我们的日志系统将会使用上面的模型, 我们将发送消息到 direct 类型的exchange来代替 fanout类型。 我们将提供日志的严重性作为 routing key。 接收消息的应用程序将能选择它想要接收的严重性的消息。让我们首先专注于发送日志。

    首先,我们需要创建一个 exchange:

amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"direct_logs">>,
                                                   type = <<"direct">>}),

  另外,我们准备发送消息:

amqp_channel:cast(Channel,
                      #'basic.publish'{
                        exchange = <<"direct_logs">>,
                        routing_key = Severity},
                      #amqp_msg{payload = Message}),

一个简单的事情:我们将评估严重性为: 'info','warning','error'中的其中一种。


Subscribing(订购)

    接收消息的程序和前面的例子差不多,一个不同: 我们将会为我们感兴趣的每个严重性创建一个新的binding。

[amqp_channel:call(Channel, #'queue.bind'{exchange = <<"direct_logs">>,
                                              routing_key = list_to_binary(Severity),
                                              queue = Queue})
     || Severity <- Argv],

Putting it all together

    

emit_log_direct.erl

-module(emit_log_direct).
-compile([export_all]).
-include_lib("amqp_client/include/amqp_client.hrl").

main(Argv) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"direct_logs">>,
                                                   type = <<"direct">>}),

    {Severity, Message} = case Argv of
                              [] ->
                                  {<<"info">>, <<"Hello World!">>};
                              [S] ->
                                  {list_to_binary(S), <<"Hello World!">>};
                              [S | Msg] ->
                                  {list_to_binary(S), list_to_binary(string:join(Msg, " "))}
                          end,
    amqp_channel:cast(Channel,
                      #'basic.publish'{
                        exchange = <<"direct_logs">>,
                        routing_key = Severity},
                      #amqp_msg{payload = Message}),
    io:format(" [x] Sent ~p:~p~n", [Severity, Message]),
    ok = amqp_channel:close(Channel),
    ok = amqp_connection:close(Connection),
    ok.

receive_logs_direct.erl

-module(receive_logs_direct).
-compile([export_all]).

-include_lib("amqp_client/include/amqp_client.hrl").

main(Argv) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"direct_logs">>,
                                                   type = <<"direct">>}),

    #'queue.declare_ok'{queue = Queue} =
        amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),

    [amqp_channel:call(Channel, #'queue.bind'{exchange = <<"direct_logs">>,
                                              routing_key = list_to_binary(Severity),
                                              queue = Queue})
     || Severity <- Argv],

    io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),

    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
                                                     no_ack = true}, self()),
    receive
        #'basic.consume_ok'{} -> ok
    end,
    loop(Channel).

loop(Channel) ->
    receive
        {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Body}} ->
            io:format(" [x] ~p:~p~n", [RoutingKey, Body]),
            loop(Channel)
    end.


© 著作权归作者所有

共有 人打赏支持
nao

nao

粉丝 28
博文 155
码字总数 108154
作品 0
成都
后端工程师
私信 提问
加载中

评论(3)

nao
nao

引用来自“hncscwc”的评论

感觉都要快看不懂了 惭愧惭愧~~
俺还在入门的边缘徘徊,岂不是更惭愧~~那么晚还在学习,佩服,佩服
hncscwc
hncscwc
感觉都要快看不懂了 惭愧惭愧~~
s
shaw007
1010101010
【Python模块】rabbitMQ

RabbitMQ介绍: 父进程与子进程间,同一父继承可以用multiprocess的Manager模块来实现数据互访。 作用:RabbitMQ是为了实现相互独立的两个进程数据互访。 应用场景:不需要立即操作的数据。比...

等你的破船
2018/08/13
0
0
win7下安装RabbitMQ消息服务器 + 读写队列

RabbitMQ是什么 ? RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。 1:安装RabbitMQ需要先安装Erlang语言开发包。下载地址 http://www.e...

gavin
2015/07/31
0
0
Spring Boot整合RabbitMQ实例

什么是消息? 消息是一个或者多个实体之间沟通的一种方式并且无处不在。 自从计算机发明以来,计算机以多种多样的方式发送消息,消息定义了软硬件或者应用程序之间的沟通方式。消息总是有一个...

英雄有梦没死就别停
2018/06/27
0
0
消息队列RabbitMQ入门介绍

(一)基本概念 RabbitMQ是流行的开源消息队列系统,用erlang语言开发。我曾经对这门语言挺有兴趣,学过一段时间,后来没坚持。RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉A...

icheer
2013/10/09
0
0
RabbitMQ 的安装与工作模式

RabbitMQ 概念: 交换机(exchange type)把消息推送到队列的方法: fanout:不处理路由键,转发到所有绑定的队列上 direct:处理路由键,必须完全匹配,即路由键字符串相同才会转发 topic:路由...

求学ing
2014/11/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

AWS的自动部署工具codedeploy 部署前的准备工作

开始部署codedeploy: 1.先预置IAM用户: 创建一个IAM用户或使用一个与AWS相关联的用户; 复制以下的策略附加到IAM用户,向IAM用户赋予对codedeploy(及codedeploy所依赖的AWS服务和操作)的...

守护-创造
8分钟前
0
0
这可能是最详细的一线大厂Mysql面试题详解了

1、MySQL的复制原理以及流程 基本原理流程,3个线程以及之间的关联; 主:binlog线程——记录下所有改变了数据库数据的语句,放进master上的binlog中; 从:io线程——在使用start slave 之后...

Java干货分享
18分钟前
0
0
人的精力是什么?如何强化精力

人的精力是什么? 人的精力是什么? 精力指精神和体力。精神包括一个人的精神状态,兴奋度,做事情的投入度,专注度,持续时间等。 人的精力来源 人的精力有4种来源,身体的、情感的、思想的和...

莫库什勒
36分钟前
0
0
JFinal开发的旅游线路营销Saas平台演示系统我部署了一个

今天部署了一个旅游线路营销管理系统的演示版: 演示地址:http://lvyou.jfinalxueyuan.com 演示账号:(暂时只给一个门店版的吧,批发商和总部的如果需要 演示看看 单独联系我微信:1876673...

山东-小木
今天
2
0
如何学习大数据技术

学习大数据技术,首先要明确大数据的概念。 大数据的概念作者认为有如下几点: 1.数据的来源多样性。例如关系数据库+文本+excel等 2.数据量大。TB级别的数据。 3.业务应用领域。实时性高与实...

董黎明
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部