MQ学习(二)AMQP模型、重要概念和Java操作RabbitMQ实际使用
博客专区 > 阿荒 的博客 > 博客详情
MQ学习(二)AMQP模型、重要概念和Java操作RabbitMQ实际使用
阿荒 发表于2个月前
MQ学习(二)AMQP模型、重要概念和Java操作RabbitMQ实际使用
  • 发表于 2个月前
  • 阅读 11
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云实验室 1小时搭建人工智能应用,让技术更容易入门 免费体验 >>>   

目录


AMQP模型

http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html

重要概念

  • 连接Connection
    Connection是RabbitMQ的TCP连接

  • 通道Channel
    应用与AMQP可能有时候想创造多个连接,但是过多的TCP连接会消耗系统资源,所以通道是在Connection上为线程开启的单独的虚拟连接

  • 虚拟主机VHost
    为了在同一个MQ服务上实现多个隔离环境,可以设置不同的VHost

  • 交换机Exchange
    消息发送者在建立连接后,首先将消息发送到交换机上,交换机负责根据规则将消息路由给不同的队列

  • 路由Route
    交换机将消息匹配到队列的过程

  • 队列Queue
    队列是接收消息的实体,消息存放在队列中

  • 路由键RoutingKey
    路由键是关联交换机和队列的唯一标识,发送消息到交换机上并且指定路由键,就能将消息路由到队列

JAVA发送消息

开始写一个简单的发送例子

发送过程

  • 读取配置文件
  • 创建连接
    此时需要吧配置文件中用户名、密码、ip、端口号设置好
  • 根据连接创建通道
  • 创建交换机
  • 创建队列
  • 交换机绑定队列
  • 将消息发送到队列上
    不同的交换机有不同的发送方式,通过不同的发送方式可以将消息路由到不同的队列中,具体区别在下面具体讲述。

MQ配置文件mq.properties

mq.host=localhost
mq.port=5672
mq.user=guest
mq.pwd=guest
mq.timeOut=3000

配置文件读取工具类

public class PropertiesLoadUtil {
    
    public static Properties load(String fileName) throws Exception {
        if (EmptyUtils.isEmpty(fileName) || EmptyUtils.isTrimBlank(fileName)) {
            return null;
        }
        Properties properties = new Properties();
        InputStream is = PropertiesLoadUtil.class.getClassLoader().getResourceAsStream(fileName);
        properties.load(is);
        return properties;
    }
    
}

MQ配置加类

package com.java.butterfly.business.test.service.impl.rabbitMq;

import java.util.Properties;

import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;

import com.java.butterfly.common.util.EmptyUtils;
import com.java.butterfly.common.util.PropertiesLoadUtil;

/**
 * Created by lu.xu on 2017/8/17.
 * TODO: rabbitmq 配置文件加载
 */
@Service
public class MqPropertiesUtil {
    
    private static Logger logger = Logger.getLogger(MqPropertiesUtil.class);
    
    /**mq配置信息*/
    private static final Properties mqProperties;
    
    /**mq配置文件名称以及各个属性名称*/
    private static final String mqPropertiesName = "mq.properties";
    
    private static final String MQ_HOST_KEY = "mq.host";
    
    private static final String MQ_PORT_KEY = "mq.port";
    
    private static final String MQ_USER_KEY = "mq.user";
    
    private static final String MQ_PWD_KEY = "mq.pwd";
    
    private static final String MQ_TIME_OUT_KEY = "mq.timeOut";
    
    /**属性值*/
    public static final String MQ_HOST;
    
    public static final int MQ_PORT;
    
    public static final String MQ_USER;
    
    public static final String MQ_PWD;
    
    public static final int MQ_TIME_OUT;
    
    static {
        logger.info(">>加载mq配置文件");
        Properties temp = null;
        try {
            temp = PropertiesLoadUtil.load(mqPropertiesName);
        } catch (Exception e) {
            logger.error(">>初始化加载mq配置文件出错...");
            logger.error(e.getMessage());
        } finally {
            mqProperties = temp;
            MQ_HOST = mqProperties.getProperty(MQ_HOST_KEY);
            String mqPort = mqProperties.getProperty(MQ_PORT_KEY);
            if (EmptyUtils.isNotEmpty(mqPort)) {
                MQ_PORT = Integer.valueOf(mqPort);
            } else {
                MQ_PORT = 0;
            }
            MQ_USER = mqProperties.getProperty(MQ_USER_KEY);
            MQ_PWD = mqProperties.getProperty(MQ_PWD_KEY);
            String mqTimeOut = mqProperties.getProperty(MQ_TIME_OUT_KEY);
            if (EmptyUtils.isNotEmpty(mqTimeOut)) {
                MQ_TIME_OUT = Integer.valueOf(mqTimeOut);
            } else {
                MQ_TIME_OUT = 3000;
            }
        }
        logger.info(">>加载mq配置文件over");
    }
    
}

准备工作完毕,开始进行发送测试;测试分别测试rabbitMQ客户端发送和springAMQP发送

rabbitMQ发送

maven

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.1</version>
</dependency>

java代码

package com.java.butterfly.business.test.service.impl.rabbitMq;

import org.apache.log4j.Logger;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

/**
 * Created by lu.xu on 2017/8/22.
 * TODO: rabbitClint 发送接收MQ
 */
public class RabbitMqClinetTest {
    private static Logger logger = Logger.getLogger(RabbitMqClinetTest.class);
    
    private static ConnectionFactory rabbitCf = null;
    static {
        try {
            rabbitCf = new ConnectionFactory();
            rabbitCf.setHost(MqPropertiesUtil.MQ_HOST);
            rabbitCf.setPort(MqPropertiesUtil.MQ_PORT);
            rabbitCf.setUsername(MqPropertiesUtil.MQ_USER);
            rabbitCf.setPassword(MqPropertiesUtil.MQ_PWD);
            rabbitCf.setConnectionTimeout(MqPropertiesUtil.MQ_TIME_OUT);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }
    }
    
    /**
     * 使用rabbitmq-client实现发送消息
     *      不指定exchange类型,也不指定routingkey 使用默认的
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        logger.info(">> 发送消息开始...");
        Connection connection = rabbitCf.newConnection();
        Channel channel = connection.createChannel();
        
        String QUEUE_NAME = "RABBIT_QUEUE";
        String message = "Hello World!";
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        /**
         * 踩坑!
         * 如果发现程序走到:  channel.basicPublish就阻塞了,后来吧rabbitmq重启了就好了
         * 重启之前 rabbitmqctl status只能看到 rabbitmq在哪台机器
         * 重启之后 同样的命令能看到各种信息
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        logger.info(" 发送 :" + message + " 完毕\n");
        
        logger.info("接收消息开始,线程睡眠3s,避免延迟情况下获取不到消息");
        Thread.sleep(3000);
        GetResponse response = channel.basicGet(QUEUE_NAME, false);
        if (response == null) {
            logger.info("未收到到...");
        } else {
            AMQP.BasicProperties props = response.getProps();
            byte[] body = response.getBody();
            String msg = new String(body);
            logger.info("收取消息:" + msg);
            long deliveryTag = response.getEnvelope().getDeliveryTag();
            channel.basicAck(response.getEnvelope().getDeliveryTag(), false); // acknowledge receipt of the message
        }
        logger.info("接收消息完毕..");
        channel.close();

        connection.close();
        logger.info(">>连接已销毁.. ");
    }
    
}

SpringAMQP发送

maven

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.9.RELEASE</version>
</dependency>

java代码

package com.java.butterfly.business.test.service.impl.rabbitMq;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Created by lu.xu on 2017/8/22.
 * TODO: spring发送rabbit示例
 */
public class SpringAMQPTest {
    private static Logger logger = Logger.getLogger(SpringAMQPTest.class);
    
    private static CachingConnectionFactory cachingConnectionFactory = null;
    
    static {
        try {
            System.out.println();
            com.rabbitmq.client.ConnectionFactory rabbitCf = new com.rabbitmq.client.ConnectionFactory();
            rabbitCf.setHost(MqPropertiesUtil.MQ_HOST);
            rabbitCf.setPort(MqPropertiesUtil.MQ_PORT);
            rabbitCf.setUsername(MqPropertiesUtil.MQ_USER);
            rabbitCf.setPassword(MqPropertiesUtil.MQ_PWD);
            rabbitCf.setConnectionTimeout(MqPropertiesUtil.MQ_TIME_OUT);
            cachingConnectionFactory = new CachingConnectionFactory(rabbitCf);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }
    }
    
    /**
     * 使用spring-rabbit发送直连交换机类型的消息
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        logger.info(">>spring发送消息开始...");
        
        final String queueName = "MY_AMPQ_QUEUE";
        final String exchangeName = "MY_AMPQ_EXCHANGE";
        final String routingKey = "MY_AMPQ_ROUTING_KEY";
        final String msg = "Hello, world!";
        
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        /**声明交换机,声明后即存在于rabbitmq服务端*/
        DirectExchange exchange = new DirectExchange(exchangeName);
        admin.declareExchange(exchange);
        /**声明队列*/
        Queue queue = new Queue(queueName);
        admin.declareQueue(queue);
        /**交换机和队列通过routingkey进行绑定*/
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
        /**发送消息*/
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        template.convertAndSend(exchangeName, routingKey, msg);
        logger.info(">>发送 :" + msg + " 完毕\n");
        
        logger.info("接收消息开始,线程睡眠3s,避免延迟情况下获取不到消息");
        Thread.sleep(3000);
        Object obj = template.receiveAndConvert(queueName);
        if (null == obj) {
            logger.info(">> 未收取到");
        }
        logger.info(">> 收取 :" + String.valueOf(obj));
        
        cachingConnectionFactory.destroy();
        logger.info(">>连接已销毁.. ");
    }
}

交换机

不同的交换机区别

  • Direct
    直连交换机,需要routingKey,通过routingKey精确匹配交换机和队列
  • Topic
    主题交换机,需要routingKey,通过routingKey模糊匹配交换机和队列
  • Fanout
    扇形交换机,不需routingKey,发送消息到交换机上,会拷贝给每一个绑定的队列
  • Headers
    头交换机,不需routingKey,绑定时需要声明匹配规则,发送消息时根据消息头中设定内容,自动匹配到队列

SpringAMQP中不同交换机的使用

package com.java.butterfly.business.test.service.impl.rabbitMq;

import java.util.Hashtable;
import java.util.Map;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Created by lu.xu on 2017/8/22.
 * TODO: spring发送rabbit 多种exchange示例
 */
public class SpringAMQPExchangeTest {
    private static Logger logger = Logger.getLogger(SpringAMQPExchangeTest.class);
    
    private static CachingConnectionFactory cachingConnectionFactory = null;
    static {
        try {
            com.rabbitmq.client.ConnectionFactory rabbitCf = new com.rabbitmq.client.ConnectionFactory();
            rabbitCf.setHost(MqPropertiesUtil.MQ_HOST);
            rabbitCf.setPort(MqPropertiesUtil.MQ_PORT);
            rabbitCf.setUsername(MqPropertiesUtil.MQ_USER);
            rabbitCf.setPassword(MqPropertiesUtil.MQ_PWD);
            rabbitCf.setConnectionTimeout(MqPropertiesUtil.MQ_TIME_OUT);
            cachingConnectionFactory = new CachingConnectionFactory(rabbitCf);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }
    }
    
    public static void main(String[] args) throws Exception {
        try {
            directExchangeTest();
            headerExchangeTest();
            fanoutExchangeTest();
            topicExchangeTest();
            
        } catch (Exception e) {
            throw e;
        } finally {
            /**关闭连接*/
            Thread.sleep(3000);
            /**关闭connection*/
            if (cachingConnectionFactory != null) {
                cachingConnectionFactory.destroy();
                System.out.println("\n");
                logger.info("rabbitmq连接已销毁");
            }
        }
        
    }
    
    /**
     * Direct 直连交换机
     * 通过routingKey绑定交换机和队列
     */
    public static void directExchangeTest() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>开始往直连交换机上发送mq..");
        final String exchangeName = "TEST_DIRECT_EXCHANGE";
        final String queueName = "TEST_DIRECT_QUEUENAME1";
        final String routingKey = "TEST_DIRECT_ROUTINGKEY1";

        DirectExchange exchange = new DirectExchange(exchangeName);
        Queue queue = new Queue(queueName);
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 0; i < 5; i++) {
            /**发送的时候根据交换机名称,将路由名称传递,就能找到对应队列*/
            String msg = "hello direct " + i;
            System.out.println("发送:" + msg);
            template.convertAndSend(exchangeName, routingKey, msg);
        }
        logger.info(">>发送直连交换机队列消息完毕...\n");
        
        logger.info(">>直连交换机接收开始...");
        /**避免延迟情况下,统计队列大小出错*/
        Thread.sleep(3000);
        RabbitAdmin consumerAdmin = new RabbitAdmin(cachingConnectionFactory);
        Integer size = (Integer) consumerAdmin.getQueueProperties(queueName).get("QUEUE_MESSAGE_COUNT");
        logger.info("当前队列大小:" + size);
        RabbitTemplate consumerTemplate = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 1; i <= size; i++) {
            Object obj = consumerTemplate.receiveAndConvert(queueName);
            System.out.println("接收消息:" + String.valueOf(obj));
        }
        logger.info(">>直连交换机接收完毕...");
        
    }
    
    /**
     *Headers 头交换机
     * 忽略routingKey,使用消息头内容建立路由规则
     *      通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
     * 使用:
     *      1.声明队列和exchange的绑定模式any or all 并且传递绑定规则
     *      2.发送消息时在消息头中明确绑定规则
     *注意:
     *      慎用头交换机,性能低
     */
    public static void headerExchangeTest() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>开始往头交换机上面发送消息...");
        final String exchangeName = "TEST_HEADERS_EXCHANGE";
        final String queueName1 = "TEST_HEADERS_QUEUENAME1";
        final String queueName2 = "TEST_HEADERS_QUEUENAME2";
        final String queueName3 = "TEST_HEADERS_QUEUENAME3";
        
        /**
         * 在一个交换机上面设置三个队列,并设置3个不同的路由规则
         */
        HeadersExchange exchange = new HeadersExchange(exchangeName);
        Queue queue1 = new Queue(queueName1);
        Queue queue2 = new Queue(queueName2);
        Queue queue3 = new Queue(queueName3);
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue1);
        admin.declareQueue(queue2);
        admin.declareQueue(queue3);
        /**设置3个匹配规则*/
        Map header1 = new Hashtable();
        header1.put("name", "jack");
        header1.put("age", "20");
        Map header2 = new Hashtable();
        header2.put("sex", "sex");
        header2.put("temp", "temp");
        Map header3 = new Hashtable();
        header3.put("name", "jack");
        header3.put("sex", "sex");
        /**设置绑定模式和匹配规则*/
        admin.declareBinding(BindingBuilder.bind(queue1).to(exchange).whereAll(header1).match());
        admin.declareBinding(BindingBuilder.bind(queue2).to(exchange).whereAny(header2).match());
        admin.declareBinding(BindingBuilder.bind(queue3).to(exchange).whereAny(header3).match());
        
        /**
         *发送,发送前将消息头属性设置好
         * 队列1完全匹配,可以收到
         * 队列2模糊匹配,但是没有匹配项,收不到
         */
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("name", "jack");
        messageProperties.setHeader("age", "20");
        String msg =
            "hello  header 1,name jack ,age 20 ," + queue1.getName() + "、" + queue3.getName() + " will received";
        Message message = new Message(msg.getBytes(), messageProperties);
        System.out.println("发送: " + msg);
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        template.convertAndSend(exchangeName, null, message);
        /**
         *再发送一个,应该都可以收到
         */
        MessageProperties messageProperties2 = new MessageProperties();
        messageProperties2.setHeader("name", "jack");
        messageProperties2.setHeader("age", "20");
        messageProperties2.setHeader("sex", "sex");
        String msg2 = "hello  header 2,name jack ,age 20 ,sex sex , all queue will received";
        Message message2 = new Message(msg.getBytes(), messageProperties2);
        System.out.println("发送:" + msg2);
        template.convertAndSend(exchangeName, null, message2);
        logger.info(">>发送头交换机队列消息完毕...\n");
        
        logger.info(">>头交换机接收开始,线程睡眠3s避免延迟情况下,统计队列大小出错...");
        Thread.sleep(3000);
        RabbitAdmin consumerAdmin = new RabbitAdmin(cachingConnectionFactory);
        Integer size1 = (Integer) consumerAdmin.getQueueProperties(queueName1).get("QUEUE_MESSAGE_COUNT");
        Integer size2 = (Integer) consumerAdmin.getQueueProperties(queueName2).get("QUEUE_MESSAGE_COUNT");
        Integer size3 = (Integer) consumerAdmin.getQueueProperties(queueName3).get("QUEUE_MESSAGE_COUNT");
        logger.info(queueName1 + " 消费前大小 :" + size1);
        logger.info(queueName2 + " 消费前大小 :" + size2);
        logger.info(queueName3 + " 消费前大小 :" + size3);
        
        RabbitTemplate consumerTemplate = new RabbitTemplate(cachingConnectionFactory);
        System.out.println("\r");
        logger.info(">>收取 " + queue1.getName());
        for (int i = 1; i <= size1; i++) {
            Message mmsg = consumerTemplate.receive(queue1.getName());
            System.out.println(new String(mmsg.getBody()));
        }
        System.out.println("\r");
        logger.info(">>收取 " + queue2.getName());
        for (int i = 1; i <= size2; i++) {
            Message mmsg = consumerTemplate.receive(queue2.getName());
            System.out.println(new String(mmsg.getBody()));
        }
        System.out.println("\r");
        logger.info(">>收取 " + queue3.getName());
        for (int i = 1; i <= size3; i++) {
            Message mmsg = consumerTemplate.receive(queue3.getName());
            System.out.println(new String(mmsg.getBody()));
        }
        size1 = (Integer) consumerAdmin.getQueueProperties(queueName1).get("QUEUE_MESSAGE_COUNT");
        size2 = (Integer) consumerAdmin.getQueueProperties(queueName2).get("QUEUE_MESSAGE_COUNT");
        size3 = (Integer) consumerAdmin.getQueueProperties(queueName3).get("QUEUE_MESSAGE_COUNT");
        System.out.println("\r");
        logger.info(queueName1 + " 消费后大小 :" + size1);
        logger.info(queueName2 + " 消费后大小 :" + size2);
        logger.info(queueName3 + " 消费后大小 :" + size3);
        logger.info(">>头交换机接收完毕...");
        
    }
    
    /**
     *Fanout 扇形交换机
     *      忽略routingKey 当消息发送到交换机上,交换机会拷贝给它对应的所有队列
     */
    public static void fanoutExchangeTest() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>扇形交换机发送消息开始..");
        final String exchangeName = "TEST_FANOUT_EXCHANGE";
        final String queueName1 = "TEST_FANOUT_QUEUENAME1";
        final String queueName2 = "TEST_FANOUT_QUEUENAME2";
        final String queueName3 = "TEST_FANOUT_QUEUENAME3";

        FanoutExchange exchange = new FanoutExchange(exchangeName);
        Queue queue1 = new Queue(queueName1);
        Queue queue2 = new Queue(queueName2);
        Queue queue3 = new Queue(queueName3);
        /**交换机、队列进行绑定,不需要routingKey*/
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue1);
        admin.declareBinding(BindingBuilder.bind(queue1).to(exchange));
        admin.declareQueue(queue2);
        admin.declareBinding(BindingBuilder.bind(queue2).to(exchange));
        admin.declareQueue(queue3);
        admin.declareBinding(BindingBuilder.bind(queue3).to(exchange));

        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 0; i < 5; i++) {
            String msg = "hello  fanout " + i;
            logger.info("发送:" + msg);
            template.convertAndSend(exchangeName, null, msg);
        }
        logger.info(">>发送扇形交换机队列消息完毕...");
        
        logger.info(">>扇形换机接收开始,程序睡眠3s避免延迟情况下,统计队列大小出错...");
        Thread.sleep(3000);
        RabbitAdmin consumerAdmin = new RabbitAdmin(cachingConnectionFactory);
        Integer size1 = (Integer) consumerAdmin.getQueueProperties(queueName1).get("QUEUE_MESSAGE_COUNT");
        Integer size2 = (Integer) consumerAdmin.getQueueProperties(queueName2).get("QUEUE_MESSAGE_COUNT");
        Integer size3 = (Integer) consumerAdmin.getQueueProperties(queueName3).get("QUEUE_MESSAGE_COUNT");
        logger.info(queueName1 + "当前队列大小:" + size1);
        logger.info(queueName2 + "当前队列大小:" + size2);
        logger.info(queueName3 + "当前队列大小:" + size3);
        
        RabbitTemplate consumerTemplate = new RabbitTemplate(cachingConnectionFactory);
        System.out.println("\r");
        logger.info(">>接收 " + queue1.getName());
        for (int i = 1; i <= size1; i++) {
            Message mmsg = consumerTemplate.receive(queue1.getName());
            System.out.println(new String(mmsg.getBody()));
        }
        System.out.println("\r");
        logger.info(">>接收 " + queue2.getName());
        for (int i = 1; i <= size2; i++) {
            Message mmsg = consumerTemplate.receive(queue2.getName());
            System.out.println(new String(mmsg.getBody()));
        }
        System.out.println("\r");
        logger.info(">>接收 " + queue3.getName());
        for (int i = 1; i <= size3; i++) {
            Message mmsg = consumerTemplate.receive(queue3.getName());
            System.out.println(new String(mmsg.getBody()));
        }
        logger.info(">>扇形交换机接收完毕...");
        
    }
    
    /**
     *topic  主题交换机
     *      routingKey 和队列进行模糊匹配
     *      Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息
     *      
     * 匹配关键字 #和* , “#”表示0个或若干个关键字,“*”表示一个关键字
     * 例如:
     *  log.test.test 可以匹配 log.# ,但是不能匹配log.*
     *  log.test 可以匹配log.*
     */
    public static void topicExchangeTest() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>主题交换机发送消息开始...");
        final String exchangeName = "TEST_TOPIC_EXCHANGE";
        final String queueName1 = "TEST_TOPIC_QUEUENAME1";
        final String queueName2 = "TEST_TOPIC_QUEUENAME2";
        final String routingKey1 = "#.RTK.#";
        final String routingKey2 = "*.RTK.*";
        
        TopicExchange exchange = new TopicExchange(exchangeName);
        Queue queue1 = new Queue(queueName1);
        Queue queue2 = new Queue(queueName2);
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue1);
        admin.declareBinding(BindingBuilder.bind(queue1).to(exchange).with(routingKey1));
        admin.declareQueue(queue2);
        admin.declareBinding(BindingBuilder.bind(queue2).to(exchange).with(routingKey2));
        /**发送*/
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        String msg = "  hello topic for all";
        logger.info("给所有队列发送:" + msg);
        template.convertAndSend(exchangeName, "test.RTK.test", msg);
        msg = "  hello topic for none";
        logger.info("发送一个无法匹配的队列:" + msg);
        template.convertAndSend(exchangeName, "test.test.RRRRTTTK.test.test", msg);
        msg = "  hello topic for " + queueName1;
        logger.info("发送一个" + queueName1 + "可以接收到的:" + msg);
        template.convertAndSend(exchangeName, "test.test.RTK.test.test", msg);
        logger.info(">>发送主题交换机队列消息完毕...");
        
        logger.info(">>主题交换机接收开始,程序睡眠3s,避免延迟情况下,统计队列大小出错...");
        Thread.sleep(3000);
        RabbitAdmin consumerAdmin = new RabbitAdmin(cachingConnectionFactory);
        Integer size1 = (Integer) consumerAdmin.getQueueProperties(queueName1).get("QUEUE_MESSAGE_COUNT");
        Integer size2 = (Integer) consumerAdmin.getQueueProperties(queueName2).get("QUEUE_MESSAGE_COUNT");
        logger.info(queueName1 + "当前队列大小:" + size1);
        logger.info(queueName2 + "当前队列大小:" + size2);
        
        RabbitTemplate consumerTemplate = new RabbitTemplate(cachingConnectionFactory);
        System.out.println("\r");
        logger.info(">>接收 " + queue1.getName());
        for (int i = 1; i <= size1; i++) {
            Message mmsg = consumerTemplate.receive(queue1.getName());
            System.out.println(new String(mmsg.getBody()));
        }
        System.out.println("\r");
        logger.info(">>接收 " + queue2.getName());
        for (int i = 1; i <= size2; i++) {
            Message mmsg = consumerTemplate.receive(queue2.getName());
            System.out.println(new String(mmsg.getBody()));
        }
        logger.info(">>主题交换机接收完毕...");
    }
}

SpringAMQP交换机和队列参数

以DirectExchange直连交换机为例

public DirectExchange(String name) {
		super(name);
	}

public DirectExchange(String name, boolean durable, boolean autoDelete) {
	super(name, durable, autoDelete);
}

public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String,Object> arguments) {
	super(name, durable, autoDelete, arguments);
}

直连交换机继承于 AbstractExchange

public AbstractExchange(String name) {
	this(name, true, false);
}

public AbstractExchange(String name, boolean durable, boolean autoDelete) {
		this(name, durable, autoDelete, null);
}

public AbstractExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
		super();
		this.name = name;
		this.durable = durable;
		this.autoDelete = autoDelete;
		if (arguments != null) {
			this.arguments = arguments;
		}
		else {
			this.arguments = new HashMap<String, Object>();
		}
	}
	

name:名称
durable:是否持久化
autoDelete:没有消费者时是否自动删除
arguments:配置信息
我们可以看到,默认情况下交换机是持久化的,且不会自动删除。

队列参数类似,但是队列中有 exclusive概念;
exclusive:代表队列是否只属于当前用户,其他用户不可见,默认false

public Queue(String name) {
		this(name, true, false, false);
	}
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
		this(name, durable, exclusive, autoDelete, null);
	}

routingkey绑定不同队列

/**
     * 路由键相同测试
     * @throws Exception
     */
    public static void routingKeyTest() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>测试routingKey开始..");
        final String exchangeName = "TEST_EXCHANGE";
        final String queueName1 = "TEST_QUEUENAME1";
        final String queueName2 = "TEST_QUEUENAME2";
        final String routingKey = "TESTT_ROUTINGKEY";
        /**声明交换机和队列*/
        DirectExchange exchange = new DirectExchange(exchangeName);
        Queue queue = new Queue(queueName1);
        Queue queue2 = new Queue(queueName2);
        
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        admin.declareQueue(queue2);
        
        /**进行绑定*/
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
        admin.declareBinding(BindingBuilder.bind(queue2).to(exchange).with(routingKey));
        
        /**对exchange 和routingkey 发送消息*/
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 0; i < 5; i++) {
            String msg = "hello  " + i;
            System.out.println("发送:" + msg);
            template.convertAndSend(exchangeName, routingKey, msg);
        }
        
        /**避免延迟情况下,统计队列大小出错*/
        Thread.sleep(3000);
        RabbitAdmin consumerAdmin = new RabbitAdmin(cachingConnectionFactory);
        Integer size = (Integer) consumerAdmin.getQueueProperties(queueName1).get("QUEUE_MESSAGE_COUNT");
        Integer size2 = (Integer) consumerAdmin.getQueueProperties(queueName2).get("QUEUE_MESSAGE_COUNT");
        
        logger.info(queueName1 + "队列大小:" + size);
        logger.info(queueName2 + "队列大小:" + size2);
        
    }

通过测试发现,同一个交换机同一个routingKey绑定不同的队列,后者会覆盖前者。
简单的说:一个交换机上只能有唯一一个名字的路由键。

持久化


    /**
     *非持久化测试
     * @throws Exception
     */
    public static void unDurableTest() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>测试uddurableTest开始..");
        final String exchangeName = "TEST_DURABLE_EXCHANGE";
        final String queueName1 = "TEST_DURABLE_QUEUENAME";
        final String routingKey = "TEST_DURABLE_ROUTINGKEY";
        /**
         * 声明交换机和队列
         * 交换机和队列默认是持久化的,声明的时候选择非默认
         */
        DirectExchange exchange = new DirectExchange(exchangeName, false, false);
        Queue queue = new Queue(queueName1, false, false, false);
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        /**进行绑定*/
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
        /**对exchange 和routingkey 发送消息*/
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 0; i < 5; i++) {
            String msg = "hello  " + i;
            System.out.println("发送:" + msg);
            template.convertAndSend(exchangeName, routingKey, msg);
        }
    }
    
    /**
     *持久化测试
     * @throws Exception
     */
    public static void durableTest() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>测试durableTest开始..");
        final String exchangeName = "TEST_DURABLE_EXCHANGE";
        final String queueName1 = "TEST_DURABLE_QUEUENAME";
        final String routingKey = "TEST_DURABLE_ROUTINGKEY";
        /**
         * 声明交换机和队列
         * 交换机和队列默认是持久化的,声明的时候选择非默认
         */
        DirectExchange exchange = new DirectExchange(exchangeName, false, false);
        Queue queue = new Queue(queueName1);
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        /**进行绑定*/
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
        /**对exchange 和routingkey 发送消息*/
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 0; i < 5; i++) {
            String msg = "hello  " + i;
            System.out.println("发送:" + msg);
            template.convertAndSend(exchangeName, routingKey, msg);
        }
    }

发送完毕后重启mq: net stop rabbitmq & net start rabbitmq
可以看到未持久化的消息丢失,但持久化的没有;
需要注意的是,MQ的持久化并不是一条条持久化的,这样效率太慢,他应该是隔一段时间统一持久化一批。

消息确认ACK

消息的应答完全是消费端的动作,发送端和服务端无需进行任何特殊配置。

 /**
     *Message acknowledgment消息应答测试
     * @throws Exception
     */
    public static void ackTest() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>测试ack开始..");
        
        final String exchangeName = "TEST_ACK_EXCHANGE";
        final String queueName = "TEST_ACK_QUEUENAME";
        final String routingKey = "TEST_ACK_ROUTINGKEY";
        
        DirectExchange exchange = new DirectExchange(exchangeName);
        Queue queue = new Queue(queueName);
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
        ;
        
        /**对exchange 和routingkey 发送消息*/
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 0; i < 5; i++) {
            String msg = "hello  ack " + i;
            System.out.println("发送:" + msg);
            template.convertAndSend(exchangeName, routingKey, msg);
        }
        logger.info("程序睡眠3s..");
        Thread.sleep(3000);
        RabbitAdmin consumerAdmin = new RabbitAdmin(cachingConnectionFactory);
        Integer size = (Integer) consumerAdmin.getQueueProperties(queueName).get("QUEUE_MESSAGE_COUNT");
        logger.info("当前队列大小:" + size);
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
        container.setQueueNames(queueName);
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("接收 : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
            }
        });
        container.start();
        Thread.sleep(3000);
        container.stop();
    }

如果注释: channel.basicAck(message.getMessageProper...
可以在管理界面看到unack的消息

消息过期TTL

TTL:Time-To-Live Extensions
在发送消息时,我们可以设置消息的过期时间,消息过期后MQ会丢弃消息 。
消息过期可以从三个方面设置:

  • 对队列中消息统一设置
    在队列属性上面统一设置消息的过期时间,时间过后队列会丢弃消息,并且队列相应的大小会进行变化。

  • 单个消息设置
    在消息头属性上面设置过期时间,即使消息过期,也不会马上从队列中抹去,所以此时统计队列大小信息不会发生变化;而这种设置主要是让消费端去判断。

  • 设置队列过期时间
    设置队列过期时间,而不是队列中的消息,队列过期后会自动被删除。


以下是JAVA代码

对队列中消息统一设置过期时间:

 /**
     *TODO:TTL:Time-To-Live Extensions RabbitMQ消息过期时间 测试
     * 对队列中所有的设置过期时间
     * Per-Queue Message TTL
     */
    public static void testTTLForQueueMsg() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>ttl 队列测试开始..");
        final String exchangeName = "TEST_TTL_EXCHANGE";
        final String queueName = "TEST_TLT_QUEUENAME";
        final String routingKey = "TEST_TTL_ROUTINGKEY";
        
        DirectExchange exchange = new DirectExchange(exchangeName);
        /**这设置过期时间*/
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 5000);
        Queue queue = new Queue(queueName, false, false, false, arguments);
        
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
        
        /**发送*/
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 0; i < 5; i++) {
            String msg = "hello ttl " + i;
            System.out.println("发送:" + msg);
            template.convertAndSend(exchangeName, routingKey, msg);
        }
        logger.info(">>发送完毕开始计算大小...");
        Thread.sleep(3000);
        RabbitAdmin consumerAdmin = new RabbitAdmin(cachingConnectionFactory);
        Integer size = (Integer) consumerAdmin.getQueueProperties(queueName).get("QUEUE_MESSAGE_COUNT");
        logger.info(">>过期时间过前,队列大小:" + size);
        Thread.sleep(4000);
        size = (Integer) consumerAdmin.getQueueProperties(queueName).get("QUEUE_MESSAGE_COUNT");
        logger.info(">>过期时间过后,队列大小:" + size);
        
        logger.info(">>ttl 队列测试完毕...");
        
    }

单条消息设置过期时间:

    /***
     * TODO:单条消息设置过期时间
     * 注意!!即使消息过期,也不会马上从队列中抹去
     * @throws Exception
     */
    public static void testTTLForSingleMsg() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>ttl消息开始..");
        final String exchangeName = "TEST_TTL_EXCHANGE";
        final String queueName = "TEST_TLT_QUEUENAME";
        final String routingKey = "TEST_TTL_ROUTINGKEY";
        
        DirectExchange exchange = new DirectExchange(exchangeName, false, false);
        Queue queue = new Queue(queueName, false, false, false);
        
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
        
        /**发送*/
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 0; i < 5; i++) {
            String msg = "hello ttl " + i;
            System.out.println("发送:" + msg);
            template.convertAndSend(exchangeName, routingKey, msg);
        }
        
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        messageProperties.setExpiration("3000");
        String msg = "测试过期时间";
        Message message = new Message(msg.getBytes(), messageProperties);
        template.convertAndSend(exchangeName, routingKey, message);
        
        logger.info(">>发送完毕开始计算大小...");
        /***
         * 对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,
         * 而第二种方法里,即使消息过期,也不会马上从队列中抹去,
         * 因为每条消息是否过期时在即将投递到消费者之前判定的,为什么两者得处理方法不一致?
         * 因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可,
         * 而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,
         * 所以不如等到此消息即将被消费时再判定是否过期,如果过期,再进行删除
         */
        Thread.sleep(3000);
        RabbitAdmin consumerAdmin = new RabbitAdmin(cachingConnectionFactory);
        Integer size = (Integer) consumerAdmin.getQueueProperties(queueName).get("QUEUE_MESSAGE_COUNT");
        logger.info(">>过期时间过前,队列大小:" + size);
        Thread.sleep(3000);
        size = (Integer) consumerAdmin.getQueueProperties(queueName).get("QUEUE_MESSAGE_COUNT");
        logger.info(">>过期时间过后,队列大小:" + size);
        
        logger.info(">>ttl 消息测试完毕...");
        
    }

给队列设置过期时间,到期队列自动删除

    /***
     * Queue TTL
     * TODO:设置队列过期时间,而不是队列中的消息,队列过期后会自动被删除
     * queue.declare 命令中的 x-expires 参数控制 queue 被自动删除前可以处于未使用状态的时间。
     * 未使用的意思是 queue上没有任何 consumer ,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。
     * 该方式可用于,例如,RPC-style 的回复 queue ,其中许多 queue 会被创建出来,但是却从未被使用。
     * @throws Exception
     */
    
    public static void testTTLForQueue() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>ttl 队列测试开始..");
        final String exchangeName = "TEST_TTL_EXCHANGE";
        final String queueName = "TEST_TLT_QUEUENAME";
        final String routingKey = "TEST_TTL_ROUTINGKEY";
        
        DirectExchange exchange = new DirectExchange(exchangeName);
        /**这设置过期时间*/
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-expires", 5000);
        Queue queue = new Queue(queueName, false, false, false, arguments);
        
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
        
        /**发送*/
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 0; i < 5; i++) {
            String msg = "hello ttl " + i;
            System.out.println("发送:" + msg);
            template.convertAndSend(exchangeName, routingKey, msg);
        }
        logger.info(">>发送完毕开始计算大小...");
        Thread.sleep(3000);
        RabbitAdmin consumerAdmin = new RabbitAdmin(cachingConnectionFactory);
        Integer size = (Integer) consumerAdmin.getQueueProperties(queueName).get("QUEUE_MESSAGE_COUNT");
        logger.info(">>过期时间过前,队列大小:" + size);
        Thread.sleep(10000);
        size = (Integer) consumerAdmin.getQueueProperties(queueName).get("QUEUE_MESSAGE_COUNT");
        logger.info(">>过期时间过后,队列大小:" + size);
        
        logger.info(">>ttl 队列测试完毕...");
        
    }

消息过期转发DLX

DLX,Dead-Letter-Exchange
当消息过期后,可以设置消息转发到其他队列。

public static void testDLX() throws Exception {
        System.out.println("\n\n\n");
        logger.info(">>dlx 队列测试开始..");
        final String exchangeName = "TEST_dlx_EXCHANGE";
        final String queueName = "TEST_DLX_QUEUENAME1";
        final String queueName2 = "TEST_DLX_QUEUENAME2";
        final String routingKey = "TEST_DLX_ROUTINGKEY1";
        final String routingKey2 = "TEST_DLX_ROUTINGKEY2";
        
        DirectExchange exchange = new DirectExchange(exchangeName);
        
        Map<String, Object> arguments = new HashMap<>();
        /**设置过期时间*/
        arguments.put("x-message-ttl", 5000);
        /**设置死信队列*/
        arguments.put("x-dead-letter-exchange", exchangeName);
        arguments.put("x-dead-letter-routing-key", routingKey2);
        Queue queue = new Queue(queueName, false, false, false, arguments);
        Queue queue2 = new Queue(queueName2, false, false, false, null);
        
        RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        admin.declareQueue(queue2);
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
        admin.declareBinding(BindingBuilder.bind(queue2).to(exchange).with(routingKey2));
        
        /**发送*/
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        for (int i = 0; i < 5; i++) {
            String msg = "hello ttl " + i;
            System.out.println("发送:" + msg);
            template.convertAndSend(exchangeName, routingKey, msg);
        }
        logger.info(">>发送完毕开始计算大小...");
        Thread.sleep(3000);
        RabbitAdmin consumerAdmin = new RabbitAdmin(cachingConnectionFactory);
        Integer size = (Integer) consumerAdmin.getQueueProperties(queueName).get("QUEUE_MESSAGE_COUNT");
        logger.info(">>过期时间过前,队列大小:" + size);
        Thread.sleep(4000);
        size = (Integer) consumerAdmin.getQueueProperties(queueName).get("QUEUE_MESSAGE_COUNT");
        logger.info(">>过期时间过后,队列大小:" + size);
        Thread.sleep(5000);
        size = (Integer) consumerAdmin.getQueueProperties(queueName2).get("QUEUE_MESSAGE_COUNT");
        logger.info(">>死信队列大小:" + size);
        
        logger.info(">>dlx 队列测试完毕...");
        
    }
标签: RabbitMQ Java
共有 人打赏支持
粉丝 0
博文 2
码字总数 6516
×
阿荒
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: