文档章节

EMQ X Enterprise 新功能 Rule Engine 介绍

EMQX
 EMQX
发布于 08/04 16:18
字数 2290
阅读 16
收藏 0

EMQ X Enterprise Rule Engine

Rule Engine (以下简称规则引擎) 用于配置 EMQ X 消息流与设备事件的处理、响应规则。作为 2019 年度 EMQ X 新增重量级功能,规则引擎不仅提供了清晰、灵活的"配置式"的业务集成方案,用于简化业务开发流程,提升用户易用性,降低业务系统与 EMQ X 的耦合度;也为 EMQ X 的私有功能定制提供了一个更优秀的基础架构,提升开发交付速度。

规则引擎开源版本提供了基础处理能力,已集成在 EMQ X v3.1.0 中发布。功能更灵活完备、可用性定制性更强的规则引擎正在紧密开发测试,计划集成在下一 EMQ X 商业版中发布。

规则引擎典型应用场景举例:

  • 动作监听:智慧家庭智能门锁开发中,门锁会因为网络、电源故障、人为破坏等原因离线导致功能异常,使用规则引擎配置监听离线事件向应用服务推送该故障信息,可以在接入层实现第一时间的故障检测的能力;
  • 数据筛选:车辆网的卡车车队管理,车辆传感器采集并上报了大量运行数据,应用平台仅关注车速大于 40 km/h 时的数据,此场景下可以使用规则引擎对消息进行条件过滤,向业务消息队列写入满足条件的数据;
  • 消息路由:智能计费应用中,终端设备通过不同主题区分业务类型,可通过配置规则引擎将计费业务的消息接入计费消息队列并在消息抵达设备端后发送确认通知到业务系统,非计费信息接入其他消息队列,实现业务消息路由配置;
  • 消息编解码:其他公共协议/私有 TCP 协议接入、工控行业等应用场景下,可以通过规则引擎的本地处理函数(可在 EMQ X 上定制开发)做二进制/特殊格式消息体的编解码工作;亦可通过规则引擎的消息路由将相关消息流向外部计算资源如函数计算进行处理(可由用户自行开发处理逻辑),将消息转为业务易于处理的 JSON 格式,简化项目集成难度、提升应用快速开发交付能力。

Rule Engine 工作示意图

规则引擎通过嵌入在 EMQ 的消息转发过程中对数据进行过滤、转换和丰富,实现高效的数据处理。新的规则引擎涵盖了 EMQ X 中多个插件的功能,将原先插件中独立的外部资源进行集中管理,实现资源复用,降低管理监控复杂度。同时,规则引擎将大部分原先只能在应用端进行的计算内置到 EMQ X 中,通过计算、过滤、筛选高价值数据提高消息处理效率的同时,精简了业务架构、减少数据传递路径降低了消息处理时延。

规则引擎相关的功能包括:

  • 消息规则:处理设备到 EMQ X 的消息,实现条件计算筛选、消息结构调整,消息重新发布、持久化与桥接;
  • 事件规则:处理设备通信生命周期中的各个事件信息,可方便实现设备状态记录如上下线通知,认证连接记录、消息状态记录如消息计费统计等功能;
  • 资源管理:集中管理外部资源,实现资源复用,降低管理监控复杂度。

与 EMQ X 其他功能一样,规则引擎同样提供了类似的 HTTP REST API 方便用户应用开发集成,EMQ X Dashboard ( EMQ X 管理控制台)中亦实现了规则引擎的可视化创建、编辑、管理功能。

消息规则

借助规则引擎中的消息规则,用户可以将设备到 EMQ X 的消息路由或写入到各类数据库、消息队列、HTTP REST 网关等对象或资源中,或重新发送到设备以实现服务端计算功能。

规则引擎提供了基于 SQL 表达式的数据查询、处理功能,让您先筛选数据并转换消息为预置格式,再配置后续处理动作。

SQL 表达式范例如下:

-- 选择发往 "t/a" 主题的消息体中的 name 字段, 过滤条件为 name = 'EMQ'
select payload.name as name from "t/a" when name = 'EMQ'


-- 选择发往 "command/#" 主题的消息体
select payload from "command/#"

消息规则典型功能与应用场景如下:

  • 按照消息的主题进行过滤,指定要处理的消息,处理后重新发布到新主题;
  • 制定筛选条件,针对消息正文特定字段进行条件筛选,处理满足条件的数据;
  • 将消息正文转换为预置结构再处理,削减内部通信与外部存储、计算开销;
  • 使用消息摘要、编码转换、数学运算等多种预处理方式处理消息正文或消息正文中指定字段,在 Broker 中完成简单计算以降低操作延迟。

每条消息规则包含以下属性:

属性 说明
Source 要处理的数据流来源,基于 MQTT 主题,使用 SQL 中的 FROM 指令筛选
条件 针对消息正文(仅限 JSON 信息)、消息上下文信息(如 QoS、Client ID、Username)的条件过滤表达式,用于确定该条规则的匹配条件、消息结构。使用 SQL 中的 WHERE 指令查询
处理器 针对消息正文(仅限 JSON 信息)、消息上下文信息(如 QoS、Client ID、Username)的选择表达式,用于选择并预处理指定数据,规则引擎内置多种预处理方法如消息摘要、编解码与编码转换、简单数学运算,使用 SQL 的子句与 SQL 函数处理。
动作 消息命中规则并处理成功后需要触发的动作,指定具体的动作操作如写入数据库 SQL 语句,发送到消息队列的对象、主题。一条规则可以定义一个或多个动作,实现规则的多端处理。

事件规则

  • 借助规则引擎中的事件规则,用户可以处理设备通信生命周期中的各个事件信息,事件规则典型功能与应用场景如下:
    • 设备各个事件动作 log:,如设备连接/断开连接、消息发布、消息传送/抵达/丢弃、设备订阅/取消订阅等事件,用于设备操作记录与行为分析;
    • 设备上下线通知与记录:监听 client.connected 与 client.disconnected 两个事件可以实现设备上下线记录;
    • 消息状态记录:监听 message 相关事件可以实现关键消息指令状态监测如下发成功/失败回调等。

附:规则引擎功能列表

功能 说明 开源版 商业版
基础功能
条件筛选 通过事件名称、上下文信息、消息内容进行条件筛选,选择要处理的消息流 支持 支持
预处理 通过内置处理函数实现消息的简单数学计算、字符处理、编解码能力,输出预置格式的消息 支持 支持
资源管理 通过内置处理函数实现消息的简单数学计算、字符处理、编解码能力,输出预置格式的消息 支持 支持
消息输出
发布到指定主题 将规则处理后的消息重发布(republish)到指定主题进行载处理或供订阅端使用 支持 支持
发送到 WebHook 将消息发布到 HTTP API 网关 支持 支持
发送到消息队列 支持 Kafka、RabbitMQ 等私有或公有云消息中间件 支持
写入到数据库 包括 MySQL、PostgreSQL、MongoDB、Redis 等私有或公有云数据库 支持
发送到另一个 MQTT Broker 通过 MQTT 协议将消息发布到另一个 MQTT Broker 指定主题,包括但不限于 EMQ X、Azure IoT Hub、AWS IoT、阿里云物联网平台 支持
扩展定制
处理功能定制 定制私有内置处理函数,灵活处理私有协议、特殊编码消息 支持
消息输出定制 定制私有消息输出方式,规则输出端更灵活 支持

更多信息请访问我们的官网 emqx.io,或关注我们的开源项目 github.com/emqx/emqx ,详细文档请访问 官方文档

© 著作权归作者所有

EMQX
粉丝 4
博文 54
码字总数 71436
作品 0
杭州
私信 提问
EMQ X 规则引擎系列(五)存储消息到 Cassandra 数据库

Cassandra 介绍与安装 Cassandra 是来自 Apache 的开源分布式数据库系统,它能在支持线性扩展 、 高可用的特性下,不损失原有的读写性能。目前广泛运用于各个大企业的后端服务中,例如 Netf...

EMQX
09/03
19
0
EMQ X Meetup 深圳

活动简介: Since the EMQ X open source project first launch in 2013, EMQ has rapidly became a global leader of providing messaging and streaming solutions for IoT in 5G era. Its ......

映云科技
03/26
26
0
Mosquitto 上建立到 EMQ X 的桥接

EMQ X 节点可以被其他类型的 MQTT 消息中间件桥接,实现跨平台的消息订阅和发送。本文我们以一个配置实例来说明如何配置 Mosquitto 到 EMQ X 的桥接。 Mosquitto 是一个小型轻量的开源 MQTT...

EMQX
08/28
48
0
EMQ 获得2019无锡世界物联网博览会 “新技术新产品新应用” 金奖

2019年9月7日,一年一度的世界物联网博览会在无锡顺利开幕。当晚,由物博会组委会主办的物联网之夜颁奖典礼——2019 年新技术新产品新应用成果发布会隆重举行。今年的评审专家委员会由中国工...

EMQX
09/11
20
0
EMQ X 3.0-beta.1 版本发布,完整支持 MQTT-5.0 协议

EMQ X 3.0-beta.1 版本发布,版本别名: The Promise of Tomorrow。兼容 MQTT-3.1.1 协议的同时, 完整支持 MQTT-5.0 协议。此外还增加了很多实用的功能特性,重构了核心组件,提升了系统的伸...

emqtt
2018/09/05
1K
1

没有更多内容

加载失败,请刷新页面

加载更多

golang-字符串-地址分析

demo package mainimport "fmt"func main() {str := "map.baidu.com"fmt.Println(&str, str)str = str[0:5]fmt.Println(&str, str)str = "abc"fmt.Println(&s......

李琼涛
今天
4
0
Spring Boot WebFlux 增删改查完整实战 demo

03:WebFlux Web CRUD 实践 前言 上一篇基于功能性端点去创建一个简单服务,实现了 Hello 。这一篇用 Spring Boot WebFlux 的注解控制层技术创建一个 CRUD WebFlux 应用,让开发更方便。这里...

泥瓦匠BYSocket
今天
6
0
从0开始学FreeRTOS-(列表与列表项)-3

FreeRTOS列表&列表项的源码解读 第一次看列表与列表项的时候,感觉很像是链表,虽然我自己的链表也不太会,但是就是感觉很像。 在FreeRTOS中,列表与列表项使用得非常多,是FreeRTOS的一个数...

杰杰1号
今天
8
0
Java反射

Java 反射 反射是框架设计的灵魂(使用的前提条件:必须先得到代表的字节码的 Class,Class 类 用于表示.class 文件(字节码)) 一、反射的概述 定义:JAVA 反射机制是在运行状态中,对于任...

zzz1122334
今天
6
0
聊聊nacos的LocalConfigInfoProcessor

序 本文主要研究一下nacos的LocalConfigInfoProcessor LocalConfigInfoProcessor nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/config/impl/LocalConfigInfoProcessor.java p......

go4it
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部