文档章节

RabbitMQ erlang "Publish/Subscribe"

nao
 nao
发布于 2015/07/30 10:07
字数 1141
阅读 97
收藏 0

官网地址: http://www.rabbitmq.com/tutorials/tutorial-three-python.html

        在 work queue 例子中,每个消息只会发送给一个消费者,本例将演示完全不一样的例子,

就是一个消息被发送给多个消费者消费,这就是我们常说的发布/订阅模式。

        为了演示这个模式,我们将建立一个简单的log系统,它有两个程序组成,一个程序用来发布log消息,另一个程序接收log信息并且把log信息打印出来。

        在这个log系统中,所有正在运行的消费者都可以接收到发布者发布的log信息。通过这种方式我们可以启动一个消费者用来把收到的log信息写入磁盘,同时运行另外一个消费者,把收到的log信息显示在屏幕上。

        实际上,发布的log信息会被广播给所有的消费者。


Exchanges

        在前面的例子中我们从一个队列发送和接收消息。现在我们将介绍Rabbit的整个消息模型。

        RabbitMQ里消息模型的核心思想是生产者从不直接发送任何消息到队列。实际上,通常生产者甚至不知道消息是否发送到了任意一个消息队列中去。

    取而代之的是,生产者仅仅发送一个消息到exchang.  exchange 是一个很简单的东西。Exchange一方面从生产者接收消息,另一方面把消息放到相应的队列中去。 exchange必须真实的知道对于收到的消息做什么操作。这个消息是需要被发到特定的消息队列里面去?或者这个消息应该被丢弃?操作的原则在echange Type中定义。

        

        Exchange type 有以下几种类型: direct, topic, headers, fanout。我们现在关注最后一个fanout.我们先创建一个fanout类型的交换,我们把它取名为"logs":

amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"logs">>,
                                                   type = <<"fanout">>}),

    现在我们可以像下面这样声明我们的有名echange:

 amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"logs">>,
                                                   type = <<"fanout">>}),

Temporary queues(临时队列)

    前面的例子中我们使用的队列都是有名字的,比如说hello和task_queue。可以为队列命名对我们来说非常的重要,特别是当我们需要在生产者和消费者直接共享队列的时候。

但是现在我们的这个log系统不需要这样。消费者需要获取所有的log消息,我们也只对当前的消息感兴趣。为了解决这个问题,我们必须做两件事情。

首先,不管我们什么时候连接到了Rabbit,我们需要一个新的,空的消息队列。我们可以创建一个有随机名字的队列,或者让RabbitMQ为我们选择一个随机的队列名字。我们可以在声明队列的时候不指定queue参数来实现这个目的。

其次,一旦消费者断链,这个消费者使用的消息队列必须被删除,我们可以配置参数exclusive = true来实现这个目的。所以队列的声明看起来像下面这样:

#'queue.declare_ok'{queue = Queue} =
        amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),

Bindings (绑定)

    

    我们已经创建了一个 fanout exchange 和一个队列。现在我们需要告诉exchange 发送消息给我们的queue。exchange和 queue之间的关系我们称为binding(绑定)。

amqp_channel:call(Channel, #'queue.bind'{exchange = <<"logs">>,
                                             queue = Queue}),

    从现在起我们的 log exchange 就会添加消息到我们队列。



Putting it all together

    


完整代码:

    emit_log.erl

-moudle(emit_log).
-compile([export_all]).
-include_lib("amqp_client/include/amqp_client.hrl").

main(Argv) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"logs">>,
                                                   type = <<"fanout">>}),

    Message = case Argv of
          [] -> <<"info: Hello World!">>;
          Msg -> list_to_binary(string:join(Msg, " "))
          end,
    amqp_channel:cast(Channel,
                      #'basic.publish'{exchange = <<"logs">>},
                      #amqp_msg{payload = Message}),
    io:format(" [x] Sent ~p~n", [Message]),
    ok = amqp_channel:close(Channel),
    ok = amqp_connection:close(Connection),
    ok.

receive_logs.erl

-module(receive_logs).
-compile([export_all]).
-include_lib("amqp_client/include/amqp_client.hrl").

main(_) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"logs">>,
                                                   type = <<"fanout">>}),

    %% exclusive=true : 消费者断链,这个消费者使用的消息队列必须被删除
    #'queue.declare_ok'{queue = Queue} =
        amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),

    %% 绑定exchange和queue
    amqp_channel:call(Channel, #'queue.bind'{exchange = <<"logs">>,
                                             queue = Queue}),

    io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),

    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
                                                     no_ack = true}, self()),
    receive
        #'basic.consume_ok'{} -> ok
    end,
    loop(Channel).

loop(Channel) ->
    receive
        {#'basic.deliver'{}, #amqp_msg{payload = Body}} ->
            io:format(" [x] ~p~n", [Body]),
            loop(Channel)
    end.

发出日志消息:

同时打开两个接收日志进程,可以看到,这两个接收消息的进程都接收到了日志消息:


© 著作权归作者所有

nao

nao

粉丝 27
博文 155
码字总数 108154
作品 0
成都
后端工程师
私信 提问
RabbitMQ 的安装与工作模式

RabbitMQ 概念: 交换机(exchange type)把消息推送到队列的方法: fanout:不处理路由键,转发到所有绑定的队列上 direct:处理路由键,必须完全匹配,即路由键字符串相同才会转发 topic:路由...

求学ing
2014/11/11
1K
0
Centos7 上安装配置 RabbitMQ

一、安装Erlang环境   网上百度了不少安装erlang的教程,大部分都是安装到一半就他丫的翻车了,搞得我好心累                1、在安装erlang之前先安装下依赖文件(这一步不要...

yzy121403725
2018/05/02
0
0
了解消息队列中间件——RabbitMQ

了解消息队列中间件 1. 消息:指的是在应用之间传送的数据,比如json字符串、纯文本字符串等 2. 消息队列中间件:指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进...

江左煤郎
01/13
45
0
【原创】RabbitMQ 之 Plugins(翻译)

为了方便工作中使用,对 RabbitMQ 的【插件】相关文档进行了翻译,鉴于自己水平有限,翻译中难免有纰漏产生,如果疑问,欢迎指出探讨。此文以中英对照方式呈现。 官方原文:http://www.rabb...

摩云飞
2012/12/12
1K
0
RabbitMQ与消息队列模式

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法; RabbitMQ是开源的,实现了AMQP协议的,采用Erlang(面向并发编程语言)编写的,可复用的企业级消息系统; AMQP(高级消...

Java技术汇
2018/05/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring使用ThreadPoolTaskExecutor自定义线程池及实现异步调用

多线程一直是工作或面试过程中的高频知识点,今天给大家分享一下使用 ThreadPoolTaskExecutor 来自定义线程池和实现异步调用多线程。 一、ThreadPoolTaskExecutor 本文采用 Executors 的工厂...

CREATE_17
今天
5
0
CSS盒子模型

CSS盒子模型 组成: content --> padding --> border --> margin 像现实生活中的快递: 物品 --> 填充物 --> 包装盒 --> 盒子与盒子之间的间距 content :width、height组成的 内容区域 padd......

studywin
今天
7
0
修复Win10下开始菜单、设置等系统软件无法打开的问题

因为各种各样的原因导致系统文件丢失、损坏、被修改,而造成win10的开始菜单、设置等系统软件无法打开的情况,可以尝试如下方法解决 此方法只在部分情况下有效,但值得一试 用Windows键+R打开...

locbytes
昨天
8
0
jquery 添加和删除节点

本文转载于:专业的前端网站➺jquery 添加和删除节点 // 增加一个三和一节点function addPanel() { // var newPanel = $('.my-panel').clone(true) var newPanel = $(".triple-panel-con......

前端老手
昨天
8
0
一、Django基础

一、web框架分类和wsgiref模块使用介绍 web框架的本质 socket服务端 与 浏览器的通信 socket服务端功能划分: 负责与浏览器收发消息(socket通信) --> wsgiref/uWsgi/gunicorn... 根据用户访问...

ZeroBit
昨天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部