教你如何使用 RisingWave 一秒钟直接洞察 kafka 中的数据

原创
01/04 17:43
阅读数 14

RisingWave是一款云原生流数据库,旨在帮助用户更轻松地实现实时流处理。它具有许多功能,包括自由创建物化视图(Materialized View),并支持通过SQL语言对数据进行查询和处理。通过RisingWave,用户可以轻松地消费Kafka中的数据,并且可以通过命令行直接查询Kafka中的数据,无需启动任何流式任务。此外,RisingWave支持在流批处理模式下执行相同的SQL语句,这意味着用户可以在创建物化视图(Materialized View)之前快速验证他们的SQL语句是否正确。总之,RisingWave是一个功能强大、易于使用的工具,可以为用户提供更高效、更智能的流数据处理体验。

查询演示

构建数据源

在本文中,我们假设用户的数据已经存储在Kafka中,并且以JSON格式存在。请注意,RisingWave也原生支持其他数据格式,例如Avro、Protobuf等,但本文仅以JSON为例。下面是创建Table的语句示例:

create source s1 (v1 int, v2 varchar) with (
  connector = 'kafka',
  topic = 'kafka_1_partition_topic',
  properties.bootstrap.server = '127.0.0.1:29092',
  scan.startup.mode = 'earliest'
) row format json;

这里我们假设在一个json对象中存在一个名为 v1 的字符串类型字段和一个名为 v2 的整型字段,样例如下:

{
  "v1": "1",
  "v2": 1
}

在这个json对象中,v1 字段是一个字符串类型的数据,可以包含任何文本内容。而 v2 字段则是一个整型数据,表示一个数值。在一些情况下,这些json对象可能会作为数据传递给后端服务器或者前端应用程序进行进一步的处理和展示。

当上述建表语句被成功创建后,用户便可以直接进行查询而无需将kafka中的数据物化。这意味着,用户可以更加高效地查询数据,同时也减少了对磁盘空间的需求。此外,用户还可以轻松地对数据进行筛选、排序和聚合等操作,以满足不同的需求。因此,这种方法在大量数据查询和分析场景下具有很高的实用性和可行性。

dev=> select * from s1;
 v1 |  v2  
----+------
  1 | 1
  2 | 22
  3 | 333
  4 | 4444
(4 rows)

Kafka中的时间戳

在 Kafka 中,每条消息都会带有一个时间戳。Kafka 内部会为每个时间戳建立索引,以便用户可以通过时间戳来过滤消息。当用户像上图所示建立 Kafka 源之后,RisingWave 会为每条记录添加一个虚拟列 _rw_kafka_timestamp,这个列即为 Kafka 中每条消息的时间戳。用户可以通过以下语句查询这个时间戳:

dev=> select _rw_kafka_timestamp, v1, v2 from s1;
      _rw_kafka_timestamp      | v1 |  v2  
-------------------------------+----+------
 2023-02-27 10:44:13.375+00:00 |  1 | 1
 2023-02-27 10:44:13.375+00:00 |  2 | 22
 2023-02-27 10:44:13.375+00:00 |  3 | 333
 2023-02-27 10:44:13.375+00:00 |  4 | 4444
(4 rows)

用户可以使用查询语句来查询特定时间戳的数据。此外,用户还可以基于该时间戳进行高效过滤。例如,假设用户只想查询最近10分钟内的数据,他们可以使用以下语句来查询:

dev=>  select _rw_kafka_timestamp, v1, v2 from s1 where _rw_kafka_timestamp > now() - interval '10 minute';
NOTICE:  Your session timezone is Asia/Shanghai. It was used in the interpretation of timestamps and dates in your query. If this is unintended, change your timezone to match that of your data's with `set timezone = [timezone]` or rewrite your query with an explicit timezone conversion, e.g. with `AT TIME ZONE`.

      _rw_kafka_timestamp      | v1 |  v2  
-------------------------------+----+------
 2023-02-27 18:44:13.375+08:00 |  1 | 1
 2023-02-27 18:44:13.375+08:00 |  2 | 22
 2023-02-27 18:44:13.375+08:00 |  3 | 333
 2023-02-27 18:44:13.375+08:00 |  4 | 4444
(4 rows)

针对上述语句,risingwave会利用kafka中的时间戳的索引进行优化,提前过滤掉不需要的消息内容,减少数据的扫描,从而提升查询效率。这将返回最近10分钟内的所有数据,使用户能够更轻松地获取所需的信息。这些功能不仅使用户更轻松地获取所需的信息,而且还提供了更多的工具和方法来管理和分析数据,从而更好地满足用户的需求。

总结

Kafka是一个流处理系统,它广泛应用于各种场景。在risingwave中,我们支持直接查询Kafka中的数据,这给开发人员提供了很大的便利。直接查询Kafka数据可以帮助开发人员在创建流式任务之前验证语句的正确性。开发人员可以更轻松地消费Kafka中的数据并进行处理,同时也可以更加灵活地选择数据源。这样一来,开发人员可以更加高效地开发出符合需求的应用。

在本文中,我们将详细介绍如何直接查询Kafka中的数据。我们将讨论如何使用不同的查询语句和过滤条件来查询Kafka中的数据,并对查询结果进行分析和处理。此外,我们还将介绍如何通过使用Kafka中的时间戳来优化查询速度,从而减少Kafka的查询数据量,使查询速度更快,效率更高。我们将讨论如何在不同的场景下使用时间戳来优化查询,以及时间戳的使用注意事项。


关于 RisingWave

RisingWave 是一个云原生SQL流式数据库。其旨在降低构建实时应用的门槛以及成本。

GitHub: risingwave.com/github

官网: risingwave.com

Slack: risingwave.com/slack

文档: risingwave.dev

社区用户微信交流群:risingwave_assistant

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部