文档章节

RabbitMQ erlang "topics"

nao
 nao
发布于 2015/08/03 13:01
字数 1005
阅读 635
收藏 9

    原文链接:http://www.rabbitmq.com/tutorials/tutorial-five-python.html

    在前面的例子中我们改进了我们的日志系统。使用 fanout 类型的exchage 只能广播消息。我们使用 direct 来代替,获得了选择性接收消息的可能。

    虽然使用direct类型的exchange改善了我们的系统,但它仍然有缺陷,它不能基于多种条件 进行routing。

    在我们的日志系统中,我们可能想要订阅日志不仅基于严重性程度,而且基于发布日志的源码。你可能知道 syslog unix tool 中的概念, 那个基于 严格的(info/warning/crit...)和灵巧的(auth/cron/kern)。
    这将给我们许多灵活性——我们可能想听仅来自于 'corn'日志中的 严重的错误和kern 里面的所有日志。

    在我们的日志系统为了实现那个,我们需要学习更复杂的 topic exchange.


    Topic exchange

发送给topic exchange的消息不能随便定义一个routing key——routing key 必须是一个由”.”号分隔的单词列表。单词可以是任意的,但是一般情况下它应该代表消息的一些特征。比如下面一些routing key :

“stock.usd.nyse”  “nyse.vmw”  “quick.orange.rabbit”

Routing key 的最大长度限制为255字节。

队列的binding key 也必须是同样的格式。Topic exchange的逻辑跟direct exchange差不多——带有指定routing key的消息将会被发送到有相同binding key 的消息队列中。

Binding key 有两特例:

  • *(star) 代表一个单词

  • #(hash) 代表0个或者多个单词

    如下面的例子

在这个例子中我们将发送描述动物的消息。消息的routing key 由三个单词组成,第一个单词代表动物的速度,第二个单词代表动物的颜色,第三个单词代表动物的种类。”<celerity>.<colour>.<species>”

我们创建三个绑定:Q1使用binding key “*.orange.*”, Q2使用binding key “*.*.rabbit” 和”lazy.#”。 也就是说Q1对颜色为orange的动物感兴趣,而Q2对所有的rabbit和速度为lazy的动物感兴趣。

routing key 为'qucik.orange.rabbit'的消息将会被发送Q1和Q2队列, "lazy.orange.elephant"消息也会发送到Q1和Q2。 “quick.orange.fox”仅仅去Q1,“lazy.brown.fox”仅去Q2.  "lazy.pink.rabbit"将会被发送给Q2一次,虽然它匹配两个bindings. “quick.brown.fox”不匹配任意binding ,所以它被丢弃。

如果我们打破规则,发送一个单词或者四个单词的消息将会发生什么,如: “orange”或"quick.orange.male.rabbit"?好吧,这个消息将不匹配任何一个bindings,将会被丢弃。

另一种情况,“lazy.orange.male.rabbit”, 虽然他也是四个单词,但是它将会匹配Q2 binding 将会被发送到Q2。因为匹配“lazy.#”

    topic exchange 是强大的,你能表现的像其他的exchanges.

    当一个队列的 binding key 是"#"(hash) ——它将收到所有的消息,无视 routing key :向 fanout exchange.

    当 bindings里没有 使用"*"和"#"特殊字符时, topic echange 和 direct exchange 行为将会一样。


Putting it all together

We're going to use a topic exchange in our logging system. We'll start off with a working assumption that the routing keys of logs will have two words: "<facility>.<severity>".

emit_log_topic.erl

-module(emit_log_topic).
-compile([export_all]).

-include_lib("amqp_client/include/amqp_client.hrl").

main(Argv) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"topic_logs">>,
                                                   type = <<"topic">>}),

    {RoutingKey, Message} = case Argv of
                                [] ->
                                    {<<"anonymous.info">>, <<"Hello World!">>};
                                [R] ->
                                    {list_to_binary(R), <<"Hello World!">>};
                                [R | Msg] ->
                                    {list_to_binary(R), list_to_binary(string:join(Msg, " "))}
                            end,
    amqp_channel:cast(Channel,
                      #'basic.publish'{
                        exchange = <<"topic_logs">>,
                        routing_key = RoutingKey},
                      #amqp_msg{payload = Message}),
    io:format(" [x] Sent ~p:~p~n", [RoutingKey, Message]),
    ok = amqp_channel:close(Channel),
    ok = amqp_connection:close(Connection),
    ok.

receiver_logs_topic.erl

-module(receive_logs_topic).
-compile([export_all]).

-include_lib("amqp_client/include/amqp_client.hrl").

main(Argv) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"topic_logs">>,
                                                   type = <<"topic">>}),

    #'queue.declare_ok'{queue = Queue} =
        amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),

    [amqp_channel:call(Channel, #'queue.bind'{exchange = <<"topic_logs">>,
                                              routing_key = list_to_binary(BindingKey),
                                              queue = Queue})
     || BindingKey <- Argv],

    io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),

    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
                                                     no_ack = true}, self()),
    receive
        #'basic.consume_ok'{} -> ok
    end,
    loop(Channel).

loop(Channel) ->
    receive
        {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Body}} ->
            io:format(" [x] ~p:~p~n", [RoutingKey, Body]),
            loop(Channel)
    end.

 接收所有的日志

接收所有来自 facility是 "kern"的日志

接收“critical”日志:

创建多个bindings:

发送一个 “kern.critical”类型 routing key 的日志:

  


© 著作权归作者所有

共有 人打赏支持
nao

nao

粉丝 28
博文 155
码字总数 108154
作品 0
成都
后端工程师
私信 提问
CentOS6.7系统安装RabbitMQ3.7.8集群环境

一、安装Erlang 1、RabbitMQ3.7.8 对Erlang/OTP的版本要求 RabbitMQ3.7.8 要求Erlang/OTP版本:19.3.6.4 ~ 21.0.x 这个链接(http://www.rabbitmq.com/which-erlang.html)有详细说明. 2、Erl......

静夜明灯
2018/10/07
0
0
rabbitmq-server 安装

一,安装rabbitmq-server 1.安装erlang wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm rpm -Uvh erlang-solutions-1.0-1.noarch.rpm rpm --import https:/......

丿小贰丶
2018/05/08
0
0
ubuntu下安装rabbitMQ

一.环境准备 rabbitMQ需要erlang语言的支持,因此需要先安装erlang语言 二.下载 http://www.erlang.org/download/optsrc17.3tar.gz 并解压 三.安装 ./configure --prefix=/home/liyixiang/e...

清风傲剑
2015/02/05
0
0
CentOS7.X安装RabbitMQ-3.6.10

CentOS7.0安装RabbitMQ 安装前的准备 源码安装erlang erlang下载 erlang-20.0 加入环境变量 测试启动erlang 安装RabbitMQ 下载地址 Binary .tar.xz .zip cd /rootwget http://www.rabbitmq.c...

qq2233466866
2018/06/11
0
0
center 安装 rabbitMQ

centerOs 安装 消息队列 rabbitMQ 下载 https://www.erlang-solutions.com/resources/download.html 编辑文件 centOS7 安装 使用 yum 安装, 推荐安装方式(解决依赖) 下载 rabbitMQ-server ht...

anziguoer
2018/08/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

JS 调用Angularjs 的方法

// 1. 获取 Controllerlet appElement = document.querySelector('[data-ng-controller=MessagesCtrl]');let scope = angular.element(appElement).scope();// 2. 调用方法scope.l......

Moks角木
26分钟前
0
0
dubbo+zookeeper与 eureka的区别

CAP CAP 原则指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可兼得 在分布式架构里, P必须有 Zookeeper保证C P 当...

群星纪元
36分钟前
1
0
云计算之边缘计算大势所趋

如果说边缘计算是公同认定的目标,那么我们看到,不同类型的厂商基于自身的特点,会从不同的起点、沿着不同的路径,向这个目标奔跑。上次参加阿里云的一次活动,看到他们将边缘计算的厂商分成...

linuxCool
41分钟前
0
0
前端通过后端传过来的'\n' ,''等字符串换行失败问题

后台推送换行符 '\n' 或 '<br/>' 等字符串到前台不会换行 详细描述 后台逻辑处理返回String字符串,其中包含\n或<br/>等换行符号,但是前端渲染时候却并没有真正的换行 也尝试了大佬的各种 ...

下次用oschina
47分钟前
2
0
volatile能保证有序性吗?

在前面提到volatile关键字能禁止指令重排序,所以volatile能在一定程度上保证有序性。   volatile关键字禁止指令重排序有两层意思:   1)当程序执行到volatile变量的读操作或者写操作时...

无精疯
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部