文档章节

rabbitMQ 在spring boot的使用

狼王黄师傅
 狼王黄师傅
发布于 01/22 21:54
字数 3209
阅读 8
收藏 0

RabbitMQ介绍

    RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种。

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

 

相关概念

    通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange).。

    这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

  • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
  • 中间即是 RabbitMQ,其中包括了 交换器 和 队列。
  • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。

    其中比较重要的概念有 4 个,分别为:虚拟主机,交换器,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换器、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换器/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
  • 交换器:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
    这里有一个比较重要的概念:路由键 。消息到交换器的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
  • 绑定:也就是交换器需要和队列相绑定,这其中如上图所示,是多对多的关系。

交换器(Exchange)

    交换器的功能主要是接收消息并且转发到绑定的队列,交换器不存储消息,在启用ack模式后,交换器找不到队列会返回错误。

    交换器有四种类型:Direct、topic、Headers、Fanout

  • Direct:direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  • Topic:按规则转发消息(最灵活)
  • Headers:设置header attribute参数类型的交换器
  • Fanout:转发消息到所有绑定队列

 

    Direct Exchange是RabbitMQ默认的交换器模式,也是最简单的模式,根据key全文匹配去寻找队列。

    

    当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。

    X - Q1 就一个 binding key,名字为 orange;

    X - Q2 有 2 个 binding key,名字为 black 和 green。

 

    Topic Exchange 转发消息主要是根据通配符。 

    在这种交换器下,队列和交换器的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换器才能转发消息。

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
  • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。

    具体代码发送的时候还是一样,第一个参数表示交换器,第二个参数表示routing key,第三个参数即消息。如下:

rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");

    topic 和 direct 类似, 只是匹配上支持了"模式", 在"点分"的 routing_key 形式中, 可以使用两个通配符:

  • *表示一个词.
  • #表示零个或多个词.

 

    headers Exchange 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型.

    在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

 

    Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列。即使 routing_key配置了,也会被忽略。

 

springboot集成RabbitMQ

 1.添加依赖:

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置文件

    配置rabbitmq的安装地址、端口以及账户信息:

spring.rabbitmq.host=192.168.0.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

3.声明队列和交换器,并各自绑定。

package com;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqDemoApplication {
    /***************************************队列***********************************************/
    @Bean
    public Queue helloQueue() {
        return new Queue("helloQueue");
    }

    @Bean
    public Queue topicMessage() {
        return new Queue("topicMessage");
    }

    @Bean
    public Queue topicMessages() {
        return new Queue("topicMessages");
    }

    @Bean
    public Queue fanoutA() {
        return new Queue("fanoutA");
    }

    @Bean
    public Queue fanoutB() {
        return new Queue("fanoutB");
    }

    @Bean
    public Queue fanoutC() {
        return new Queue("fanoutC");
    }
    /***************************************两种exchange***********************************************/
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /***************************************将队列和exchange绑定***********************************************/

    /**
     * 将队列topicMessage与topicExchange绑定,
     * 只有路由键 为topic.Message才能匹配,
     * 将消息转发到绑定的Queue
     * @param topicMessage
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicMessage).to(topicExchange).with("topic.Message");
    }

    /**
     * 将队列topicMessages与topicExchange绑定,
     * 以topic开头的路由键 均会模糊匹配,
     * 将消息转发到绑定的Queue
     * @param topicMessages
     * @param topicExchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicMessages).to(topicExchange).with("topic.#");
    }

    /**
     * 将队列fanoutA与fanoutExchange绑定
     * 
     * @param fanoutA
     * @param fanoutExchange
     * @return
     */
    @Bean
    Binding bindingExchangeA(Queue fanoutA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutA).to(fanoutExchange);
    }

    /**
     * 将队列fanoutB与fanoutExchange绑定
     *
     * @param fanoutB
     * @param fanoutExchange
     * @return
     */
    @Bean
    Binding bindingExchangeB(Queue fanoutB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutB).to(fanoutExchange);
    }

    /**
     * 将队列fanoutC与fanoutExchange绑定
     *
     * @param fanoutC
     * @param fanoutExchange
     * @return
     */
    @Bean
    Binding bindingExchangeC(Queue fanoutC, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutC).to(fanoutExchange);
    }

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }
}

4.实现发送消息类:

    实体类(测试对象的发送和接收):

package com.demo.model;

import java.io.Serializable;

/**
 * @Description:实体类
 */
public class User implements Serializable {
    private String userName;

    private String password;

    private String sex;

    private String level;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getLevel() {
        return level;
    }

    public void setLevel(String level) {
        this.level = level;
    }
}

   生产者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @Description:
 * 生产者1
 */
@Component
public class Sender1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 测试发送普通消息,全文检索名称为 helloQueue 的队列
     */
    public void send() {
        String sendMsg = "hello1 " + new Date();
        System.out.println("Sender1:" + sendMsg);
        rabbitTemplate.convertAndSend("helloQueue", sendMsg);
    }

    /**
     * 测试发送实体类,全文检索名称为 helloQueue 的队列
     * @param user
     */
    public void sendUser(User user){
        System.out.println("user Sender1:" + user.getUserName()+"/"+user.getPassword());
        rabbitTemplate.convertAndSend("helloQueue", user);
    }

    /**
     * 测试发送普通文本消息,
     * 在topicExchange 交换器上,路由键:topic.Message、topic.Messages将依次寻找与绑定键 匹配的队列
     */
    public void testTopPicMessage() {
        String msg = "sendTopPicMessage";
        System.out.println("sendTopPicMessage1:" + msg);
        //第一个参数:指定了exchange
        //第二个参数:指定了接受消息的栏目名
        //第三个参数:消息内容
        //到指定exchange找出第二个参数符合的正则表达式,得到对应的Queue,监听相应Queue的消费者接受到消息
        rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合

        msg = "sendTopPicMessages";
        System.out.println("sendTopPicMessages1:" + msg);
        rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合
    }

    /**
     * 测试发送普通文本消息到fanoutExchange 交换器绑定的队列上
     */
    public void testFanoutMessage(){
        String sendMsg = "sendFanoutMessage";
        System.out.println("fanout Sender1:" + sendMsg);
        //第二个参数不会进行正则表达式的过滤
        //但是必须要填,才能根据exchange找到相关Queue
        rabbitTemplate.convertAndSend("fanoutExchange","", sendMsg);
    }
}

    生产者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @Description: 生产者2
 */
@Component
public class Sender2 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 测试发送普通消息,全文检索名称为 helloQueue 的队列
     */
    public void send() {
        String sendMsg = "hello2 " + new Date();
        System.out.println("Sender2:" + sendMsg);
        rabbitTemplate.convertAndSend("helloQueue", sendMsg);
    }

    /**
     * 测试发送实体类,全文检索名称为 helloQueue 的队列
     *
     * @param user
     */
    public void sendUser(User user) {
        System.out.println("user Sender2:" + user.getUserName() + "/" + user.getPassword());
        rabbitTemplate.convertAndSend("helloQueue", user);
    }

    /**
     * 测试发送普通文本消息,
     * 在topicExchange 交换器上,路由键:topic.Message、topic.Messages将依次寻找与绑定键 匹配的队列
     */
    public void testTopPicMessage() {
        String msg = "sendTopPicMessage";
        System.out.println("sendTopPicMessage2:" + msg);
        //第一个参数:指定了exchange
        //第二个参数:指定了接受消息的栏目名
        //第三个参数:消息内容
        //到指定exchange找出第二个参数符合的正则表达式,得到对应的Queue,监听相应Queue的消费者接受到消息
        rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合

        msg = "sendTopPicMessages";
        System.out.println("sendTopPicMessages2:" + msg);
        rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合
    }

    /**
     * 测试发送普通文本消息到fanoutExchange 交换器绑定的队列上
     */
    public void testFanoutMessage() {
        String sendMsg = "sendFanoutMessage";
        System.out.println("fanout Sender2:" + sendMsg);
        //第二个参数不会进行正则表达式的过滤
        //但是必须要填,才能根据exchange找到相关Queue
        rabbitTemplate.convertAndSend("fanoutExchange", "", sendMsg);
    }
}

5.实现消费者:

    Direct Exchange 消费者1:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Description:
 * helloQueue消费者1
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {
    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver1:" + hello);
    }

    @RabbitHandler
    public void processUser(User user) {
        System.out.println("user receive1:" + user.getUserName()+"/"+user.getPassword());
    }
}

    Direct Exchange 消费者2:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Description:
 * helloQueue消费者2
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {
    @RabbitHandler
    public void process(String mesg) {
        System.out.println("Receiver2:" + mesg);
    }

    @RabbitHandler
    public void processUser(User user) {
        System.out.println("user receive2:" + user.getUserName()+"/"+user.getPassword());
    }
}

    Topic Exchange 消费者1:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Description:
 * topicMessage消费者
 */
@Component
//监听topicMessage队列
@RabbitListener(queues = "topicMessage")
public class TopMessageReceiver {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("topMessageReceiver:" +msg);
    }
}

    Topic Exchange 消费者2:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Description:
 * topicMessages消费者
 */
@Component
//监听topicMessages队列
@RabbitListener(queues = "topicMessages")
public class TopMessagesReceiver {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("topMessagesReceiver:" +msg);
    }
}

    Fanout Exchange 消费者1:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Description:
 * fanoutA消费者
 */
@Component
//监听fanoutA队列
@RabbitListener(queues = "fanoutA")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverA:" + msg);
    }

}

    Fanout Exchange 消费者2:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Description:
 * fanoutB消费者
 */
@Component
//监听fanoutB队列
@RabbitListener(queues = "fanoutB")
public class FanoutReceiverB {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverB:" + msg);
    }

}

    Fanout Exchange 消费者3:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Description:
 * fanoutC消费者
 */
@Component
//监听fanoutC队列
@RabbitListener(queues = "fanoutC")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverC:" + msg);
    }

}

6.测试类:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Description: 测试类
 */
@RestController
public class RabbitController {

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * 发送普通文本消息
     *
     * @return
     */
    @RequestMapping("/hello")
    public String hello() {
        //两个生产者
        helloSender1.send();
        helloSender2.send();
        return "ok";
    }

    /**
     * 发送Object
     *
     * @return
     */
    @RequestMapping("/user")
    public String user() {
        User user = new User();
        user.setUserName("a");
        user.setPassword("1");
        user.setSex("m");
        user.setLevel("1");

        //两个生产者
        helloSender1.sendUser(user);
        helloSender2.sendUser(user);
        return "ok";
    }

    /**
     * 发送普通文本消息
     *
     * @return
     */
    @RequestMapping("/topMessage")
    public String topMessage() {
        //两个生产者
        helloSender1.testTopPicMessage();
        helloSender2.testTopPicMessage();
        return "ok";
    }

    /**
     * 发送普通文本消息
     *
     * @return
     */
    @RequestMapping("/fanoutMessage")
    public String fanoutMessage() {
        //两个生产者
        helloSender1.testFanoutMessage();
        helloSender2.testFanoutMessage();
        return "ok";
    }
}

测试结果:

  • /hello的结果为:两个消费者分摊两个生产者的消息。因为使用的是默认的交换器,一条消息只能被一个队列接收到,而这两个生产者都监听了这一个队列,因此两个消费者分摊。
  • /user的结果为:和 /hello 一样。证明了 此处支持对象的发送和接收,只是实体类 必须实现序列化接口。
  • /topMessage的结果为:

        routing_key是“topic.message” 时,exchange绑定的binding_key:“topic.message”,topic.#都符合路由规则;

        所以发送的消息,两个队列都能接收到,而两个消费者各自监听了一个队列,因此消息不会被分摊;

        routing_key是“topic.messages”,exchange绑定的binding_key 只有 “topic.#”符合路由规则;

        发送的消息,只有队列topic.messages能收到,只有一个消费者监听了这个队列,因此消息不会被分摊。

  • /fanoutMessage的结果为:就算fanoutSender发送消息的时候,指定了routing_key,但所有绑定的队列都能接受到了消息,而这三个消费者都各自监听了一个队列,因此消息不会被分摊。

 

总结:

    一条消息只能被监听该队列的某一个消费者消费。

    默认的交换器:队列名称必须和routing_key 一致。

    topic 交换器:绑定的队列中,只要消息的routing_key满足队列的binding_key的路由规则,该队列即可接收到消息。

    fanout 交换器:绑定的队列均能收到消息,不会进行路由匹配这一过程。

© 著作权归作者所有

共有 人打赏支持
狼王黄师傅
粉丝 13
博文 253
码字总数 536128
作品 0
成都
程序员
私信 提问
RabbitMQ 入门 与 RabbitMQ 在 Spring Boot 中的使用

Message Broker与AMQP简介 Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集...

Anur
2018/09/13
0
0
Spring Boot整合RabbitMQ实例

什么是消息? 消息是一个或者多个实体之间沟通的一种方式并且无处不在。 自从计算机发明以来,计算机以多种多样的方式发送消息,消息定义了软硬件或者应用程序之间的沟通方式。消息总是有一个...

英雄有梦没死就别停
2018/06/27
0
0
消息队列1:RabbitMQ解析并基于Springboot实战

目录 RabbitMQ简介 RabitMQ 概念模型 Exchange 类型 代码实战 RabbitMQ简介 AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此...

养码青年
2018/09/07
0
0
企业级 Spring Boot 教程(十五)Springboot整合RabbitMQ

这篇文章带你了解怎么整合RabbitMQ服务器,并且通过它怎么去发送和接收消息。我将构建一个springboot工程,通过RabbitTemplate去通过MessageListenerAdapter去订阅一个POJO类型的消息。 准备...

itcloud
2018/10/30
0
0
精通Spring Boot——第八篇:整合RabbitMQ消息队列

今天来和朋友们一起学习下,SpringBoot怎么整合RabbitMQ。目前消息组件大致有三种:.activemq, rabbitmq, kafka。这三者各有优缺点,RabbitMQ相比之下是处于其他二者之间的一个消息组件。Rab...

developlee的潇洒人生
2018/10/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring Cloud 入门教程9、服务限流/API限流(Zuul+RateLimiter)

一、前言 1、什么是RateLimiter、Spring Cloud Zuul RateLimiter? RateLimiter是Google开源的实现了令牌桶算法的限流工具(速率限制器)。http://ifeve.com/guava-ratelimiter/ Spring Clou...

吴伟祥
15分钟前
0
0
win10多人同时远程桌面连接 教程

win10多人同时远程桌面连接 教程 1 新建用户 右键“此电脑”->管理->本地用户和组 右键“用户”->新用户 点击【创建】按钮,即可完成用户创建。 2 添加远程桌面用户 右键“此电脑”->属性->远...

linjin200
26分钟前
2
0
本地运行Kmeans算法

参考资料链接:https://github.com/CraigCovey/spark-examples/blob/f8182a6736fd5293dfa03b023eb1423363ba6041/spark-1_6/scala/clustering/kmeans/kmeans_clustering_main.scala packag......

KYO4321
26分钟前
0
0
超长干货 | Kubernetes命名空间详解

K8s使用命名空间的概念帮助解决集群中在管理对象时的复杂性问题。在本文中,会讨论命名空间的工作原理,介绍常用实例,并分享如何使用命名空间来管理K8s对象。最后,介绍名为projects的Ranch...

RancherLabs
29分钟前
0
0
Syncfusion教程:在Xamarin.Forms中创建数据输入表单 (4)

下载Essential Studio for Xamarin最新版本 Essential Studio for Xamarin是全面的Xamarin.iOS、Xamarin.Android和Xamarin.Forms组件套包,包含最快的图表和网格。 分段控制不定开关 在进一步...

电池盒
29分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部