文档章节

RabbitMQ (4) Exchange类型:topic

玄影镜心
 玄影镜心
发布于 2016/12/09 17:55
字数 779
阅读 53
收藏 2

基本概念:在Direct上进行了扩展,增加了RoutingKey(BindingKey)的匹配规则,使消息传递更加灵活。

约束规则:

1:routingkey为一个句点号“. ”分隔的字符: 如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。

2:bindingkey中可以存在两种特殊字符“*”与“#”,用于做模糊匹配。其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

发送端代码:

package sunf.rabbitMQTest;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * MQ消息发送者测试4  【topic】
 * 
 * 这个例子将消息发送到Exchange, 该Exchange为【topic】类型,发送消息时候需要声明routingKey,

 * topic类型的Exchange扩展的direct,增加routing的复杂性,但在消息接收时可 通过模糊匹配同时获得多个队列的信息,增加了消息获取的灵活度。
 * 
 * Send4与Send3相比,仅仅修改了Exchange的类型。
 * ConsumerTest4与ConsumerTest3的基础上修改了routingKey,将固定值改为可匹配的类型,使马云同时获得了多条队列上的消息。
 * 
 * ps: 
 * 1:routingKey每个句号分隔一个单词, 一个词用*匹配, 多个或者零个用#匹配。
 * 
 * @author SUNF
 *
 */
public class Send4 {
    final static String exchangeName = "zg4";
    final static String routingKey1 = "com.sunfan.www";
    final static String routingKey2 = "com.sunfan.sf.www";
    

    public static void main(String[] args) throws IOException, Exception {
        Connection conn = createConnection();
        Channel channel = conn.createChannel();
        //声明一个exchange和routing类型 - topic
        channel.exchangeDeclare(exchangeName, "topic"); 
        
        for(int i=0;i<1;i++){
            String message = "错误消息" + i;
            //持久化消息
            channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
            
            System.out.println(routingKey1 +"消息已发送------>>" + message);
        }
        
        for(int i=0;i<2;i++){
            String message = "异常消息" + i;
            //持久化消息
            channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
            
            System.out.println(routingKey2 +"消息已发送------>>" + message);
        }
        
        channel.close();
        conn.close();
        
    }


    private static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setUsername("sunfan");
        cf.setPassword("711579");
        cf.setHost(RabbitMQConstants.VHOST_IP);
        Connection conn = cf.newConnection(); //新连接
        return conn;
    }
}
com.sunfan.www消息已发送------>>错误消息0
com.sunfan.sf.www消息已发送------>>异常消息0
com.sunfan.sf.www消息已发送------>>异常消息1

接收端:

注意接收端RoutingKey2 

package sunf.rabbitMQTest;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * MQ消息发送者测试4  对应的消费者
 * @author SUNF
 *
 */
public class ConsumerTest4 {
    final static String exchangeName = Send4.exchangeName;
    final static String routingKey1 = "com.sunfan.www";
    final static String routingKey2 = "com.#.www";

    public static void main(String[] args) throws Exception{
        //1
        new Thread(){
            public void run(){
                try {
                    ConsumerTest4.recver("葫芦山","葫芦娃", routingKey1);
                } catch (Exception e) {
                    e.printStackTrace();
                }  
            }
        }.start();
        
        //2
        new Thread(){
            public void run(){
                try {
                    ConsumerTest4.recver("阿里巴巴","马云", routingKey2);
                } catch (Exception e) {
                    e.printStackTrace();
                }  
            }
        }.start();
        
    }

    /**
     * 信息接收方法
     * @param queueName 队列名称
     * @param consumerName  消费者名称
     * @param routingKey   发送线路key
     * @throws Exception
     */
    private static void recver(String queueName, String consumerName, String routingKey) throws Exception {
        Channel channel = createChannel();
        
        /*****从此处开始看***/
        //声明exchange和routing类型
        channel.exchangeDeclare(exchangeName, "topic");
        //声明queue
        channel.queueDeclare(queueName, false, false, false, null);
        
        //解绑(若这个queue与Exchange的绑定并未解除,该关系一直存在,routingKey更变后不会影响原来的通道绑定关系,只会建立新的接收通道)
        channel.queueUnbind(queueName, exchangeName, routingKey == routingKey1 ? routingKey2 : routingKey1);
        
        //绑定Queue与Exchange 
        //与2相比增加了第三个参数routing key
        channel.queueBind(queueName, exchangeName, routingKey);
        
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);  
        
        while (true) {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            //获取routingKey
            String routingKeyMsg = delivery.getEnvelope().getRoutingKey();
            System.out.println(consumerName+"从【"+queueName+"】接收到("+routingKeyMsg+")消息---->> " + message + "'");  
        }
    }

    
    private static Channel createChannel() throws IOException, TimeoutException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost(RabbitMQConstants.VHOST_IP);
        Connection conn = cf.newConnection();
        Channel channel = conn.createChannel();
        return channel;
    }

}

 

© 著作权归作者所有

玄影镜心
粉丝 9
博文 103
码字总数 53409
作品 0
西安
高级程序员
私信 提问
1、RabbitMQ 入门秘籍,三分钟带你快速了解RabbitMQ

一、前言 刚开始接触RabbitMQ的时候,有些概念那理解起来简直是像风像雨又像雾,晦涩难懂。这篇文章用尽可能浅显的语言来解释RabbitMQ的入门知识。毕竟是入门课程,并没有对很多概念进行深入...

极客慧
2018/11/26
189
0
消息队列1:RabbitMQ解析并基于Springboot实战

目录 RabbitMQ简介 RabitMQ 概念模型 Exchange 类型 代码实战 RabbitMQ简介 AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此...

养码青年
2018/09/07
0
0
OpenStack中RabbitMQ RPC 调用研究

这两天研究了一下,OpenStack的工作原理,并着重调研了一下RabbitMQ在OpenStack中扮演的角色。 首先,OpenStack中模块Volume Control、Network Controller、ComputeController以及Scheduler...

icheer
2013/08/27
415
0
(二)RabbitMQ消息队列-RabbitMQ消息队列架构与基本概念

没错我还是没有讲怎么安装和写一个HelloWord,不过快了,这一章我们先了解下RabbitMQ的基本概念。 RabbitMQ架构 说是架构其实更像是应用场景下的架构(自己画的有点丑,勿嫌弃) 从图中可以看...

Super_RD
2017/04/19
0
0
OpenStack中RabbitMQ RPC 调用研究

这两天研究了一下,OpenStack的工作原理,并着重调研了一下RabbitMQ在OpenStack中扮演的角色。 首先,OpenStack中模块Volume Control、Network Controller、ComputeController以及Scheduler...

icheer
2013/08/21
176
0

没有更多内容

加载失败,请刷新页面

加载更多

Docker常用命令小记

除了基本的<font color="blue">docker pull</font>、<font color="blue">docker image</font>、<font color="blue">docker ps</font>,还有一些命令及参数也很重要,在此记录下来避免遗忘。 ......

程序员欣宸
昨天
3
0
MAT使用-jvm内存溢出问题分析定位

1.MAT简介: MAT 全称 Eclipse Memory Analysis Tools 是一个分析 Java堆数据的专业工具,可以计算出内存中对象的实例数量、占用空间大小、引用关系等,看看是谁阻止了垃圾收集器的回收工作,...

xiaomin0322
昨天
4
0
内网和外网之间的通信(端口映射原理)

首先解释一下“内网”与“外网”的概念: 内网:即所说的局域网,比如学校的局域网,局域网内每台计算机的IP地址在本局域网内具有互异性,是不可重复的。但两个局域网内的内网IP可以有相同的...

Jack088
昨天
5
0
3.深入jvm内核-原理、诊断与优化-4. GC算法和种类

一、GC算法和种类 GC的概念 GC算法 引用计数法 标记清除 标记压缩 复制算法 可触及性 Stop-The-World GC的对象是堆空间和永久区 引用计数法 老牌垃圾回收算法 通过引用计算来回收垃圾 使用者...

hexiaoming123
昨天
4
0
MySQL中的哈希索引

Memory中的哈希索引 哈希索引是基于哈希表实现的,只有精确匹配索引所有列的查询才有效。对于每一行数据,存储引擎都会对所有的索引列计算一个哈希码,哈希码是一个较小的值,并且不同键值的...

我的眼里只有眼屎
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部