RabbitMQ系列一:初识RabbitMQ

原创
2017/11/15 17:20
阅读数 1.5K

一、基本介绍

RabbitMQ是一个实现了AMQP协议(是一个异步消息传递所使用的应用层协议规范)标准的开源消息代理和队列服务器,它是企业级消息系统,自带了集群、管理、插件等特性。学习RabbitMQ需要理解下面几个概念:

  1. 消息(Message):
  • 有效载荷(Payload),也就是要传输的数据,数据类型可以是纯文本也可以是json
    
  • 标签(Label),它包含交换机的名字和可选的主题(topic)标记等,AMQP仅仅描述了标签,而RabbitMQ决定了把这个消息发给哪个消费者。
    
  1. 生产者(Producer):创建消息并且设置标签。

  2. 消费者(Consumer): 消费者连接到代理服务器上,接收消息的有效载荷(消费者并不需要消息中的标签)

二、工作流程

AMQP工作流程图如下所示: 输入图片说明

交换机就像邮局,通过它做路由分发,交换机将收到的消息根据路由分发规则分发给绑定的队列。

队列将消息投递给了订阅此队列的消费者或者消费者主动获取

为了保证消息被正确取出并执行,消息投递失败后会重发,AMQP包含了一个消息确认的概念:当一个消息从队列中投递给消费者后,消费者会通知消息代理(Broker),这个通知可以是自动完成的,也可以由处理消息的应用来执行。当消息确认被启用的时候,消息代理不会完全将消息从队列中删除,除非收到来自消费者的确认回执。

交换机拿到一个消息之后会将它路由给消息队列。它使用哪种路由算法是由交换机类型和被称作绑定(queue_bind)的规则所决定的,目前RabbitMQ提供了如下四种交换机。

  1. 直连交换机(direct exchange):根据消息携带的路由键(routing key)将消息投递给对应队列。将一个队列绑定到某个交换机的同时赋予该绑定一个路由键,当一个携带着路由键为XXX的消息发送给直连交换机的时,交换机会把它路由给绑定值同样为XXX的队列。直连交换机用来处理消息的单播路由。
  2. 主题交换机(topic exchange): 通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机通常用来实现消息的多播路由。发送到主题交换机的消息的路由键,必须是一个“.”分隔的词语列表,这些词语应该和对应的业务相关联,词语的个数可以随意,但是不要超过255字节。绑定键支持通配符:“*“用来表示一个单词,”#“用来表示任意数量单词。
  3. 扇形交换机(fanout exchange):将消息路由给绑定到它身上的所有队列,且不理会绑定的路由键。扇形交换机用来处理消息的广播路由。扇形交换机允许对单条消息做不同的处理,另外可以把一个消息发给多个任务队列,执行不一样的工作。使用扇形交换机可以有效的解耦生产者和消费者
  4. 头交换机(headers exchange): 允许匹配AMQP的头而非路由键,使用起来和直连交换机差不多,但是性能却差很多,一般很少用到。

三、安装启动

  1. Ubuntu下安装
sudo apt-get install rabbitmq-server
sudo service rabbitmq-server start
  1. Centos下安装
yum -y install rabbitmq-server
systemctl start rabbitmq-server

四、代码演示

由于采用python来演示流程,所以需要先安装RabbitMQ的python客户端,最常用的客户端是pika,代码支持Python 2.X和3.X

pip install pika

生产者producer.py

#!/usr/bin/env python
# -*-coding:utf-8 -*-
import sys
import pika

parameters = pika.URLParameters('amqp://guest:guest@localhost:5672')
connection = pika.BlockingConnection(parameters) # connection 就是消息代理
channel = connection.channel() # 获得信道

# 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机
channel.exchange_declare(exchange='web_develop', exchange_type='direct',
                         passive=False, durable=True, auto_delete=False)

if len(sys.argv) != 1:
    msg = sys.argv[1]  # 使用命令行参数为消息内容
else:
    msg = 'hello world'

# 创建一个消息,delivery_mode为2表示让这个消息持久化,重启RabbitMQ也不会丢失
props = pika.BasicProperties(content_type='text/plain', delivery_mode=2)

# basic_publish 表示发送路由键为xxx_routing_key,消息为hello world 的消息给web_develop 这个交换机
channel.basic_publish('web_develop', 'xxx_routing_key', msg, properties=props)

connection.close()  # 关闭连接

消费者consumer.py

#!/usr/bin/env python
# -*-coding:utf-8 -*-
import pika

# 处理接收到的消息的回调函数,method_frame 携带了投递标记,header_frame表示AMQP信息头的对象,body为消息实体

def on_message(channel, method_frame, header_frame, body):
    # 消息确认,确认之后才会删除消息并给消费者发送新的消息
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    print(body)

parameters = pika.URLParameters('amqp://guest:guest@localhost:5672')
connection = pika.BlockingConnection(parameters)  # connection 就是消息代理
channel = connection.channel() # 获得信道

# 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机
channel.exchange_declare(exchange='web_develop', exchange_type='direct',
                         passive=False, durable=True, auto_delete=False)

# 声明队列,如果没有就创建
channel.queue_declare(queue='standard', auto_delete=True)

# 通过路由键将队列和交换机绑定
channel.queue_bind(queue='standard', exchange='web_develop', routing_key='xxx_routing_key')

# 订阅队列
channel.basic_consume(on_message, 'standard')

try:
    # 开始消费
    channel.start_consuming()
except KeyboardInterrupt:
    # 停止消费
    channel.stop_consuming()

# 关闭连接
connection.close()

先启动消费者

python consumer.py

输入图片说明

另开一个终端后,启动生产者

python producer.py

输入图片说明

可以对生产者的例子进行改动,让它支持消息确认。支持的原理是确保basic_publish的返回值为True。

生产者producer_with_confirm.py

#!/usr/bin/env python
# -*-coding:utf-8 -*-
import sys
import pika

parameters = pika.URLParameters('amqp://guest:guest@localhost:5672')
connection = pika.BlockingConnection(parameters) # connection 就是消息代理
channel = connection.channel() # 获得信道

# 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机
channel.exchange_declare(exchange='web_develop', exchange_type='direct',
                         passive=False, durable=True, auto_delete=False)

if len(sys.argv) != 1:
    msg = sys.argv[1]  # 使用命令行参数为消息内容
else:
    msg = 'hello world'

# 创建一个消息,delivery_mode为2表示让这个消息持久化,重启RabbitMQ也不会丢失
props = pika.BasicProperties(content_type='text/plain', delivery_mode=2)

if channel.basic_publish('web_develop', 'xxx_routing_key', msg, properties=props):
    print('Message publish was confirmed!')
else:
    print('Message could not be confirmed!')

connection.close()  # 关闭连接

重新执行新的生产者代码发现,当消费者收到消息后,生产者这边会打印出Message publish was confirmed!。

展开阅读全文
加载中
点击加入讨论🔥(2) 发布并加入讨论🔥
打赏
2 评论
23 收藏
1
分享
返回顶部
顶部