[TOC]
RabbitMQ快速入门及golang代码案例
之前做分布式集群的时候,用它的发布订阅模式和路由模式来做节点之间的通信.
简单模式和工作模式就用的比较少了, 反而用Redis队列比较多.
这里把之前做的笔记都记下来.
RabbitMQ 介绍
- 是面向消息的中间件,用于组件之间的解耦.主要体现在消息的发送者和消费者之间无强依赖关系.
- 特点: 高可用,扩展性,多语言客户端,管理界面.
- 主要使用场景: 流量削峰,异步处理,应用解耦.
- 默认监听端口:15672
- 对比Kafka虽然RabbitMQ的性能会差一点点,但是比其他的消息总好却高很多,而且RabbitMQ用起来更加的方便,可以保证数据不丢失.
- 为什么选择RabbitMQ? 因为它可以通过各种手段保证数据不丢失而且性能也比其他的消息总线好,而kafka则为了高性能对数据的重复,丢失,错误没有严格要求.
安装和查看
-
mac安装
brew install rabbitmq
注意,如果使用brew安装,不能直接使用RabbitMQ的命令只能手动指定路径
/usr/local/Cellar/rabbitmq/3.8.2/sbin/rabbitmqctl
但是可以直接使用
brew services run rabbitmq
启动RabbitMQ -
ubuntu安装
apt-get install rabbitmq-server
-
配置浏览器访问
- 查看运行状态
sudo service rabbitmq-server status
- 运行,关闭,重启,状态查看
# 系统命令
sudo service rabbitmq-server start
sudo service rabbitmq-server stop
sudo service rabbitmq-server restart
sudo service rabbitmq-server status
# rabbitmq-server命令
sudo rabbitmq-server start&
sudo rabbitmq-server stop
sudo rabbitmqctl status
- 如果要做Web可视化,需要安装插件,随后可以通过浏览器访问
15672
查看状态
sudo rabbitmq-plugins enable rabbitmq_management
- 重启服务
sudo service rabbitmq-server restart
用户管理
# 查看用户列表
sudo rabbitmqctl list_users
# 新建用户root,同时设置密码root
sudo rabbitmqctl add_user root root
# 设置用户角色, 这里为用户root设置最高权限
sudo rabbitmqctl set_user_tags root administrator
# 修改用密码
sudo rabbitmqctl change_password name 'newPasswd'
# 查看用户列表
sudo rabbitmqctl list_users
# 删除用户
sudo rabbitmqctl delete_user username
# 清除用户权限
sudo rabbitmqctl clear_permissions -p vhostpath username
# 设置权限
sudo rabbitmqctl set_permissions -p vhostpath username
- 现在可以用第7步设置的账号和密码来访问
http://ip:15672
使用rabbitmq的监控页面.
插件管理
# 查看支持的插件列表
sudo rabbitmq-plugins list
# 开启插件
sudo rabbitmq-plugins enable 插件名
# 关闭插件
sudo rabbitmq-plugins disable 插件名
- 查看插件[]为空说明未启用
sudo rabbitmq-plugins list
- 开启插件
web管理界面插件
sudo rabbitmq-plugins enable rabbitmq_management
日志插件
sudo rabbitmq-plugins enable rabbitmq_tracing
- 取消插件
sudo rabbit-mq-plugins disable rabbitmq_management
虚拟主机,队列和队列
# 创建虚拟主机
rabbitmqctl add_vhost vhostpath
# 列出所有虚拟主机
rabbitmqctl list_vhosts
# 列出虚拟主机上的所有权限
rabbitmqctl list_permisstions -p vhostpath
# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath
# 查看所有队列信息
rabbitmqctl list_queues
# 清楚队列中的消息
rabbitmqctl -p vhostpath purge_queue blue
高级操作
在RabbitMQ集群的笔记中有命令的补充
# 移除所有数据,要在rabbitmqctl stop_app后使用
rabbitmqctl reset
# 组成集群命令(--ram指定存储模式为内存,--disc指定为磁盘)
rabbitmqctl join_cluster <clusternode> [--ram]
# 查看集群状态
rabbitmqctl cluster status
# 修改集群节点的存储方式
rabbitmqctl change_cluster_node_type disc|ram
# 忘记集群节点(--offline在不启动节点的情况下摘除该集群),比如故障转移什么的就可以用这个
rabbitmqctl forget_cluster_node [--offline]
# 修改节点名称
rabbitmqctl rename_cluster_node oldName newName
RabbitMQ核心概念
VirtualHost
数据隔离,区分队列,隔离账号. 比如可以分成生成和线下环境之类的
初始用户未绑定Vhost,需要绑定Vhost.一旦绑定之后用户只能通过这个Vhost工作不管是新建连接还是发送队列.
Connection
查看连接, 监听进程
Exchange
交换机,做中转用.
工作原理: 用户发送消息-> 交换机-> 根据交换机规则绑定对应的Key
Channel
建立连接通信
Queue
队列
绑定交换机接收消息,只要没有消费者,消息会一直存在队列中
Binding
将我们起的队列绑定到交换机上面,通过不同方式的绑定,可以实现不同的工作模式
工作中常用的五种模式
一. Simple(简单模式)
一个生产者,一个消费者.
每个消费者获取到的消息唯一,也就是同一个消息只能被消费一次,其他人再次消费的就是下一条消息.
就像普通队列那样.
二. Work(工作模式)
一个生产者,多个消费者
每个消费者获取到的消息唯一,也就是说同一个消息只能被消费一次,其他人再次消费的就是下一条消息。可以起到负载分配的作用,负载均衡. 就像普通队列那样
当生产数量比消费数量多的时候,就应该启用work模式.
其实work工作模式就是在simple模式的消费端多了几个消费者而已,代码什么的都不用变.多起几个消费者即可.
三. Publish/Subscribe(订阅/发布模式)
订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。
四. Routing(路由模式)
路由模式,一个消息可以被多个消费者获取。并且消息的目标队列可被生产者指定。
是由订阅模式演化而来的
五. Topic(话题模式)
话题模式,一个消息被多个消费者获取。消息的目标queue可用BindingKey 以通配符, (#:一个或多个词或零个,*:一个词)的方式指定。
例如:
test.*
,表示匹配test.liu
; 但是test.liu.mq
需要用test.#
来匹配; 如果只用#
则表示没有规则而获取所有消息至于单词与单词之间的间隔用英文_-.
等等都可以.
代码案例
这是之前学用golang下调用rabbitmq的代码,有点啰嗦. 但是主要是为了联系和学习,啰嗦点更好.
package rabbitmq
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
/**
qmqp:// 协议名,go语言中定死的
root:root RabbitMQ的账号密码
localhost:5672 ip地址和端口
test Virtual host名称
*/
//协议名://账号:密码@IP地址:端口号/Virtual host
const MQURL = "amqp://root:root@localhost:5672/test"
type RabbitMQ struct {
conn *amqp.Connection //保存的连接
channel *amqp.Channel //通信信息
QueueName string //队列名称
Exchange string //交换机
Key string //key
Mqurl string //连接信息
}
//创建结构体实例
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
//创建连接
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "创建连接失败")
//创建channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "获取channel失败")
return rabbitmq
}
//手动断开channel和connection,如果不断开会被一直占用
func (r *RabbitMQ) Destory() {
r.channel.Close()
r.conn.Close()
}
//错误处理逻辑
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
}
}
//sample模式step: 1. 创建RabbitMQ实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
//实例化,准备发送连接
//queueName在生产者和消费者中必须是一致的,否则将得不到消息
//exchange为空: 采用默认交换机,direct模式的交换机此种模式表示直接发送到队列
//key为空: 这里是没有Key
return NewRabbitMQ(queueName, "", "")
}
//sample模式step: 2. 生产者
func (r *RabbitMQ) ProducerSimple(message string) {
//1.生成队列(固定用法),如果队列不存在会自动创建,如果存在则跳过创建
//保证一定能入列
_, err := r.channel.QueueDeclare(
r.QueueName,
false, //是否持久化
false, //当最后一个消费者断开连接,是否自动删除
false, //是否有排他性,如果为true,会创建一个只有自己能访问的队列
false, //是否阻塞,发送消息以后是否等待服务器的响应
nil, //额外属性
)
if err != nil {
log.Printf("sample模式-创建队列失败: %s", err)
}
//2.发送消息到队列中
err = r.channel.Publish(
r.Exchange, //交换机名称
r.QueueName, //队列名称
false, //如果为true会根据Exchange类型和routekey规则自动寻找符合要求的队列,找不到就发还给生产者
false, //如果为true,发送消息到队列发现队列上没有绑定消费者,则会把消息发还给生产者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
if err != nil {
log.Printf("sample模式-发送消息失败: %s", err)
}
log.Println("sample模式-发送消息成功")
}
//sample模式step: 3. 消费者
func (r *RabbitMQ) ConsumeSample() {
//1.生成队列(固定用法),如果队列不存在会自动创建,如果存在则跳过创建
//保证一定能入列
_, err := r.channel.QueueDeclare(
r.QueueName,
false, //是否持久化
false, //当最后一个消费者断开连接,是否自动删除
false, //是否有排他性,如果为true,会创建一个只有自己能访问的队列
false, //是否阻塞,发送消息以后是否等待服务器的响应
nil, //额外属性
)
if err != nil {
log.Printf("sample模式-创建队列失败: %s", err)
}
//2. 接收消息
message, err := r.channel.Consume(
r.QueueName, //队列名称
"", //用来区分多个消费者,这里不区分
true, //是否自动应答,为true时消费者用完消息是否自动告诉rabbitMQ服务器我们已经用完了,让他删除.为false时需要我们自己写回调了
false, //是否具有排他性
false, //如果设置为true不能将同一个connection中发送的消息传递给这个connection中的消费者
false, //是否阻塞,消费完毕之后下一个再进来,注意:false为阻塞.
nil,
)
if err != nil {
log.Printf("sample模式-接收消息失败: %s", err)
}
//3. 消费接收到的消息,
forever := make(chan int) //利用无buffer的channel造成死循环
//启动协程处理消息
go func() {
for d := range message {
//这里可以写我们的消息处理逻辑的代码
log.Printf("sample模式-从队列中获取到消息:%s", d.Body)
}
}()
log.Println("[**********]sample模式-等待队列消息中")
<-forever
}
//订阅模式step: 1. 创建RabbitMQ实例
func NewRabbitMQPublish(exchangeName string) *RabbitMQ {
//实例化,准备发送连接
//queueName为空: 不需要设置
//exchange为空: 这里要指定交换机
//key为空: 这里是没有Key
return NewRabbitMQ("", exchangeName, "")
}
//订阅模式step: 2.生产者
func (r *RabbitMQ) ProducerPublish(message string) {
//创建交换机,如果不存在就按照我们的参数进行创建,存在就直接使用
err := r.channel.ExchangeDeclare(
r.Exchange, //指定交换机名称
"fanout", //交换机类型: fanout:广播类型
true, //是否持久化
false, //是否自动删除
false, //true:表示这个exchange不可以被client用来推送消息,只能用来exchange和exchange之间的绑定
false,
nil,
)
r.failOnErr(err, "创建交换机失败")
//发送消息
err = r.channel.Publish(
r.Exchange, //指定交换机
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
r.failOnErr(err, "发送消失失败")
log.Println("消息发送成功")
}
//订阅模式step: 3.消费者
func (r *RabbitMQ) ConsumePublish() {
//创建交换机,如果不存在就按照我们的参数进行创建,存在就直接使用
err := r.channel.ExchangeDeclare(
r.Exchange, //指定交换机名称
"fanout", //交换机类型: fanout:广播类型
true, //是否持久化
false, //是否自动删除
false, //true:表示这个exchange不可以被client用来推送消息,只能用来exchange和exchange之间的绑定
false,
nil,
)
r.failOnErr(err, "创建交换机失败")
//试探性创建队列,队列名称留空表示随机生成,不用指定
q, err := r.channel.QueueDeclare(
"", //随机生成队列
false,
false,
false,
false,
nil,
)
r.failOnErr(err, "尝试创建队列失败")
//绑定队列到exchange中
err = r.channel.QueueBind(
q.Name, //队列名称,用创建队列时系统自动生成的队列
"",
r.Exchange,
false,
nil,
)
r.failOnErr(err, "队列绑定失败")
//消费信息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "消费信息失败")
//消费信息
//3. 消费接收到的消息,
forever := make(chan int) //利用无buffer的channel造成死循环
//启动协程处理消息
go func() {
for d := range message {
//这里可以写我们的消息处理逻辑的代码
log.Printf("订阅模式-从队列中获取到消息:%s", d.Body)
}
}()
log.Println("[**********]订阅模式-等待队列消息中")
<-forever
}
//路由模式step1: 创建实例, 与订阅模式不同的是这里要指定routingkey
func NewRabbitMQRouting(exchangeName string, routingKey string) *RabbitMQ {
//实例化
rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
var err error
//获取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "创建连接失败")
//获取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "打开channel失败")
return rabbitmq
}
//路由模式step2: 发送消息
func (r *RabbitMQ) ProducerRouting(message string) {
//创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange,
"direct", //和订阅模式唯一不同的地方是这里改为direct模式
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "创建交换机失败")
//发送消息
err = r.channel.Publish(
r.Exchange,
r.Key, //这里必须要设置key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
r.failOnErr(err, "发送消息失败")
log.Println("消息发送成功")
}
//路由模式step3:接收消息
func (r *RabbitMQ) ReceiveRouting() {
//尝试创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange,
"direct", //和订阅模式不同的地方
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "尝试创建交换机失败")
//尝试创建队列
q, err := r.channel.QueueDeclare(
"", //要系统随机生成
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "创建队列失败")
//绑定队列
err = r.channel.QueueBind(
q.Name,
r.Key, //设置key
r.Exchange,
false,
nil,
)
//消费消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan int)
go func() {
for d := range message {
//这里可以写我们的消息处理逻辑的代码
log.Printf("路由模式-从队列中获取到消息:%s", d.Body)
}
}()
log.Println("[**********]路由模式-等待队列消息中")
<-forever
}
//话题模式step1: 创建RabbitMQ实例
func NewRabbitMQTopic(exchangeName string, routingKey string) *RabbitMQ {
rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
//创建connection连接
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "创建连接失败")
//获取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "创建channel失败")
return rabbitmq
}
//话题模式step2: 发送消息
func (r *RabbitMQ) ProducerTopic(message string) {
//创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange,
"topic", //和订阅模式唯一不同的地方是这里改为topic模式
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "创建交换机失败")
//发送消息
err = r.channel.Publish(
r.Exchange,
r.Key, //这里必须要设置key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
r.failOnErr(err, "发送消息失败")
log.Println("消息发送成功")
}
//要注意key的规则
//#:一个或多个词或零个,*:一个词
//test.*,表示匹配test.liu,但是test.liu.mq需要用test.#来匹配
//话题模式step3: 接收消息
func (r *RabbitMQ) ReceiveTopic() {
//尝试创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange,
"topic", //和订阅模式不同的地方
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "尝试创建交换机失败")
//尝试创建队列
q, err := r.channel.QueueDeclare(
"", //要系统随机生成
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "创建队列失败")
//绑定队列
err = r.channel.QueueBind(
q.Name,
r.Key, //设置key
r.Exchange,
false,
nil,
)
//消费消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan int)
go func() {
for d := range message {
//这里可以写我们的消息处理逻辑的代码
log.Printf("话题模式-从队列中获取到消息:%s", d.Body)
}
}()
log.Println("[**********]话题模式-等待队列消息中")
<-forever
}