文档章节

RabbitMQ消息队列:发布/订阅(Publish/Subscribe)

 小亮89
发布于 2015/12/18 15:45
字数 1420
阅读 1071
收藏 6

【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>

        前面我们把每个Message都是deliver到某个单一的Consumer。今天我们将了解如何把同一个Message deliver到多个Consumer中。这个模式也被称作 "publish / subscribe"。
    首先我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer)。 我们将构建两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。

 

1. (转发器)Exchanges

    关于exchange的概念在在这里做一下简单的介绍。

    RabbitMQ 的Messaging Model就是Producer并不会直接发送Message到queue。实际上,Producer并不知道它发送的Message是否已经到达queue。

   RabbitMQ消息模型的核心理念是生产者永远不会直接发送给任何的消息队列,一般情况下Producer是不知道消息应该发送到那个队列的。Producer发送的Message实际上是发到了Exchange中。它的功能也很简单:从Producer接收Message,然后投递到queue中。Exchange需要知道如何处理Message,是把它放到某个queue中,还是放到多个queue中?这个rule是通过Exchange 的类型定义的。

   我们知道有三种类型的Exchange:direct, topic ,Headers和fanout。fanout就是广播模式,会将所有的Message都放到它所知道的queue中。创建一个名字为logs,类型为fanout的Exchange:

channel.exchange_declare(exchange='logs',type='fanout');

fanout类型转发器特别简单,吧所有他接受到的消息,广播多有的他知道的队列。

前面说到的生产者只能发送详细给转发器(Exchange),但是我们之前的例子中并没有使用到转发器啊,我们仍然可以发送和接收消息,这是为什么呢?是匿名转发器(nameless exchange)搞的鬼。因为我们使用了一个默认转发器。他的标识为" ".

channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

第一个参数为转发器名,第二个为消息队列名,如果不为空由其决定发送到那个队列中。

现在我们可以指定消息发送到转发器中。

channel.basicPublish( "logs","", null, message.getBytes());

Listing exchanges

通过rabbitmqctl可以列出当前所有的Exchange:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。

现在我们可以通过exchange,而不是routing_key来publish Message了:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

2. 临时队列(Temporary queues)

    截至现在,我们用的queue都是有名字的,能够为队列命名对我们来说很关键。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。

    但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
    1) 每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。

    2)当Consumer关闭连接时,这个queue要被deleted。 

 

String queueName = channel.queueDeclare().getQueue();

通过result.method.queue 可以取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj1wLg

3. 绑定Bindings

现在我们已经创建了fanout类型的exchange和没有名字的queue(实际上是RabbitMQ帮我们取了名字)。那exchange怎么样知道它的Message发送到哪个queue呢?答案就是通过bindings:绑定。

channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称

现在logs的exchange就将它的Message附加到我们创建的queue了。Listing bindings

使用命令rabbitmqctl list_bindings。

4. 最终版本

    我们最终实现的数据流图如下:

package com.zhy.rabbit._03_bindings_exchanges;

import java.io.IOException;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog
{
 private final static String EXCHANGE_NAME = "ex_log";

 public static void main(String[] args) throws IOException
 {
  // 创建连接和频道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  // 声明转发器和类型
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
  
  String message = new Date().toLocaleString()+" : log something";
  // 往转发器上发送消息
  channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

  System.out.println(" [x] Sent '" + message + "'");

  channel.close();
  connection.close();

 }

}

 

 接收端:

package com.zhy.rabbit._03_bindings_exchanges;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsToSave
{
 private final static String EXCHANGE_NAME = "ex_log";

 public static void main(String[] argv) throws java.io.IOException,
   java.lang.InterruptedException
 {
  // 创建连接和频道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();

  channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  // 创建一个非持久的、唯一的且自动删除的队列
  String queueName = channel.queueDeclare().getQueue();
  // 为转发器指定队列,设置binding
  channel.queueBind(queueName, EXCHANGE_NAME, "");

  System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  QueueingConsumer consumer = new QueueingConsumer(channel);
  // 指定接收者,第二个参数为自动应答,无需手动应答
  channel.basicConsume(queueName, true, consumer);

  while (true)
  {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());

   print2File(message);
  }

 }

 private static void print2File(String msg)
 {
  try
  {
   String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
   String logFileName = new SimpleDateFormat("yyyy-MM-dd")
     .format(new Date());
   File file = new File(dir, logFileName+".txt");
   FileOutputStream fos = new FileOutputStream(file, true);
   fos.write((msg + "\r\n").getBytes());
   fos.flush();
   fos.close();
  } catch (FileNotFoundException e)
  {
   e.printStackTrace();
  } catch (IOException e)
  {
   e.printStackTrace();
  }
 }
}

 

接收端:

package com.zhy.rabbit._03_bindings_exchanges;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsToConsole
{
 private final static String EXCHANGE_NAME = "ex_log";

 public static void main(String[] argv) throws java.io.IOException,
   java.lang.InterruptedException
 {
  // 创建连接和频道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();

  channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  // 创建一个非持久的、唯一的且自动删除的队列
  String queueName = channel.queueDeclare().getQueue();
  // 为转发器指定队列,设置binding
  channel.queueBind(queueName, EXCHANGE_NAME, "");

  System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  QueueingConsumer consumer = new QueueingConsumer(channel);
  // 指定接收者,第二个参数为自动应答,无需手动应答
  channel.basicConsume(queueName, true, consumer);

  while (true)
  {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());
   System.out.println(" [x] Received '" + message + "'");

  }

 }

}

© 著作权归作者所有

粉丝 5
博文 41
码字总数 58888
作品 0
昌平
私信 提问
SpringBoot集成RabbitMQ

官方说明:http://www.rabbitmq.com/getstarted.html 什么是MQ? MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往...

xuanm
2018/08/10
0
0
Spring Boot+RabbitMQ学习笔记

RabbitMQ RabbitMQ 是一个由 Erlang 语言开发的 的开源实现。最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 消息队列 先从基础开始,加...

听_风
2018/06/07
380
0
了解消息队列中间件——RabbitMQ

了解消息队列中间件 1. 消息:指的是在应用之间传送的数据,比如json字符串、纯文本字符串等 2. 消息队列中间件:指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进...

江左煤郎
01/13
56
0
RabbitMQ之消息发布订阅与信息持久化技术

信息发布与订阅 Rabbit的核心组件包含Queue(消息队列)和Exchanges两部分,Exchange的主要部分就是对信息进行路由,通过将消息队列绑定到Exchange上,则可以实现订阅形式的消息发布及Publish...

墨鱼ID
2012/08/09
2.6K
0
Centos7 上安装配置 RabbitMQ

一、安装Erlang环境   网上百度了不少安装erlang的教程,大部分都是安装到一半就他丫的翻车了,搞得我好心累                1、在安装erlang之前先安装下依赖文件(这一步不要...

yzy121403725
2018/05/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Redis面试题详解:哨兵+复制+事务+集群+持久化等

Redis主要有哪些功能? 1.哨兵(Sentinel)和复制(Replication) Redis服务器毫无征兆的罢工是个麻烦事,如何保证备份的机器是原始服务器的完整备份呢?这时候就需要哨兵和复制。 Sentinel...

Java阿七
6分钟前
1
0
响应国家号召 1+X 证书 Web 前端开发考试模拟题

官方QQ群:736413913 1+x证书Web前端开发初级理论考试样题2019 http://blog.zh66.club/index.php/archives/149/ 1+x证书Web前端开发初级实操考试样题2019 http://blog.zh66.club/index.php/...

张帅个人博客
10分钟前
3
0
如何检查一个字符串在Objective-C中是否包含另一个字符串?

如何检查字符串( NSString )是否包含另一个较小的字符串? 我希望有这样的东西: NSString *string = @"hello bla bla";NSLog(@"%d",[string containsSubstring:@"hello"]); 但是我能找到......

javail
27分钟前
2
0
.NET Core开发的iNeuOS工业互联网平台,发布 iNeuDA 数据分析展示组件,快捷开发图形报表和数据大屏

目 录 1. 概述... 2 2. 演示信息... 2 3. 简单介绍... 3 4. 产品特点... 4 5. 价值体现... 5 1. 概述 经过一段时间的努力,iNeuDA产品组件已经开发和测试完成,现在正式上线。现在iNeuOS工业...

wxzz
29分钟前
3
0
在每个GROUP BY组中选择第一行?

顾名思义,我想选择以GROUP BY分组的每组行的第一行。 具体来说,如果我有一个如下的purchases表: SELECT * FROM purchases; 我的输出: id | customer | total---+----------+------ 1...

技术盛宴
42分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部