文档章节

EMQ百万级MQTT消息服务(小技巧)

喵了_个咪
 喵了_个咪
发布于 04/15 10:50
字数 1642
阅读 4554
收藏 48

附上:

喵了个咪的博客:w-blog.cn

EMQ官方地址:http://emqtt.com/

EMQ中文文档:http://emqtt.com/docs/v2/guide.html

1.ACL鉴权规则化

在正常业务使用下对于客户端的行为可以使用ACL进行限制,比如A客户端只能订阅 /A/get 队列消息和向 /A/set 发布内容 但是在MYSQL里面处理这样的鉴权就需要写入两条记录,如果设备量有一百万数据库就要承担两百万条鉴权数据量会大大影响数据库的性能 那么有没有什么批量的方式来定义ACL鉴权呢?

在mysql-ACL鉴权的配置文件下关于如何使用鉴权的SQL是可以编辑的,也就意味着你可以通过SQL来实现批量ACL鉴权规则

> vim /usr/local/emqttd/etc/plugins/emq_auth_mysql.conf

# 最下面有这样一条配置
auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

笔者这里就实现每个设备默认可以订阅 /A/get 队列消息和向 /A/set 发布

笔者现在的规则是客户端只能向hello写消息其他操作一概不允许,我们先加两条记录

insert `mqtt_acl`(`allow`,`username`,`access`,`topic`) values(1,'$all',1,'/$user/get');
insert `mqtt_acl`(`allow`,`username`,`access`,`topic`) values(1,'$all',2,'/$user/set');

然后修改默认的ACL鉴权的SQL语句如下(这里使用的是username作为topic动态名称也可以使用其他字段):

select allow, ipaddr, username, clientid, access, REPLACE(topic,'$user','%u') from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

这样一来就算没有独立配置/A/set可以写入,作为用户是A的客户端也可以进行消息的写入了,并且也可以监听消息/A/get

2.共享订阅

关于队列常见的使用中也有这样的场景,一条消息希望被多个监听程序接收到,可能的场景如下:

  • 一个程序处理,一个程序记录日志分别处理
  • 批量推送
                            ---------
                            |       | --Msg1,Msg2,Msg3--> Subscriber1
Publisher--Msg1,Msg2,Msg3-->|  EMQ  | --Msg1,Msg2,Msg3--> Subscriber2
                            |       | --Msg1,Msg2,Msg3--> Subscriber3
                            ---------

多条消息希望被多个程序中的某个进行处理,场景如下:

  • 并发情况下耗时操作进行并行处理提高系统吞吐量
                            ---------
                            |       | --Msg1--> Subscriber1
Publisher--Msg1,Msg2,Msg3-->|  EMQ  | --Msg2--> Subscriber2
                            |       | --Msg3--> Subscriber3
                            ---------

在默认情况下有多个客户端监听一个事件时会受到同样的消息,但是怎么共享订阅呢?EMQ共享订阅支持两种使用方式:

  • $queue/ 如:$queue/topic
  • $share/<group>/ 如:$share/group/topic

以上两种都可以实现共享订阅(笔者测试下来值通过了share来完成了订阅),订阅和监听

  • 多个服务端监听 $share/group/topic
  • 客户端向 topic 发送消息

3.Qos 0/1/2的区别实测

最多一次的传输 消息是基于TCP/IP网络传输的。没有回应,在协议中也没有定义重传的语义。消息可能到达服务器1次,也可能根本不会到达。

至少一次的传输 服务器接收到消息会被确认,通过传输一个PUBACK信息。如果有一个可以辨认的传输失败,无论是通讯连接还是发送设备,还是过了一段时间确认信息没有收到,发送方都会将消息头的DUP位置1,然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS。 如果客户端没有接收到PUBACK信息(无论是应用定义的超时,还是检测到失败然后通讯session重启),客户端都会再次发送PUBLISH信息,并且将DUP位置1。 当它从客户端接收到重复的数据,服务器重新发送消息给订阅者,并且发送另一个PUBACK消息。

笔者做了一个实现消费端阻塞2秒消费一个内容,发布端1秒发布一个内容,等EMQ的最大拥塞使用完了之后消息在EMQ缓存的会后就会出现很多的重复消息

只有一次的传输 在QoS level 1上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输,当重复的消息不被允许的情况下使用。这样增加了网络流量,但是它通常是可以接受的,因为消息内容很重要。 QoS level 2在消息头有Message ID。

4.密码加盐

在用户验证中可以使用plain | md5 | sha | sha256 | bcrypt等hash方式(默认使用的sha256),但是出于安全性考虑EMQ也支持对密码加盐,可以解开注释使用一下加盐方式中的一种

vim /usr/local/emqttd/etc/plugins/emq_auth_mysql.conf

## sha256 with salt prefix
## auth.mysql.password_hash = salt,sha256

## bcrypt with salt only prefix
## auth.mysql.password_hash = salt,bcrypt

## sha256 with salt suffix
## auth.mysql.password_hash = sha256,salt

## pbkdf2 with macfun iterations dklen
## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
## auth.mysql.password_hash = pbkdf2,sha256,1000,20

对应存储的密码就要进行加盐处理了

5.EMQ离线消息

  • 保留消息 MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。 例如mosquitto命令行发布一条保留消息到主题’a/b/c’: mosquitto_pub -r -q 1 -t a/b/c -m 'hello' 之后连接上来的MQTT客户端订阅主题’a/b/c’时候,仍可收到该消息: $ mosquitto_sub -t a/b/c -q 1 hello 保留消息(Retained Message)有两种清除方式: 客户端向有保留消息的主题发布一个空消息: mosquitto_pub -r -q 1 -t a/b/c -m '' 消息服务器设置保留消息的超期时间。

  • cleanSession 清理回话 MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。 ‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。 ‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。

3 总结

在EMQ和MQTT使用过程中还有很多的细节需要注意,关注细节才能走的更远

注:笔者能力有限有说的不对的地方希望大家能够指出,也希望多多交流!

© 著作权归作者所有

共有 人打赏支持
喵了_个咪
粉丝 262
博文 144
码字总数 185422
作品 4
杨浦
技术主管
加载中

评论(8)

s
sweetalin
很厉害了!
喵了_个咪
喵了_个咪

引用来自“快乐之星IT”的评论

集群的共享订阅怎么实现呢?我测试的是多个监听程序收到的消息都是一样的?重复消费怎么处理呢?

引用来自“喵了_个咪”的评论

就是本文中提到的共享订阅

引用来自“快乐之星IT”的评论

单点的共享订阅是没有问题的,使用集群之后就会收到的消息重复?我看集群里有主题路由复制功能。
使用集群也不会受到重复的消息,消息确认机制及集群内共享的
快乐之星IT

引用来自“快乐之星IT”的评论

集群的共享订阅怎么实现呢?我测试的是多个监听程序收到的消息都是一样的?重复消费怎么处理呢?

引用来自“喵了_个咪”的评论

就是本文中提到的共享订阅
单点的共享订阅是没有问题的,使用集群之后就会收到的消息重复?我看集群里有主题路由复制功能。
喵了_个咪
喵了_个咪

引用来自“快乐之星IT”的评论

集群的共享订阅怎么实现呢?我测试的是多个监听程序收到的消息都是一样的?重复消费怎么处理呢?
就是本文中提到的共享订阅
快乐之星IT
集群的共享订阅怎么实现呢?我测试的是多个监听程序收到的消息都是一样的?重复消费怎么处理呢?
迷死特祁
迷死特祁
支持一下
今夕何年
今夕何年
搞不懂 每个 topic 只能保存一条信息
Wizzer
Wizzer
mark
EMQ 2.0.6 发布, 百万级 MQTT 消息服务器

EMQ 2.0.6 版本发布,2.0最后一个维护版本。EMQ 是采用 Erlang/OTP 平台开发,全面支持 MQTT V3.1.1 协议,支持集群和百万级连接的开源 MQTT 消息服务器。 更新内容: 升级esockd库到v4.1.1版...

emqtt
2017/01/09
2.6K
3
EMQ 2.1.0-beta.1 发布, 百万级 MQTT 消息服务器

EMQ v2.1.0-beta.1 版本正式发布。 改进Session/Inflight窗口设计,一个定时器负责全部Inflight QoS1/2消息重传。优化MQTT连接的GC机制,降低高消息吞吐情况下的CPU/内存占用。 EMQ 2.1.0版本...

emqtt
2017/02/24
1K
4
EMQ 2.2-beta.1 发布, 百万级 MQTT 消息服务器

EMQ 2.2-beta.1版本正式发布。 EMQ 2.2正式支持MQTT协议多监听器配置,支持HAProxy的Proxy Protocol V1/V2。新增Web Hook插件(emq-web-hook)、Lua Hook插件(emq-lua-hook)。 MQTT协议监听器配...

emqtt
2017/05/08
2.3K
6
EMQ 2.3-beta.1 发布, 百万级 MQTT 消息服务器

EMQ R2.3-beta.1版本发布!该版本正式支持集群节点自动发现与集群脑裂自动愈合,支持基于IP Multicast、Etcd、Kubernetes等多种策略自动构建集群。 节点发现与自动集群 EMQ R2.3 版本支持多种...

emqtt
2017/07/25
1K
4
EMQ百万级MQTT消息服务(分布式集群)

在强大的单机也比不上集群,EMQ的集群模式很粗暴,只需要把EMQ服务关联在一起然后负载均衡就可以达到集群的效果,这样就算面对1000CK问题也迎刃而解 附上: 喵了个咪的博客:w-blog.cnEMQ官方地址...

喵了_个咪
04/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

00.编译OpenJDK-8u40的整个过程

前言 历经2天的折腾总算把OpenJDK给编译成功了,要说为啥搞这个,还得从面试说起,最近出去面试经常被问到JVM的相关东西,总感觉自己以前学的太浅薄,所以回来就打算深入学习,目标把《深入理...

凌晨一点
59分钟前
2
0
python: 一些关于元组的碎碎念

初始化元组的时候,尤其是元组里面只有一个元素的时候,会出现一些很蛋疼的情况: def checkContentAndType(obj): print(obj) print(type(obj))if __name__=="__main__": tu...

Oh_really
昨天
4
2
jvm crash分析工具

介绍一款非常好用的jvm crash分析工具,当jvm挂掉时,会产生hs_err_pid.log。里面记录了jvm当时的运行状态以及错误信息,但是内容量比较庞大,不好分析。所以我们要借助工具来帮我们。 Cras...

xpbob
昨天
97
0
Qt编写自定义控件属性设计器

以前做.NET开发中,.NET直接就集成了属性设计器,VS不愧是宇宙第一IDE,你能够想到的都给你封装好了,用起来不要太爽!因为项目需要自从全面转Qt开发已经6年有余,在工业控制领域,有一些应用...

飞扬青云
昨天
4
0
我为什么用GO语言来做区块链?

Go语言现在常常被用来做去中心化系统(decentralised system)。其他类型的公司也都把Go用在产品的核心模块中,并且它在网站开发中也占据了一席之地。 我们在决定做Karachain的时候,考量(b...

HiBlock
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部