文档章节

docker环境下的RabbitMQ部署,Spring AMQP使用

 月冷X心寒
发布于 2016/11/18 16:32
字数 1410
阅读 9.4K
收藏 34

「深度学习福利」大神带你进阶工程师,立即查看>>>

AMQP简介

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦和通讯。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,具有很高的易用性和可用性。

在docker环境部署RabbitMQ

RabbitMQ是用 Erlang 编写的,直接部署的话需要先部署 Erlang 环境,比较麻烦。在 docker 环境下部署就比较简单了,直接使用rabbitmq官方提供的镜像即可。

登录 docker 节点,运行 docker pull rabbitmq:management,这里使用的是带 web 管理插件的镜像。

启动容器:

docker run -d --name rabbitmq --publish 5671:5671 \
 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
rabbitmq:management

容器启动之后就可以访问web 管理端了 http://宿主机IP:15672,默认创建了一个 guest 用户,密码也是 guest

AMQP协议中的几个重要概念

  • Queue 是RabbitMQ的内部对象,用于存储消息。RabbitMQ中的消息只能存储在 Queue 中,消费者从 Queue 中获取消息并消费。
  • Exchange 生产者将消息发送到 Exchange,由 Exchange 根据一定的规则将消息路由到一个或多个 Queue 中(或者丢弃)。
  • Binding RabbitMQ中通过 BindingExchangeQueue 关联起来。
  • Binding key 在绑定(Binding) ExchangeQueue 的同时,一般会指定一个 binding key
  • Routing key 生产者在将消息发送给 Exchange 的时候,一般会指定一个 routing key,来指定这个消息的路由规则。 Exchange 会根据 routing keyExchange Type 以及 Binding key 的匹配情况来决定把消息路由到哪个 Queue
  • Exchange Types RabbitMQ常用的Exchange Type有 fanoutdirecttopicheaders 这四种。
    • fanout 这种类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,这时 Routing key 不起作用。
    • direct 这种类型的Exchange路由规则也很简单,它会把消息路由到那些 binding keyrouting key完全匹配的Queue中。
    • topic 这种类型的Exchange的路由规则支持 binding keyrouting key 的模糊匹配,会把消息路由到满足条件的Queue。 binding key 中可以存在两种特殊字符 *#,用于做模糊匹配,其中 * 用于匹配一个单词,# 用于匹配多个单词(可以是零个),单词以 .为分隔符。
    • headers 这种类型的Exchange不依赖于 routing keybinding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

使用Spring AMQP收发消息

新建一个maven工程,修改pom.xml引入 spring amqp 依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.2.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

java 目录中创建一个包 demo ,在包中创建启动入口 SpringAmqpApplication.java

public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(SpringAmqpApplication.class, args);
    Sender sender = context.getBean("sender", Sender.class);
    sender.sendMsg("测试Spring AMQP发送消息");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    context.close();
}

[@Bean](https://my.oschina.net/bean)
CachingConnectionFactory myConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setHost("10.47.160.238");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    return connectionFactory;
}

[@Bean](https://my.oschina.net/bean)
Exchange myExchange() {
    return ExchangeBuilder.topicExchange("test.topic").durable().build();
}

[@Bean](https://my.oschina.net/bean)
Queue myQueue() {
    return QueueBuilder.durable("myQueue").build();
}

[@Bean](https://my.oschina.net/bean)
public Binding myExchangeBinding(@Qualifier("myExchange") Exchange topicExchange,
                                 @Qualifier("myQueue") Queue queue) {
    return BindingBuilder.bind(queue).to(topicExchange).with("test.#").noargs();
}

@Bean
public RabbitTemplate myExchangeTemplate(CachingConnectionFactory myConnectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(myConnectionFactory);
    rabbitTemplate.setExchange("test.topic");
    rabbitTemplate.setRoutingKey("test.abc.123");
    return rabbitTemplate;
}

demo 包下创建 Sender.java

@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String content) {
        rabbitTemplate.convertAndSend(content);
        System.out.println("发送消息: '" + content + "'");
    }

}

demo 包下创建Receiver.java

@Component
public class Receiver {

    @RabbitListener(queues = "myQueue")
    public void processMessage(Message message) {
        byte[] body = message.getBody();
        System.out.println("收到消息: '" + new String(body) + "'");
    }

}

回头看下代码,在 SpringAmqpApplication.java 中创建了程序启动入口 main 方法,为了有时间把收到的消息打印出来,让主线程 sleep 了1秒,配置了几个和 RabbitMQ 相关的重要配置:

  • RabbitMQ 的连接 CachingConnectionFactory
  • 创建了一个名为 test.topic 并且类型为 topicExchange
  • 创建了一个名为 myQueueQueue
  • 把上边创建的 QueueExchange 进行了绑定,并指定 binding keytest.#
  • 最后还配置了一个spring封装的模板工具类 rabbitTemplate,指定了 ExchangeRouting key,用这个 rabbitTemplate 发送消息, 会把消息发送到名为 test.topicExchange,并且带有 Routing key test.abc.123

Sender.javaReceiver.java 的代码就比较简单了,在 sendMsg 方法中使用 rabbitTemplate 发送消息。在 processMessage 方法上加了一个 @RabbitListener(queues = "myQueue") 注解,指定从 myQueue 这个队列中获取消息。

运行 main 方法启动工程,可以看到控制台打印出了发送的消息和接收的消息。

demo源码 spring-amqp-demo

最后

RabbitMQ在 spring cloud 中做为消息总线,负责传递和分发系统消息,是非常重要的一个角色,spring cloud bus 动态加载配置就是使用消息总线,把重新拉去配置的消息分发到各个连接到消息总线的微服务。在 spring cloud steam 的消息驱动模型中同样使用了RabbitMQ。
不仅如此,RabbitMQ本身也是一个非常高效的消息服务器,可以用在服务之间异步调用,以及RPC远程调用(在消息头中增加 Reply Queue 来监听调用返回信息)。

粉丝 207
博文 13
码字总数 25192
作品 0
南京
架构师
私信 提问
加载中
此博客有 3 条评论,请先登录后再查看。
CDH5: 使用parcels配置lzo

一、Parcel 部署步骤 1 下载: 首先需要下载 Parcel。下载完成后,Parcel 将驻留在 Cloudera Manager 主机的本地目录中。 2 分配: Parcel 下载后,将分配到群集中的所有主机上并解压缩。 3 激...

cloud-coder
2014/07/01
6.8K
1
5分钟 maven3 快速入门指南

前提条件 你首先需要了解如何在电脑上安装软件。如果你不知道如何做到这一点,请询问你办公室,学校里的人,或花钱找人来解释这个给你。 不建议给Maven的服务邮箱来发邮件寻求支持。 安装Mav...

fanl1982
2014/01/23
1.2W
7
WSGI Web服务器--UV-Web

uv-web是一个轻量级的支持高并发的WSGI Web服务器,基于libuv构建,部分代码源于开源项目bjoern,本质是python的C扩展,所以适用于部署绝大部分 python web应用(如 Django) 特性 兼容 HTTP 1...

Jone.x
2013/03/04
1.7K
0
Terminal IDE

Terminal IDE 是一个运行于设备自身的命令行 java / android 开发包。 在定制终端和键盘环境里已经含配置好的 vim, bash 和 busybox。 所有这些程序都已经配置良好,相互间可以很好的交互。 ...

匿名
2012/11/08
1.4W
0
高效 Java Web 开发框架--JessMA

JessMA 是功能完备的高性能 Full-Stack Web 应用开发框架,内置可扩展的 MVC Web 基础架构和 DAO 数据库访问组件(内部已提供了 Hibernate、MyBatis 与 JDBC DAO 组件),集成了 Action 拦截...

伤神小怪兽
2012/11/13
9.2K
3

没有更多内容

加载失败,请刷新页面

加载更多

如何在IntelliJ中永久启用行号? - How can I permanently enable line numbers in IntelliJ?

问题: 如何在IntelliJ IDEA中永久启用行号? 解决方案: 参考一: https://stackoom.com/question/3Zn/如何在IntelliJ中永久启用行号 参考二: https://oldbug.net/q/3Zn/How-can-I-permane...

法国红酒甜
10分钟前
3
0
Docker镜像加速

vim /etc/docker/daemon.json # 将"registry-mirrors": ["https://......com"] (对应自己的加速地址)复制到文件中 # 重新加载文件和重启dockersudo systemctl daemon-reloadsudo syst......

codeccb
21分钟前
12
0
Android Studio使用lombok

参考:https://github.com/mplushnikov/lombok-intellij-plugin 使用@Setter/@Getter时,刚开始在Structure的函数列表里没有生成响应的函数,且调用set/get的地方也报红,但编译OK。 按网上的...

大熊猫
22分钟前
7
0
Stream数据流

Stream类基础操作 从JDK1.8开始提供了java.yitl.stream数据流的开发板,而Stream就是这个包中提供的一个接口,这个接口主要是通过函数式编程结构实现集合数据的分析,所以在Collection接口张...

哼着我的小调调
27分钟前
13
0
在视图控制器之间传递数据 - Passing Data between View Controllers

问题: I'm new to iOS and Objective-C and the whole MVC paradigm and I'm stuck with the following: 我是iOS和Objective-C以及整个MVC范例的新手,但我坚持以下几点: I have a view th......

fyin1314
40分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部