StreamNative 宣布发布 Flink-Pulsar SQL 连接器

原创
2022/12/29 18:34
阅读数 67

本文翻译自 StreamNative 博客《Announcing the Flink-Pulsar SQL Connector》,作者 Yufei Zhang。
本文译者:王中兴,就职于 ebay 消息中间件团队,社区昵称 alphawang。

StreamNative 宣布 Flink-Pulsar SQL 连接器已经发布,现在即可下载并使用。Flink-Pulsar SQL 连接器支持通过简单的 Flink SQL 从 Pulsar 主题中查询及写入数据。使用该连接器可以轻松创建 Flink + Pulsar 数据管道,无需编写 Java 或 Scala 代码。本文将阐述该连接器的优势和功能以及使用方法。

什么是 Flink-Pulsar SQL 连接器?

Flink 社区提供了 SQL 和 Table API[1] 以使用 SQL 查询来表达 Flink 作业。Flink-Pulsar SQL 连接器使得 Flink SQL 通过简单的“SELECT FROM”和“INSERT INTO”语句来读取和写入 Pulsar 主题。

注意:Flink-Pulsar SQL 连接器基于 Pulsar DataStream 连接器[2] 实现,继承了 DataStream 连接器的大部分配置。

为什么使用 Flink-Pulsar SQL 连接器?

Flink-Pulsar SQL 连接器有三大优点:

  • • 易用性:Flink-Pulsar SQL 连接器让用户可以发现 Pulsar 中的实时数据值,只需通过 SQL 查询提交 Flink 作业即可,而无需编写和部署 Java。可以使用基于 native 表的 SQL 查询 Pulsar 主题,而无需编写“CREATE TABLE”语句。

  • • 可扩展性:Flink-Pulsar SQL 连接器继承了底层 DataStream 连接器的高可扩展性,该连接器通过使用最新的 Source 和 Sink API 实现可扩展性。

  • • 灵活性:Flink-Pulsar SQL 连接器支持对主题模式进行灵活地订阅,即便与某模式相匹配的主题尚未被创建也可以订阅。该连接器能够在运行时发现新添加的主题,而无需重新启动作业。

Flink-Pulsar SQL 连接器的功能特性

  • • 根据消息元数据定义列:Flink-Pulsar SQL 连接器允许将 Pulsar 消息的元数据(例如 event_timeproducer_name 以及 publish_time 等)映射到 Flink 表列。这在基于时间属性元数据定义 Watermark 或使用主题名称充实 Flink 记录时非常有用。

  • • 动态主题发现:与 DataStream 连接器类似,Flink-Pulsar SQL 连接器支持定义主题模式,可以在 Flink SQL 作业运行时通过添加新主题来添加新数据。当需要扩展 Source 主题时无需重新启动 Flink 作业,这很有用。

  • • Avro 和 JSON 格式:Flink-Pulsar SQL 连接器支持通过 Flink JSON 和 Avro 格式来读取 Pulsar 主题中存储的二进制数据。当从带有 JSON 或 Avro Schema 的 Pulsar 主题中读取消息时,它还可以自动派生 Flink 表的 Schema。阅读此文档[3]了解更多有关此功能的信息。

  • • PulsarCatalog:PulsarCatalog 允许使用 Pulsar 集群作为 Flink 表的元数据存储。它支持定义两种类型的表:explicit 和 nativenative 表可以在不显式创建 Flink 表的情况下读取 Pulsar 主题,因此命名为“native”。阅读此文档[4]获取有关 explicit 和 native 表的详细说明。

使用 Flink-Pulsar SQL 连接器

前提条件

  • • 对于 Table API 程序,将 Flink-Pulsar SQL 连接器添加到依赖项中。

  • • 对于基于 SQL Client 的 Flink SQL 查询,先下载 SQL Jar 包并在启动 SQL 客户端时将其添加到类路径中,例如 ./bin/sql-client.sh --jar flink-sql-connector-pulsar-1.15.1.1.jar

Maven SQL Jar
io.streamnative.connectors flink-sql-connector-pulsar 1.15.1.1 SQL JAR[5]

如何使用 Flink-Pulsar SQL 连接器

以下示例代码演示了如何使用 PulsarCatalog 和 Flink-Pulsar SQL 连接器。你可以在 flink-example[6] 和 sql-examples[7] 代码库中找到所有示例代码。

1.创建 PulsarCatalog。

CREATE CATALOG pulsar
  WITH (
    'type' = 'pulsar-catalog',
    'catalog-admin-url' = 'http://pulsar:8080',
    'catalog-service-url' = 'pulsar://pulsar:6650'
  );

2.创建带有水印策略的显式表。

CREATE TABLE sql_user (
    name STRING,
    age INT,
    income DOUBLE,
    single BOOLEAN,
    createTime BIGINT,
    row_time AS cast(TO_TIMESTAMP(FROM_UNIXTIME(createTime / 1000), 'yyyy-MM-dd HH:mm:ss') as timestamp(3)),
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://sample/flink/user',
  'format' = 'json'
);

3.对表运行窗口查询。

SELECT single,
 TUMBLE_START(row_time, INTERVAL '10' SECOND) AS sStart,
 SUM(age) as age_sum from `sql_user`
 GROUP BY TUMBLE(row_time, INTERVAL '10' SECOND), single;

4.写入同一个表。

INSERT INTO `sql_user` VALUES ('user 1', 11, 25000.0, true, 1656831003);

目前为止我们介绍了如何创建和查询 explicit 表。接下来我们可以直接查询映射到主题 persistent://sample/flink/user 的 native 表。

5.从名为 user 的 native 表中读取 10 条记录。

SELECT * FROM `user` LIMIT 10;

有关更多信息,请参阅 Flink-Pulsar SQL 连接器文档[8]

未来规划

目前 Flink-Pulsar SQL 连接器不支持定义 PRIMARY KEY,因此不能支持带有 upsert/delete 操作的变化数据捕获(CDC)格式。下一步我们将改进连接器以支持 CDC 场景的 upsert 模式。

参与贡献

Flink-Pulsar SQL 连接器是一个由社区驱动的项目。参与 Flink-Pulsar SQL 连接器的相关工作,请参考以下资源:

  1. 1. 试用 Flink-Pulsar SQL 连接器:下载[9]连接器并阅读文档[10]以了解更多信息。

  2. 2. 参与贡献:如果你有任何功能请求或错误报告,请随时提交 Pull Request 来分享你的反馈和想法[11]

  3. 3. 联系我们:可以在 GitHub[12] 上创建 Issue,向 Pulsar 邮件列表[13]发送电子邮件,或在 Pulsar Flink 交流群与 Pulsar 专家共同探讨。

    扫码回复关键词 flink,加入 Pulsar-Flink 交流群

引用链接

[1] SQL 和 Table API: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/overview/
[2] Pulsar DataStream 连接器: https://hub.streamnative.io/data-processing/pulsar-flink/1.15.0.1/#pulsar-datastream-connector
[3] 此文档: https://hub.streamnative.io/data-processing/pulsar-flink/1.15.0.1#native-tables
[4] 此文档: https://hub.streamnative.io/data-processing/pulsar-flink/1.15.0.1#available-metadata
[5] SQL JAR: https://repo1.maven.org/maven2/io/streamnative/connectors/flink-sql-connector-pulsar/
[6] flink-example: https://github.com/streamnative/flink-example
[7] sql-examples: https://github.com/streamnative/flink-example/blob/main/sql-examples/sql-example.md
[8] Flink-Pulsar SQL 连接器文档: https://hub.streamnative.io/data-processing/pulsar-flink/1.15.0.1
[9] 下载: https://hub.streamnative.io/data-processing/pulsar-flink/1.15.0.1
[10] 文档: https://hub.streamnative.io/data-processing/pulsar-flink/1.15.0.1
[11] 分享你的反馈和想法: https://github.com/streamnative/flink
[12] GitHub: https://github.com/streamnative/flink
[13] Pulsar 邮件列表: https://lists.apache.org/list.html?dev@pulsar.apache.org

本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部