文档章节

EMQ百万级MQTT消息服务(小技巧)

喵了_个咪
 喵了_个咪
发布于 04/15 10:50
字数 1642
阅读 3809
收藏 48
点赞 2
评论 8

附上:

喵了个咪的博客:w-blog.cn

EMQ官方地址:http://emqtt.com/

EMQ中文文档:http://emqtt.com/docs/v2/guide.html

1.ACL鉴权规则化

在正常业务使用下对于客户端的行为可以使用ACL进行限制,比如A客户端只能订阅 /A/get 队列消息和向 /A/set 发布内容 但是在MYSQL里面处理这样的鉴权就需要写入两条记录,如果设备量有一百万数据库就要承担两百万条鉴权数据量会大大影响数据库的性能 那么有没有什么批量的方式来定义ACL鉴权呢?

在mysql-ACL鉴权的配置文件下关于如何使用鉴权的SQL是可以编辑的,也就意味着你可以通过SQL来实现批量ACL鉴权规则

> vim /usr/local/emqttd/etc/plugins/emq_auth_mysql.conf

# 最下面有这样一条配置
auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

笔者这里就实现每个设备默认可以订阅 /A/get 队列消息和向 /A/set 发布

笔者现在的规则是客户端只能向hello写消息其他操作一概不允许,我们先加两条记录

insert `mqtt_acl`(`allow`,`username`,`access`,`topic`) values(1,'$all',1,'/$user/get');
insert `mqtt_acl`(`allow`,`username`,`access`,`topic`) values(1,'$all',2,'/$user/set');

然后修改默认的ACL鉴权的SQL语句如下(这里使用的是username作为topic动态名称也可以使用其他字段):

select allow, ipaddr, username, clientid, access, REPLACE(topic,'$user','%u') from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

这样一来就算没有独立配置/A/set可以写入,作为用户是A的客户端也可以进行消息的写入了,并且也可以监听消息/A/get

2.共享订阅

关于队列常见的使用中也有这样的场景,一条消息希望被多个监听程序接收到,可能的场景如下:

  • 一个程序处理,一个程序记录日志分别处理
  • 批量推送
                            ---------
                            |       | --Msg1,Msg2,Msg3--> Subscriber1
Publisher--Msg1,Msg2,Msg3-->|  EMQ  | --Msg1,Msg2,Msg3--> Subscriber2
                            |       | --Msg1,Msg2,Msg3--> Subscriber3
                            ---------

多条消息希望被多个程序中的某个进行处理,场景如下:

  • 并发情况下耗时操作进行并行处理提高系统吞吐量
                            ---------
                            |       | --Msg1--> Subscriber1
Publisher--Msg1,Msg2,Msg3-->|  EMQ  | --Msg2--> Subscriber2
                            |       | --Msg3--> Subscriber3
                            ---------

在默认情况下有多个客户端监听一个事件时会受到同样的消息,但是怎么共享订阅呢?EMQ共享订阅支持两种使用方式:

  • $queue/ 如:$queue/topic
  • $share/<group>/ 如:$share/group/topic

以上两种都可以实现共享订阅(笔者测试下来值通过了share来完成了订阅),订阅和监听

  • 多个服务端监听 $share/group/topic
  • 客户端向 topic 发送消息

3.Qos 0/1/2的区别实测

最多一次的传输 消息是基于TCP/IP网络传输的。没有回应,在协议中也没有定义重传的语义。消息可能到达服务器1次,也可能根本不会到达。

至少一次的传输 服务器接收到消息会被确认,通过传输一个PUBACK信息。如果有一个可以辨认的传输失败,无论是通讯连接还是发送设备,还是过了一段时间确认信息没有收到,发送方都会将消息头的DUP位置1,然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS。 如果客户端没有接收到PUBACK信息(无论是应用定义的超时,还是检测到失败然后通讯session重启),客户端都会再次发送PUBLISH信息,并且将DUP位置1。 当它从客户端接收到重复的数据,服务器重新发送消息给订阅者,并且发送另一个PUBACK消息。

笔者做了一个实现消费端阻塞2秒消费一个内容,发布端1秒发布一个内容,等EMQ的最大拥塞使用完了之后消息在EMQ缓存的会后就会出现很多的重复消息

只有一次的传输 在QoS level 1上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输,当重复的消息不被允许的情况下使用。这样增加了网络流量,但是它通常是可以接受的,因为消息内容很重要。 QoS level 2在消息头有Message ID。

4.密码加盐

在用户验证中可以使用plain | md5 | sha | sha256 | bcrypt等hash方式(默认使用的sha256),但是出于安全性考虑EMQ也支持对密码加盐,可以解开注释使用一下加盐方式中的一种

vim /usr/local/emqttd/etc/plugins/emq_auth_mysql.conf

## sha256 with salt prefix
## auth.mysql.password_hash = salt,sha256

## bcrypt with salt only prefix
## auth.mysql.password_hash = salt,bcrypt

## sha256 with salt suffix
## auth.mysql.password_hash = sha256,salt

## pbkdf2 with macfun iterations dklen
## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
## auth.mysql.password_hash = pbkdf2,sha256,1000,20

对应存储的密码就要进行加盐处理了

5.EMQ离线消息

  • 保留消息 MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。 例如mosquitto命令行发布一条保留消息到主题’a/b/c’: mosquitto_pub -r -q 1 -t a/b/c -m 'hello' 之后连接上来的MQTT客户端订阅主题’a/b/c’时候,仍可收到该消息: $ mosquitto_sub -t a/b/c -q 1 hello 保留消息(Retained Message)有两种清除方式: 客户端向有保留消息的主题发布一个空消息: mosquitto_pub -r -q 1 -t a/b/c -m '' 消息服务器设置保留消息的超期时间。

  • cleanSession 清理回话 MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。 ‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。 ‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。

3 总结

在EMQ和MQTT使用过程中还有很多的细节需要注意,关注细节才能走的更远

注:笔者能力有限有说的不对的地方希望大家能够指出,也希望多多交流!

© 著作权归作者所有

共有 人打赏支持
喵了_个咪
粉丝 240
博文 136
码字总数 178071
作品 4
杨浦
技术主管
加载中

评论(8)

s
sweetalin
很厉害了!
喵了_个咪
喵了_个咪

引用来自“快乐之星IT”的评论

集群的共享订阅怎么实现呢?我测试的是多个监听程序收到的消息都是一样的?重复消费怎么处理呢?

引用来自“喵了_个咪”的评论

就是本文中提到的共享订阅

引用来自“快乐之星IT”的评论

单点的共享订阅是没有问题的,使用集群之后就会收到的消息重复?我看集群里有主题路由复制功能。
使用集群也不会受到重复的消息,消息确认机制及集群内共享的
快乐之星IT

引用来自“快乐之星IT”的评论

集群的共享订阅怎么实现呢?我测试的是多个监听程序收到的消息都是一样的?重复消费怎么处理呢?

引用来自“喵了_个咪”的评论

就是本文中提到的共享订阅
单点的共享订阅是没有问题的,使用集群之后就会收到的消息重复?我看集群里有主题路由复制功能。
喵了_个咪
喵了_个咪

引用来自“快乐之星IT”的评论

集群的共享订阅怎么实现呢?我测试的是多个监听程序收到的消息都是一样的?重复消费怎么处理呢?
就是本文中提到的共享订阅
快乐之星IT
集群的共享订阅怎么实现呢?我测试的是多个监听程序收到的消息都是一样的?重复消费怎么处理呢?
迷死特祁
迷死特祁
支持一下
今夕何年
今夕何年
搞不懂 每个 topic 只能保存一条信息
Wizzer
Wizzer
mark
EMQ 2.0.6 发布, 百万级 MQTT 消息服务器

EMQ 2.0.6 版本发布,2.0最后一个维护版本。EMQ 是采用 Erlang/OTP 平台开发,全面支持 MQTT V3.1.1 协议,支持集群和百万级连接的开源 MQTT 消息服务器。 更新内容: 升级esockd库到v4.1.1版...

emqtt
2017/01/09
2.6K
3
EMQ 2.1.0-beta.1 发布, 百万级 MQTT 消息服务器

EMQ v2.1.0-beta.1 版本正式发布。 改进Session/Inflight窗口设计,一个定时器负责全部Inflight QoS1/2消息重传。优化MQTT连接的GC机制,降低高消息吞吐情况下的CPU/内存占用。 EMQ 2.1.0版本...

emqtt
2017/02/24
1K
4
EMQ 2.2-beta.1 发布, 百万级 MQTT 消息服务器

EMQ 2.2-beta.1版本正式发布。 EMQ 2.2正式支持MQTT协议多监听器配置,支持HAProxy的Proxy Protocol V1/V2。新增Web Hook插件(emq-web-hook)、Lua Hook插件(emq-lua-hook)。 MQTT协议监听器配...

emqtt
2017/05/08
2.3K
6
EMQ百万级MQTT消息服务(分布式集群)

在强大的单机也比不上集群,EMQ的集群模式很粗暴,只需要把EMQ服务关联在一起然后负载均衡就可以达到集群的效果,这样就算面对1000CK问题也迎刃而解 附上: 喵了个咪的博客:w-blog.cnEMQ官方地址...

喵了_个咪
04/15
0
0
EMQ 2.3-beta.1 发布, 百万级 MQTT 消息服务器

EMQ R2.3-beta.1版本发布!该版本正式支持集群节点自动发现与集群脑裂自动愈合,支持基于IP Multicast、Etcd、Kubernetes等多种策略自动构建集群。 节点发现与自动集群 EMQ R2.3 版本支持多种...

emqtt
2017/07/25
1K
4
EMQ百万级MQTT消息服务(ACL鉴权)

虽然EMQ已经搭建起来了,但是投入到业务使用中还面临着一些问题,当然MQTT设计之初也考虑了这一点,比如不是任何一个客户端都能链接到服务器和限制客户端能够对topic操作的权限 附上: 喵了个咪的...

喵了_个咪
04/15
0
0
EMQ 2.1.1 发布,Erlang 集群 MQTT 消息服务器

EMQ2.1.1 发布了。[emqttd] (EMQ)是采用Erlang语言开发,全面支持MQTT V3.1.1协议,支持集群和大规模连接的开源MQTT消息服务器。 [emqttd]致力于发布一个基于Erlang/OTP语言平台,企业级稳...

emqtt
2017/04/14
717
1
EMQ 2.2-beta.2 发布, 百万级 MQTT 消息服务器

EMQ 2.2-beta.2版本正式发布。新增'websocketprotocolheader'配置项,支持微信小程序WebSocket连接EMQ;支持Elixir语言开发EMQ插件。 问题修复 新增'websocketprotocolheader'配置项,支持微...

emqtt
2017/05/22
1K
2
emqttd 2.1.0 发布,Erlang 集群 MQTT 消息服务器

emqttd 2.1.0 发布了。[emqttd] (EMQ)是采用Erlang语言开发,全面支持MQTT V3.1.1协议,支持集群和大规模连接的开源MQTT消息服务器。 [emqttd]致力于发布一个基于Erlang/OTP语言平台,企业...

达尔文
2017/04/09
701
2
EMQ 2.2-rc.1 发布, 百万级 MQTT 消息服务器

EMQ 2.2-rc.1 版本正式发布。新增HTTP管理API监听器,修复涂鸦上报的session注册race condition问题。其他更新: Add a new listener for HTTP REST API (emqttd#1094) Fix the race condit...

emqtt
2017/06/15
1K
2

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Unity客户端框架

自己作为主力开发的第一个Unity项目已经进入尾声测试阶段了,虽然数据还没有完全达到要求,但是从代码层面上看,有很多地方已经可以进行总结和整理。不管项目最终结果如何,在整个开发过程中...

爽歪歪ES
3分钟前
0
0
数据分析挖掘学习干货:大数据处理技术的总结与分析

一 数据分析处理需求分类 1 事务型处理 在我们实际生活中,事务型数据处理需求非常常见,例如:淘宝网站交易系统、12306网站火车票交易系统、超市POS系统等都属于事务型数据处理系统。 这类系...

加米谷大数据
6分钟前
0
0
关于看到的从页面调取html或者接口

你看到的可能是路由或者进行的接口拦截的跳转而不是前端页面进行的跳转

凡人修仙者
10分钟前
0
0
Ubuntu 安装 Redis (非源码Build方式)

安装redis : apt-get install redis-server # 安装完会自动自动 使用命令 ps -aux|grep redis 查看是否启动 ---------------------------------- redis 服务操作方式 /etc/init.d/redis-serv......

Alex142857
14分钟前
0
0
mysql字段中去掉括号

eg: select * from (select *,substring_index(substring_index(remarks,"[",1),']',-1) as carCard from test ) as t where carCard LIKE '%C999%'...

writeademo
18分钟前
0
0
web3.py简介

与web3.py库交互的共同入口是web3对象。web3对象提供API,用于python开发的应用与以太坊区块链进行交互,通常是通过连接JSON-RPC服务器进行。 Providers提供者 Providers使web3连接到区块链上...

笔阁
19分钟前
0
0
jquery 获取父页面某一个input框值得写法

//login_name 为父页面的input框的ID var login_name= $('#login_name', window.parent.document).val();//当前用户登录名...

gulf
20分钟前
0
0
mybatis高级查询

说实话不怎么想写这些,可能是我昨天太累了,妹的那个错误一直找不到。醉了,写一下把,分别是多表联合查询以及汇总 <resultMap id="mdxlResult" type="java.util.Map"> <id column="id" ...

木九天
22分钟前
0
0
Spring5(Java8版本)中的反射工具类和注解工具类

1.反射工具类 学习反射时很好的反射教程.这个工具类是基于Java8的。 package org.springframework.util;import java.lang.reflect.Constructor;import java.lang.reflect.Field;imp...

hutaishi
24分钟前
0
0
java io

Java IO在实际开发中的应用 http://www.cnblogs.com/ldh-better/p/7158658.html

小鱼吃大鱼
24分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部