文档章节

轻松搞定RabbitMQ(五)——路由选择

白志华
 白志华
发布于 2015/10/18 10:56
字数 1376
阅读 47
收藏 1

       翻译地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html

       在前篇博文中,我们建立了一个简单的日志系统。可以广播消息给多个消费者。本篇博文,我们将添加新的特性——我们可以只订阅部分消息。比如:我们可以接收Error级别的消息写入文件。同时仍然可以在控制台打印所有日志。


Bindings(绑定)

       在上一篇博客中我们已经使用过绑定。类似下面的代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");
       绑定表示转换器与队列之间的关系。可以简单的人为:队列对该转发器上的消息感兴趣。

       绑定可以设定额外的routingKey参数。为了与避免basicPublish方法(发布消息的方法)的参数混淆,我们准备把它称作绑定键(binding key)。下面展示如何使用绑定键(binding key)来创建一个绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");
       绑定键关键取决于转换器的类型。对于fanout类型,忽略此参数。


Direct exchange(直接转发)

       前面讲到我们的日志系统广播消息给所有的消费者。我们想对其扩展,根据消息的严重性来过滤消息。例如:我们希望将致命错误的日志消息记录到文件,而不是把磁盘空间浪费在warn和info类型的日志上。我们使用的fanout转发器,不能给我们太多的灵活性。它仅仅只是盲目的广播而已。我们使用direct转发器进行代替,其背后的算法很简单——消息会被推送至绑定键(binding key)和消息发布附带的选择键(routing key)完全匹配的队列。


       在上图中,我们可以看到direct类型的转发器与2个队列进行了绑定。第一个队列使用的绑定键是orange,第二个队列绑定键为black和green。这样当消息发布到转发器是,附带orange绑定键的消息将被路由到队列Q1中去。附带black和green绑定键的消息被路由到Q2中去。其他消息全部丢弃。


Multiple bindings(多重绑定)


       使用一个绑定键绑定多个队列是完全合法的。如上图,绑定键black绑定了2个队列——Q1和Q2。


Emitting logs(发送日志)

       我们将这种模式用于日志系统,发送消息给direct类型的转发器。我们将 提供日志严重性做为绑定键。那样,接收程序可以选择性的接收严重性的消息。首先关注发送日志的代码:

像往常一样首先创建一个转换器:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
       然后为发送消息做准备:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
       为了简化代码,我们假定日志的严重性是‘info’,‘warning’,‘error’中之一。


Subscribing(订阅)

       接收消息跟前面博文中的一样。我们仅需要修改一个地方:为每一个我们感兴趣的严重性的消息,创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

完整的例子

发送端代码(EmitLogDirect.java)

public class EmitLogDirect {
	private final static String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] args) throws IOException {
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");

		//所有日志严重性级别
		String[] severities={"error","info","warning"};
		for(int i=0;i<3;i++){
			String severity = severities[i%3];//每一次发送一条不同严重性的日志
			
			// 发送的消息
			String message = "Hello World"+Strings.repeat(".", i+1);
			//参数1:exchange name
			//参数2:routing key
			channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
			System.out.println(" [x] Sent '" + severity +"':'"+ message + "'");
		}
		// 关闭频道和连接
		channel.close();
		connection.close();
	}
}
消费者1(ReceiveLogs2Console.java)

public class ReceiveLogs2Console {
	private static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();

		//所有日志严重性级别
		String[] severities={"error","info","warning"};
		for (String severity : severities) {
			//关注所有级别的日志(多重绑定)
			channel.queueBind(queueName, EXCHANGE_NAME, severity);
		}
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    System.out.println(" [x] Received '"  + envelope.getRoutingKey() + "':'" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
}

消费者2(ReceiveLogs2File.java)

public class ReceiveLogs2File {
	private static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    
	    String severity="error";//只关注error级别的日志,然后记录到文件中去。
	    channel.queueBind(queueName, EXCHANGE_NAME, severity);
	    
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    //记录日志到文件:
			    print2File( "["+ envelope.getRoutingKey() + "] "+message);
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
	
	private static void print2File(String msg) {
		try {
			String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
			File file = new File(dir, logFileName + ".log");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}  
}
       最终结果:


       罗哩罗嗦的说这么多,其实就是说了这么一件事:我们可以使用Direct exchange+routingKey来过滤自己感兴趣的消息。一个队列可以绑定多个routingKey。这就是我们今天的主题——路由选择。

本文转载自:http://blog.csdn.net/xiaoxian8023/article/details/48733249

白志华
粉丝 34
博文 265
码字总数 57524
作品 0
长沙
程序员
私信 提问
go封装rabbitmq的 发布/订阅 解耦组件

关于rabbitmq 的入门和使用,网络上的资料甚多,在此不再赘述。可以看文章末尾的参考资料。 对rabbitmq的golang客户端二度封装,通过订阅/发布的模式使消息的获取和接收进行解耦,从而支持多...

歪文
07/25
0
0
阿里大牛带你轻松实现RabbitMQ 延时消息

RabbitMQ 延时消息的实现(上) 我们在实际业务中有一些需要延时发送消息的场景,例如: 家里有一台智能热水器,需要在30分钟后启动 未付款的订单,15分钟后关闭 注意这里的场景是延时,不是...

Java架构
01/23
0
0
我的RabbitMQ的学习成果

背景 在研发分布式事务的最终一致性事务模式时,使用了RabbitMQ。 在这之前也接触过RabbitMQ,但没有特别深入的去了解它的特性与原理。这次决定系统的学习一次,所以业余时间阅读大神们的书籍...

XuePeng77
04/15
231
0
RabbitMQ消息队列入门(一)——RabbitMQ消息队列的安装(Windows环境下)

一、RabbitMQ介绍 1、RabbitMQ简介 RabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局:当你把你想要发布的邮件放在邮箱中时,你可以确定邮差先生最终将邮件发送给你的收...

王瘦子Kawhi
07/23
0
0
实现RabbitMQ 延时消息

RabbitMQ 延时消息的实现(上) 我们在实际业务中有一些需要延时发送消息的场景,例如: 家里有一台智能热水器,需要在30分钟后启动 未付款的订单,15分钟后关闭 注意这里的场景是延时,不是...

Java架构资源分享
01/23
77
0

没有更多内容

加载失败,请刷新页面

加载更多

Google Guava 笔记

一、引言 Guava 是 google 几个java核心类库的集合,包括集合 [collections] 、缓存 [caching] 、原生类型支持 [primitives support] 、并发库 [concurrency libraries] 、通用注解 [common ...

SuShine
28分钟前
7
0
SpringBoot中使用@Value为静态变量赋值并测试是否成功

今天想像普通变量一样如下采用写法取配置的,但取到的是个null。。。 @Value("${test.appKey}")private static String appKey; 才发现不能通过这种方式取配置来给static变量赋值 在网上搜索...

SilentSong
29分钟前
5
0
ECMAScript语句之with 语句

ECMAScript with 语句,用于设置代码在特定对象中的作用域(with运行缓慢,设置了属性值时更加缓慢,最好避免使用with语句) 一、with 语句用于字符串(配合toUpperCase()方法) var a = "C...

专注的阿熊
30分钟前
4
0
Apache Flink 进阶(一):Runtime 核心机制剖析

1. 综述 本文主要介绍 Flink Runtime 的作业执行的核心机制。首先介绍 Flink Runtime 的整体架构以及 Job 的基本执行流程,然后介绍在这个过程,Flink 是怎么进行资源管理、作业调度以及错误...

大涛学长
36分钟前
4
0
7. 整数反转

给出一个 32 位的有符号整数,你需要将这个整数中每位上的数字进行反转。 示例 1: 输入: 123 输出: 321 示例 2: 输入: -123 输出: -321 示例 3: 输入: 120 输出: 21 注意: 假设我们的环境只能...

苏坡吴
37分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部