文档章节

# EMQ X 持久化插件系列(一)- 消息存储到 OpenTSDB 数据库

EMQX
 EMQX
发布于 08/08 10:04
字数 949
阅读 22
收藏 0

OpenTSDB 是可扩展的分布式时序数据库,底层依赖 HBase 并充分发挥了HBase的分布式列存储特性,支持数百万每秒的读写。

面对大规模快速增长的物联网传感器采集、交易记录等数据,时间序列数据累计速度非常快,时序数据库通过提高效率来处理这种大规模数据,并带来性能的提升,包括:更高的容纳率(Ingest Rates)、更快的大规模查询(尽管有一些比其他数据库支持更多的查询)以及更好的数据压缩。

本文以 CentOS 7.2 系统中的实际例子来说明如何通过 OpenTSDB 来存储相关的信息。

安装与验证 OpenTSDB 服务器

读者可以参考 OpenTSDB 官方文档Docker 来下载安装 OpenTSDB 服务器,本文使用 OpenTSDB 2.4.0 版本。

配置 EMQ X 服务器

通过 RPM 方式安装的 EMQ X,OpenTSDB 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_opentsdb.conf,考虑到功能定位,OpenTSDB 插件仅支持消息存储功能。更多 backend 插件详见 EMQ X 数据持久化

配置连接地址与连接池大小、batch 策略:

## OpenTSDB Server 接入地址
backend.opentsdb.pool1.server = 127.0.0.1:4242

## 连接池大小
backend.opentsdb.pool1.pool_size = 8


## Max batch size of put 最大批量写条数
backend.opentsdb.pool1.max_batch_size = 20

## 通过 topic 过滤器存储全部消息
backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

**OpenTSDB Backend 消息存储规则参数: **

通过 topic 过滤器,设置需要存储消息的主题,pool 参数区别多个数据源:

## Store Publish Message
backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

启动该插件:

./bin/emqx_ctl plugins load emqx_backend_opentsdb

消息模板

由于 MQTT Message 无法直接写入 OpenTSDB, OpenTSDB Backend 提供了 emqx_backend_opentsdb.tmpl 模板文件将 MQTT Message 转换为可写入 OpenTSDB 的 DataPoint。

消息模板功能需要重启 EMQ X 才能应用更改。

tmpl 文件位于 data/templates/emqx_backend_opentsdb_example.tmpl,使用 json 格式, 用户可以为不同 Topic 定义不同的 Template, 类似:

{
    "sample": {
        "measurement": "$topic",
        "tags": {
            "host": ["$payload", "data", "$0", "host"],
            "region": ["$payload", "data", "$0", "region"],
            "qos": "$qos",
            "from": "$from"
        },
        "value": ["$payload", "data", "$0", "temp"],
        "timestamp": "$timestamp"
    }
}

其中, measurement 与 fields 为必选项, tags 与 timestamp 为可选项。<Where is value of> 支持通过占位符如 $key 提取变量名为 key 的变量,支持的变量如下:

  • qos: 消息 QoS
  • form: 发布者信息
  • topic: 发布主题
  • timestamp: 时间戳
  • payload.*: JSON 消息体内任意变量,如 { "data": [{ "temp": 1 }] } 使用 ["$payload", "data", "temp"] 可以提取出 1

本示例设定模板如下:

{
    "sample": {
        "measurement": "$topic",
        "tags": {
            "host": ["$payload", "data", "$0", "host"],
            "region": ["$payload", "data", "$0", "region"],
            "qos": "$qos",
            "from": "$from"
        },
        "value": ["$payload", "data", "$0", "temp"],
        "timestamp": "$timestamp"
    }
}

当 Topic 为”sample” 的 MQTT Message 拥有以下 Payload 时:

{
  "data": [
    {
      "temp": 1,
      "host": "serverA",
      "region": "hangzhou"
    },
    {
      "temp": 2,
      "host": "serverB",
      "region": "ningbo"
    }
  ]
}

Backend 会将 MQTT Message 转换为:

[
  {
    "measurement": "sample",
    "tags": {
      "from": "mqttjs_ebcc36079a",
      "host": "serverA",
      "qos": "0",
      "region": "hangzhou"
    },
    "value": "1",
    "timestamp": "1560743513626681000"
  },
  {
    "measurement": "sample",
    "tags": {
      "from": "mqttjs_ebcc36079a",
      "host": "serverB",
      "qos": "0",
      "region": "ningbo"
    },
    "value": "2",
    "timestamp": "1560743513626681000"
  }
]

使用示例

EMQ X 管理控制台 WebSocket 页面中,向 sample 主题发布如上格式消息消息,消息将解析存储到 OpenTSDB udp 数据库对应的 measurement 中。

总结

读者在理解了 OpenTSDB 中所存储的数据结构,学习使用消息模板配置写入消息字段格式后可以结合 OpenTSDB 拓展相关应用。


更多信息请访问我们的官网 emqx.io,或关注我们的开源项目 github.com/emqx/emqx ,详细文档请访问 官方文档

© 著作权归作者所有

EMQX
粉丝 4
博文 54
码字总数 71436
作品 0
杭州
私信 提问
小米正用时序数据库,解决这个“硬核”问题

参加 2019 Python开发者日,请扫码咨询 ↑↑↑ 作者 | 许俊红 来源 | 小米云技术(id:mi-cloud-tech) 时序数据 根据维基百科的定义[1],时间序列是一组按照时间发生先后顺序进行排列的数据...

AI科技大本营
03/21
0
0
开源监控系统 - OpenTSDB

开源监控系统OpenTSDB,用hbase存储所有的时序(无须 采样)来构建一个分布式、可伸缩的时间序列数据库。它支持秒级数据采集所有metrics,支持永久存储,可以做容量规划,并很容易的接入到现...

匿名
2012/07/07
22K
3
OpenTSDB 生产应用与思考

作者:陈杰,欢聚时代YY 基础架构部,数据库技术组,专注于HBase、Kafka,MySQL 等技术。 OpenTSDB 官方介绍 这里就不翻译了。http://opentsdb.net/overview.html How does OpenTSDB work? O...

pursue5956
2018/08/07
0
0
OpenTSDB 监控系统的研究和介绍

一、背景介绍 此次航天局为了让天宫一号与神舟九号载人交会顺利对接成功,采用了新一代数值天气预报系统为 神九保驾护航。新一代数值天气预报系统是中国国内技术最先进、分辨率最高、预报时效...

红薯
2012/07/07
4.4K
2
快速时间序列数据库 - kairosdb

kairosdb是一款时间序列数据库产品 它最早是从 openTSDB 衍生而来的,兼容 openTSDB,并有自己的特色,现行版本支持 hbase ,cassandra和 h2db 后端数据存储,采用java开发支持1.6及以上版本...

匿名
2017/06/28
1K
2

没有更多内容

加载失败,请刷新页面

加载更多

教你玩转Linux—添加批量用户

添加和删除用户对每位Linux系统管理员都是轻而易举的事,比较棘手的是如果要添加几十个、上百个甚至上千个用户时,我们不太可能还使用useradd一个一个地添加,必然要找一种简便的创建大量用户...

xiangyunyan
19分钟前
3
0
返回提示信息,如:xxx创建成功!

【服务端】在输出的方法块中,加入要输出的字段(qcm_batch_id) QCMUserType.cs: public struct QCM_Custom_Create_Batch_Out_Tag { public BASCoreType.Cmn_Out_T......

_Somuns
19分钟前
3
0
Aliyun Serverless VSCode Extension v1.12.0 发布

Aliyun Serverless VSCode Extension 是阿里云 Serverless 产品 函数计算 Function Compute 的 VSCode 插件,该插件结合了函数计算 Fun 工具以及函数计算 SDK ,是一款 VSCode 图形化开发调试...

阿里云官方博客
20分钟前
4
0
程序员如何培养解决复杂问题的能力?

今天在上网时候,突然看到了这篇文章,感觉非常的适合现在的自己去思考下,可能也适用在座的读者。程序员不仅仅是敲代码,更是一个复合能力的结合体,也不仅仅停留在技术和代码阶段。你想要成...

哥本哈根的小哥
23分钟前
6
0
市场变化驱动产品思维升级

宜信科技中心财富管理产品部负责人Bob,与大家一起聊聊个性化推荐产品功能的设计和B端产品的功能策划方式。 拓展阅读:回归架构本质,重新理解微服务 智慧金融时代,大数据和AI如何为业务赋能...

宜信技术学院
24分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部