文档章节

Springboot 集成rabbitmq

江火似流萤
 江火似流萤
发布于 2017/11/02 11:54
字数 2066
阅读 92
收藏 3

     RabbitMQ是一款基于AMQP(消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlang等。
对于RabbitMQ来说,生产者和消息队列之间存在隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列。消费者通过读取消息队列从而实现消息的发送和接收。


   交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.

  Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.

  topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

  headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.

  Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.

消息队列两个用处:服务间解耦,缓解压力(削峰平谷);
RabbitMQ实现了AQMP协议,AQMP协议定义了消息路由规则和方式。生产端通过路由规则发送消息到不同queue,消费端根据queue名称消费消息。
RabbitMQ既支持内存队列也支持持久化队列,消费端为推模型,消费状态和订阅关系由服务端负责维护,消息消费完后立即删除,不保留历史消息。
(1)点对点
生产端发送一条消息通过路由投递到Queue,只有一个消费者能消费到。

(2)多订阅
当RabbitMQ需要支持多订阅时,发布者发送的消息通过路由同时写到多个Queue,不同订阅组消费不同的Queue。所以支持多订阅时,消息会多个拷贝。

2、安装RabbitMQ服务端
    (1)下载Erlang安装包:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度网盘地址)
    (2)安装和配置RabbitMQ服务端,3.6.0版本:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度网盘地址)
        (官方:安装Erland,通过官方下载页面http://www.erlang.org/downloads获取exe安装包,直接打开并完成安装。
         安装RabbitMQ,通过官方下载页面https://www.rabbitmq.com/download.html获取exe安装包。)
    (3)启用web管理插件:rabbitmq-plugins enable rabbitmq_management
    (4)启动RabbitMQ:chkconfig rabbitmq-server on  /sbin/service rabbitmq-server start
    (5)防火墙开通端口
# firewall-cmd --permanent --zone=public --add-port=5672/tcp
# firewall-cmd --permanent --zone=public --add-port=15672/tcp
# firewall-cmd --reload
(6)rabbitmq默认会创建guest账号,只能用于localhost登录页面管理员,本机访问地址:http://localhost:15672/

三.SpringBoot整合RabbitMQ(Topic转发模式)

  首先我们看发送端,我们需要配置队列Queue,再配置交换机(Exchange),再把队列按照相应的规则绑定到交换机上:


首先 配置pom.xml
    <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

接着在application.properties中,去编辑和RabbitMQ相关的配置信息,配置信息的代表什么内容根据键就能很直观的看出了.这里端口是5672,不是15672...15672是管理端的端口!

server.port=17080
#mq
spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=ncs
spring.rabbitmq.password=12345678
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

我们看发送端,我们需要配置队列Queue,再配置交换机(Exchange),再把队列按照相应的规则绑定到交换机上:

package com.example.demo.conf;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by ningcs on 2017/10/30.
 */
@Configuration
public class SenderConf {
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
    }
}

 

在SpringBoot中,我们使用AmqpTemplate去发送消息!代码如下:

package com.example.demo.sender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Created by ningcs on 2017/10/30.
 */
@Component
public class HelloSender {
    @Autowired
    private AmqpTemplate template;

    public void send(String msg) {
        System.out.println(msg);
        template.convertAndSend("exchange","topic.message","hello,rabbit~");
    }
}

测试

package com.example.demo.controller;

import com.example.demo.sender.HelloSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * Created by ningcs on 2017/10/30.
 */
@RestController
@RequestMapping("rabbit")
public class RabbitController {

    @Autowired
    private HelloSender helloSender;

    @RequestMapping(value = "/hello",method = {RequestMethod.GET, RequestMethod.POST})
    public String helloSender(){
        helloSender.send("hello,rabbit~");
        return "发送成功";
    }
}

接收端(可以放在两个项目,配置文件一样):
package com.example.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Created by ningcs on 2017/10/30.
 */
@Component
public class HelloReceive {

//    @RabbitListener(queues="queue")    //监听器监听指定的Queue
//    public void processC(String str) {
//        System.out.println("Receive:"+str);
//    }

    @RabbitListener(queues="topic.message")    //监听器监听指定的Queue
    public void process1(String str) {
        System.out.println("message:"+str);
    }
    @RabbitListener(queues="topic.messages")    //监听器监听指定的Queue
    public void process2(String str) {
        System.out.println("messages:"+str);
    }

}

访问地址:http://localhost:17080/rabbit/hello
控制台输出以下日志说明成功:
2017-11-02 11:30:11.982  INFO 4476 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17080 (http)
2017-11-02 11:30:11.994  INFO 4476 --- [           main] d.SpringbootRabbitTopicSenderApplication : Started SpringbootRabbitTopicSenderApplication in 12.292 seconds (JVM running for 12.829)
2017-11-02 11:31:06.712  INFO 4476 --- [io-17080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring FrameworkServlet 'dispatcherServlet'
2017-11-02 11:31:06.713  INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
2017-11-02 11:31:06.729  INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 16 ms
hello,rabbit~
hello,rabbit~

2017-11-02 11:30:13.165  INFO 11064 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17081 (http)
2017-11-02 11:30:13.170  INFO 11064 --- [           main] SpringbootRabbitTopicReceiverApplication : Started SpringbootRabbitTopicReceiverApplication in 13.48 seconds (JVM running for 17.121)
messages:hello,rabbit~
message:hello,rabbit~
messages:hello,rabbit~
message:hello,rabbit~

Fanout 和Direct模式详见最下面github地址。

术语解释

Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
RabbitMQ消息队列详细介绍(主要涉及术语)
http://blog.csdn.net/leyangjun/article/details/52529047

通信协议AMQP(Advanced Message Queuing Protocol)
AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概如下:
客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。
客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

RabbitMQ消息队列-RabbitMQ的优劣势及产生背景
推荐博客:
http://blog.csdn.net/super_rd/article/details/70229714


详细代码访问github:https://github.com/ningcs/Springboot-rabbit-mq

© 著作权归作者所有

江火似流萤

江火似流萤

粉丝 11
博文 27
码字总数 29577
作品 0
南阳
程序员
私信 提问
SpringBoot | 第十二章:RabbitMQ的集成和使用

前言 上节讲了缓存数据库的使用,在实际工作中,一般上在系统或者应用间通信或者进行异步通知(登录后发送短信或者邮件等)时,都会使用消息队列进行解决此业务场景的解耦问题。这章节讲解下消...

oKong
2018/07/25
833
0
springboot 集成rabbitmq 并采用ack模式 以及封装队列定义

rabbitmq简介 rabbitmq 是spring所在公司Pivotal自己的产品 是基于AMQP高级队列协议的消息中间件 采用erlang开发 因此安装需要erlang环境 具体安装根据自己的环境 因为跟spring有共同的血缘关...

码农小胖哥
2018/02/27
10.2K
14
SpringBootBucket 1.0.0 发布,SprintBoot 全家桶

Spring Boot 现在已经成为Java 开发领域的一颗璀璨明珠,它本身是包容万象的,可以跟各种技术集成。 本项目对目前Web开发中常用的各个技术,通过和SpringBoot的集成,并且对各种技术通过“一...

一刀
2018/03/05
9K
17
恒宇少年/spring-boot-chapter

简书整套文档以及源码解析 专题 专题名称 专题描述 001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件 002 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解 003 Quer...

恒宇少年
2018/04/19
0
0
Spring Boot配置多套RabbitMQ

背景介绍: 为什么要单独来讲解SpringBoot如何配置RabbitMQ? 因为在项目中,有可能会用到多套RabbitMQ,如果只用一套那则用SpringBoot自带的配置就可以了,但多套则不行,需要自行配置。 说...

woter
2018/07/10
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

c++ 虚基类

c++ 虚基类 p556

天王盖地虎626
21分钟前
21
0
Java中的面向对象

一、面向对象 面向对象和面向过程的区别 过程就是函数,就是写方法,就是方法的一种实现。 对象就是将函数,属性的一种封装。用人们思考习惯的方式思考问题。 如何自定义类 修饰符 类名{ //成...

zhiruochujian
30分钟前
3
0
k8s删除Terminating状态的命名空间

背景: 我们都知道在k8s中namespace有两种常见的状态,即Active和Terminating状态,其中后者一般会比较少见,只有当对应的命名空间下还存在运行的资源,但是该命名空间被删除时才会出现所谓的...

Andy-xu
32分钟前
23
0
seata源码阅读笔记

seata源码阅读笔记 本文没有seata的使用方法,怎么使用seata可以参考官方示例,详细的很。 本文基于v0.8.0版本,本文没贴代码。 seata中的三个重要部分: TC:事务协调器,维护全局事务和分支...

东都大狼狗
45分钟前
14
0
Rust:最小化窗口后 CPU占用率高 (winit,glutin,imgui-rust)

最近试着用 imgui-rust 绘制界面,发现窗口最小化后CPU占用会增大。 查询的资料如下: https://github.com/rust-windowing/winit/issues/783 https://github.com/ocornut/imgui/issues/1151 ...

reter
49分钟前
27
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部