一、基本介绍
RabbitMQ是一个实现了AMQP协议(是一个异步消息传递所使用的应用层协议规范)标准的开源消息代理和队列服务器,它是企业级消息系统,自带了集群、管理、插件等特性。学习RabbitMQ需要理解下面几个概念:
- 消息(Message):
-
有效载荷(Payload),也就是要传输的数据,数据类型可以是纯文本也可以是json
-
标签(Label),它包含交换机的名字和可选的主题(topic)标记等,AMQP仅仅描述了标签,而RabbitMQ决定了把这个消息发给哪个消费者。
-
生产者(Producer):创建消息并且设置标签。
-
消费者(Consumer): 消费者连接到代理服务器上,接收消息的有效载荷(消费者并不需要消息中的标签)
二、工作流程
AMQP工作流程图如下所示:
交换机就像邮局,通过它做路由分发,交换机将收到的消息根据路由分发规则分发给绑定的队列。
队列将消息投递给了订阅此队列的消费者或者消费者主动获取
为了保证消息被正确取出并执行,消息投递失败后会重发,AMQP包含了一个消息确认的概念:当一个消息从队列中投递给消费者后,消费者会通知消息代理(Broker),这个通知可以是自动完成的,也可以由处理消息的应用来执行。当消息确认被启用的时候,消息代理不会完全将消息从队列中删除,除非收到来自消费者的确认回执。
交换机拿到一个消息之后会将它路由给消息队列。它使用哪种路由算法是由交换机类型和被称作绑定(queue_bind)的规则所决定的,目前RabbitMQ提供了如下四种交换机。
- 直连交换机(direct exchange):根据消息携带的路由键(routing key)将消息投递给对应队列。将一个队列绑定到某个交换机的同时赋予该绑定一个路由键,当一个携带着路由键为XXX的消息发送给直连交换机的时,交换机会把它路由给绑定值同样为XXX的队列。直连交换机用来处理消息的单播路由。
- 主题交换机(topic exchange): 通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机通常用来实现消息的多播路由。发送到主题交换机的消息的路由键,必须是一个“.”分隔的词语列表,这些词语应该和对应的业务相关联,词语的个数可以随意,但是不要超过255字节。绑定键支持通配符:“*“用来表示一个单词,”#“用来表示任意数量单词。
- 扇形交换机(fanout exchange):将消息路由给绑定到它身上的所有队列,且不理会绑定的路由键。扇形交换机用来处理消息的广播路由。扇形交换机允许对单条消息做不同的处理,另外可以把一个消息发给多个任务队列,执行不一样的工作。使用扇形交换机可以有效的解耦生产者和消费者
- 头交换机(headers exchange): 允许匹配AMQP的头而非路由键,使用起来和直连交换机差不多,但是性能却差很多,一般很少用到。
三、安装启动
- Ubuntu下安装
sudo apt-get install rabbitmq-server
sudo service rabbitmq-server start
- Centos下安装
yum -y install rabbitmq-server
systemctl start rabbitmq-server
四、代码演示
由于采用python来演示流程,所以需要先安装RabbitMQ的python客户端,最常用的客户端是pika,代码支持Python 2.X和3.X
pip install pika
生产者producer.py
#!/usr/bin/env python
# -*-coding:utf-8 -*-
import sys
import pika
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672')
connection = pika.BlockingConnection(parameters) # connection 就是消息代理
channel = connection.channel() # 获得信道
# 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机
channel.exchange_declare(exchange='web_develop', exchange_type='direct',
passive=False, durable=True, auto_delete=False)
if len(sys.argv) != 1:
msg = sys.argv[1] # 使用命令行参数为消息内容
else:
msg = 'hello world'
# 创建一个消息,delivery_mode为2表示让这个消息持久化,重启RabbitMQ也不会丢失
props = pika.BasicProperties(content_type='text/plain', delivery_mode=2)
# basic_publish 表示发送路由键为xxx_routing_key,消息为hello world 的消息给web_develop 这个交换机
channel.basic_publish('web_develop', 'xxx_routing_key', msg, properties=props)
connection.close() # 关闭连接
消费者consumer.py
#!/usr/bin/env python
# -*-coding:utf-8 -*-
import pika
# 处理接收到的消息的回调函数,method_frame 携带了投递标记,header_frame表示AMQP信息头的对象,body为消息实体
def on_message(channel, method_frame, header_frame, body):
# 消息确认,确认之后才会删除消息并给消费者发送新的消息
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
print(body)
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672')
connection = pika.BlockingConnection(parameters) # connection 就是消息代理
channel = connection.channel() # 获得信道
# 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机
channel.exchange_declare(exchange='web_develop', exchange_type='direct',
passive=False, durable=True, auto_delete=False)
# 声明队列,如果没有就创建
channel.queue_declare(queue='standard', auto_delete=True)
# 通过路由键将队列和交换机绑定
channel.queue_bind(queue='standard', exchange='web_develop', routing_key='xxx_routing_key')
# 订阅队列
channel.basic_consume(on_message, 'standard')
try:
# 开始消费
channel.start_consuming()
except KeyboardInterrupt:
# 停止消费
channel.stop_consuming()
# 关闭连接
connection.close()
先启动消费者
python consumer.py
另开一个终端后,启动生产者
python producer.py
可以对生产者的例子进行改动,让它支持消息确认。支持的原理是确保basic_publish的返回值为True。
生产者producer_with_confirm.py
#!/usr/bin/env python
# -*-coding:utf-8 -*-
import sys
import pika
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672')
connection = pika.BlockingConnection(parameters) # connection 就是消息代理
channel = connection.channel() # 获得信道
# 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机
channel.exchange_declare(exchange='web_develop', exchange_type='direct',
passive=False, durable=True, auto_delete=False)
if len(sys.argv) != 1:
msg = sys.argv[1] # 使用命令行参数为消息内容
else:
msg = 'hello world'
# 创建一个消息,delivery_mode为2表示让这个消息持久化,重启RabbitMQ也不会丢失
props = pika.BasicProperties(content_type='text/plain', delivery_mode=2)
if channel.basic_publish('web_develop', 'xxx_routing_key', msg, properties=props):
print('Message publish was confirmed!')
else:
print('Message could not be confirmed!')
connection.close() # 关闭连接
重新执行新的生产者代码发现,当消费者收到消息后,生产者这边会打印出Message publish was confirmed!。