文档章节

[RabbitMQ]05_RabbitMQ学习之路由(java)

morpheusWB
 morpheusWB
发布于 2017/09/03 19:11
字数 531
阅读 7
收藏 0

参考:http://blog.csdn.NET/lmj623565791/article/details/37669573

使用direct类型实现:消息会被推送至绑定键(binding key)和消息发布附带的选择键(routing key)完全匹配的队列。例如:将不同的日志发送到不同的消费端。

1.发送日志端SendLogDirect.Java

 

[java] view plain copy

 print?

  1. package cn.slimsmart.rabbitmq.demo.route;  
  2.   
  3. import java.util.Random;  
  4. import java.util.UUID;  
  5.   
  6. import com.rabbitmq.client.AMQP;  
  7. import com.rabbitmq.client.Channel;  
  8. import com.rabbitmq.client.Connection;  
  9. import com.rabbitmq.client.ConnectionFactory;  
  10.   
  11. //随机发送6条随机类型(routing key)的日志给转发器~~  
  12. public class SendLogDirect {  
  13.       
  14.      //交换名称  
  15.      private static final String EXCHANGE_NAME = "ex_logs_direct";    
  16.      //日志分类  
  17.      private static final String[] SEVERITIES = { "info", "warning", "error" };    
  18.           
  19.     public static void main(String[] args) throws Exception {  
  20.         //创建连接和频道    
  21.         ConnectionFactory factory = new ConnectionFactory();    
  22.         factory.setHost("192.168.101.174");  
  23.         // 指定用户 密码  
  24.         factory.setUsername("admin");  
  25.         factory.setPassword("admin");  
  26.         // 指定端口  
  27.         factory.setPort(AMQP.PROTOCOL.PORT);  
  28.         Connection connection = factory.newConnection();    
  29.         Channel channel = connection.createChannel();    
  30.         // 声明转发器的类型    
  31.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");    
  32.     
  33.         //发送6条消息    
  34.         for (int i = 0; i < 6; i++)    
  35.         {    
  36.             String severity = getSeverity();    
  37.             String message = severity + "_log :" + UUID.randomUUID().toString();    
  38.             // 发布消息至转发器,指定routingkey    
  39.             channel.basicPublish(EXCHANGE_NAME, severity, null, message  .getBytes());    
  40.             System.out.println(" [x] Sent '" + message + "'");    
  41.         }    
  42.     
  43.         channel.close();    
  44.         connection.close();    
  45.     }  
  46.       
  47.     /**  
  48.      * 随机产生一种日志类型  
  49.      *   
  50.      * @return  
  51.      */    
  52.     private static String getSeverity()    
  53.     {    
  54.         Random random = new Random();    
  55.         int ranVal = random.nextInt(3);    
  56.         return SEVERITIES[ranVal];    
  57.     }    
  58.   
  59. }  

2.接收日志端ReceiveLogsDirect.java

 

 

[java] view plain copy

 print?

  1. package cn.slimsmart.rabbitmq.demo.route;  
  2.   
  3. import java.util.Random;  
  4.   
  5. import com.rabbitmq.client.AMQP;  
  6. import com.rabbitmq.client.Channel;  
  7. import com.rabbitmq.client.Connection;  
  8. import com.rabbitmq.client.ConnectionFactory;  
  9. import com.rabbitmq.client.QueueingConsumer;  
  10.   
  11. //接收端随机设置一个日志严重级别(binding_key)。。。  
  12. public class ReceiveLogsDirect {  
  13.       
  14.     private static final String EXCHANGE_NAME = "ex_logs_direct";    
  15.     private static final String[] SEVERITIES = { "info", "warning", "error" };    
  16.   
  17.     public static void main(String[] args) throws Exception {  
  18.           // 创建连接和频道    
  19.         ConnectionFactory factory = new ConnectionFactory();    
  20.         factory.setHost("192.168.101.174");  
  21.         // 指定用户 密码  
  22.         factory.setUsername("admin");  
  23.         factory.setPassword("admin");  
  24.         // 指定端口  
  25.         factory.setPort(AMQP.PROTOCOL.PORT);  
  26.         Connection connection = factory.newConnection();    
  27.         Channel channel = connection.createChannel();    
  28.         // 声明direct类型转发器    
  29.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");    
  30.     
  31.         String queueName = channel.queueDeclare().getQueue();    
  32.         String severity = getSeverity();    
  33.         // 指定binding_key    
  34.         channel.queueBind(queueName, EXCHANGE_NAME, severity);    
  35.         System.out.println(" [*] Waiting for "+severity+" logs. To exit press CTRL+C");    
  36.     
  37.         QueueingConsumer consumer = new QueueingConsumer(channel);    
  38.         channel.basicConsume(queueName, true, consumer);    
  39.     
  40.         while (true)    
  41.         {    
  42.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
  43.             String message = new String(delivery.getBody());    
  44.     
  45.             System.out.println(" [x] Received '" + message + "'");    
  46.         }    
  47.     }    
  48.     
  49.     /**  
  50.      * 随机产生一种日志类型  
  51.      *   
  52.      * @return  
  53.      */    
  54.     private static String getSeverity()    
  55.     {    
  56.         Random random = new Random();    
  57.         int ranVal = random.nextInt(3);    
  58.         return SEVERITIES[ranVal];    
  59.     }    
  60. }  

启动几个接收端服务,再启动发送端,接收端对应绑定的键收到对应的消息。

 

注:发送消息时可以设置routing_key,接收队列与转发器间可以设置binding_key,接收者接收与binding_key与routing_key相同的消息。

本文转载自:http://blog.csdn.net/zhu_tianwei/article/details/40887755

morpheusWB
粉丝 26
博文 84
码字总数 14703
作品 0
西安
程序员
私信 提问
java B2B2C Springboot电子商城系统-消息队列之 RabbitMQ

常见的消息队列 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码请加企鹅求求: :二一四七七七五六三三 目前业界有四款常用的消息队列,它们分别是RabbitMQ、Roc...

it菲菲
2018/12/14
0
0
RabbitMQ 3.6.1 RC1 发布

RabbitMQ 3.6.1 RC1 发布了,下载地址: https://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_6_1_rc1 该版本完整的改进内容如下: Bug Fixes Purging a lazy queue cou......

淡漠悠然
2016/02/13
871
0
RabbitMQ 3.6.5 Milestone 2 发布

RabbitMQ 3.6.5 Milestone 2 发布了,RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继...

凝小紫
2016/08/30
892
5
RabbitMQ 3.6.1 RC2 发布

RabbitMQ 3.6.1 RC2 发布了,下载地址: https://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_6_1_rc2 完整改进记录: Server Bug Fixes Windows installer failed to c......

淡漠悠然
2016/02/21
1K
0
阿里大牛带你轻松实现RabbitMQ 延时消息

RabbitMQ 延时消息的实现(上) 我们在实际业务中有一些需要延时发送消息的场景,例如: 家里有一台智能热水器,需要在30分钟后启动 未付款的订单,15分钟后关闭 注意这里的场景是延时,不是...

Java架构
01/23
0
0

没有更多内容

加载失败,请刷新页面

加载更多

NIO基于长度域的报文在Netty下的解码

1, 先复习一下粘包/拆包 1.1, 粘包/拆包的含义 TCP是个“流”协议, 并不了解上层业务数据的具体含义, 它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP...

老菜鸟0217
今天
8
0
从零开始搭建spring-cloud(2) ----ribbon

在微服务架构中,业务都会被拆分成一个独立的服务,服务与服务的通讯是基于http restful的。Spring cloud有两种服务调用方式,一种是ribbon+restTemplate,另一种是feign。 其实我们已经在上...

Vincent-Duan
今天
19
0
get和post的区别?

doGet:路径传参。效率高,安全性差(get的传送数据量有限制,不能大于2Kb) doPOST:实体传参。效率低,安全性好 建议: 1、get方式的安全性较Post方式要差些,包含机密信息的话,建议用Pos...

花无谢
昨天
4
0
当谈论迭代器时,我谈些什么?

当谈论迭代器时,我谈些什么? 花下猫语:之前说过,我对于编程语言跟其它学科的融合非常感兴趣,但我还说漏了一点,就是我对于 Python 跟其它编程语言的对比学习,也很感兴趣。所以,我一直...

豌豆花下猫
昨天
14
0
10天学Python直接做项目,我做了这5件事

初学者如何尽快上手python? 市面上关于如何学python的资料很多,但是讲的都太复杂。 我就是很简单的几句话,从小白到开发工程师,我只做了五件事。 我觉得任何商业计划书如果不能用几句话讲...

Python派森
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部