博文推荐|使用 Pulsar IO 打造流数据管道

原创
2021/11/16 20:00
阅读数 105


本文翻译自 StreamNative 博客。博客原作者:Ioannis Polyzos,StreamNative 解决方案工程师。原文链接:https://streamnative.io/blog/engineering/2021-11-10-streaming-data-pipelines-with-pulsar-io/

背景

构建现代数据基础设施一直是当今企业的难题。当今的企业需要管理全天候生成和交付的大量异构数据。然而,由于企业对数据的数量和速度等等有多种要求,没有“一刀切”的解决方案。相反,企业需在不同系统之间移动数据,以便存储、处理和提供数据。

粗看搭建基础设施的历史,企业使用了许多不同的工具来尝试移动数据,例如用于流式工作负载的 Apache Kafka 和用于消息工作负载的 RabbitMQ。现在,Apache Pulsar 的诞生为企业简化了这个过程。

Apache Pulsar 是一个云原生的分布式消息流平台。Pulsar 旨在满足现代数据需求,支持灵活的消息传递语义、分层存储、多租户和异地复制(跨区域数据复制)。自 2018 年毕业成为 Apache 软件基金会顶级项目以来,Pulsar 项目经历了快速的社区增长[1]、周边生态的发展和全球用户的增长。将 Pulsar 用作数据基础设施的支柱,公司能够以快速且可扩展的方式移动数据。在这篇博文中,我们将介绍如何使用 Pulsar IO 在 Pulsar 和外部系统之间轻松导入和导出数据。

1. Pulsar IO 简介

Pulsar IO 是一个完整的工具包,用于创建、部署和管理与外部系统(如键/值存储、分布式文件系统、搜索索引、数据库、数据仓库、其他消息传递系统等)集成的 Pulsar 连接器。由于 Pulsar IO 构建在 Pulsar 的无服务器计算层(称为Pulsar Function[2] )之上,因此编写 Pulsar IO 连接器就像编写 Pulsar Function 一样简单。

借助 Pulsar IO,用户可以使用现有的 Pulsar 连接器或编写自己的自定义连接器,轻松地将数据移入和移出 Pulsar。Pulsar IO 拥有以下优势:

多样的连接器:当前 Pulsar 生态中有许多现有的 Pulsar IO 连接器[3]用于外部系统,例如 Apache Kafka、Cassandra 和 Aerospike。使用这些连接器有助于缩短生产时间,因为创建集成所需的所有部件都已就位。开发人员只需要提供配置(如连接 url 和凭据)来运行连接器。托管运行时:Pulsar IO 带有托管运行时,负责执行、调度、扩展和容错。开发人员可以专注于配置和业务逻辑。多接口:通过 Pulsar IO 提供的接口,用户可以减少用于生成和使用应用程序的样板代码。高扩展性:在需要更多实例来处理传入流量的场景下,用户可以通过更改一个简单的配置值轻松横向扩展;如果用户使用 Kubernetes 运行时,可根据流量需求进行弹性扩展。充分利用 schema:Pulsar IO 通过在数据模型上指定 schema 类型来帮助用户充分运用 schema,Pulsar IO 支持 JSON、Avro 和 Protobufs 等 schema 类型。

2. Pulsar IO 运行时

由于 Pulsar IO 建立在 Pulsar Function 之上,因此 Pulsar IO 和 Pulsar Function 具有相同的运行时选项。部署 Pulsar IO 连接器时,用户有以下选择:

线程:在与工作线程相同的 JVM 中运行。(通常用于测试的和本地运行,不推荐用于生产部署。)进程:在不同的进程中运行,用户可以使用多个工作线程跨多个节点横向扩展。Kubernetes:在 Kubernetes 集群中作为 Pod 运行,worker 与 Kubernetes 协调。这种运行时方式保证用户可以充分利用 Kubernetes 这样的云原生环境提供的优势,比如轻松横向扩展。

3. Pulsar IO 接口

如前所述,Pulsar IO 减少了生成和消费应用程序所需的样板代码。它通过提供不同的基本接口来实现这一点,这些接口抽象出样板代码并允许我们专注于业务逻辑。

Pulsar IO 支持 Source 和 Sink 的基本接口。Source 连接器(Source connector)允许用户将数据从外部系统带入 Pulsar,而 Sink 连接器(Sink Connector)可用于将数据移出 Pulsar 并移入外部系统,例如数据库。

还有一种特殊类型的 Source 连接器,称为 Push Source。Push Source 连接器可以轻松实现某些需要推送数据的集成。举例来说,Push Source 可以是变更数据捕获源系统,它在接收到新变更后,会自动将该变更推送到 Pulsar。

Source 接口

public interface Source<T> extends AutoCloseable {     /**     * Open connector with configuration.     *     * @param config initialization config     * @param sourceContext environment where the source connector is running     * @throws Exception IO type exceptions when opening a connector     */    void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception;     /**     * Reads the next message from source.     * If source does not have any new messages, this call should block.     * @return next message from source.  The return result should never be null     * @throws Exception     */    Record<T> read() throws Exception;}

Push Source 接口

public interface BatchSource<T> extends AutoCloseable {     /**     * Open connector with configuration.     *     * @param config config that's supplied for source     * @param context environment where the source connector is running     * @throws Exception IO type exceptions when opening a connector     */    void open(final Map<String, Object> config, SourceContext context) throws Exception;     /**     * Discovery phase of a connector.  This phase will only be run on one instance, i.e. instance 0, of the connector.     * Implementations use the taskEater consumer to output serialized representation of tasks as they are discovered.     *     * @param taskEater function to notify the framework about the new task received.     * @throws Exception during discover     */    void discover(Consumer<byte[]> taskEater) throws Exception;     /**     * Called when a new task appears for this connector instance.     *     * @param task the serialized representation of the task     */    void prepare(byte[] task) throws Exception;     /**     * Read data and return a record     * Return null if no more records are present for this task     * @return a record     */    Record<T> readNext() throws Exception;}

Sink 接口

public interface Sink<T> extends AutoCloseable {    /**     * Open connector with configuration.     *     * @param config initialization config     * @param sinkContext environment where the sink connector is running     * @throws Exception IO type exceptions when opening a connector     */    void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;     /**     * Write a message to Sink.     *     * @param record record to write to sink     * @throws Exception     */    void write(Record<T> record) throws Exception;}

4. 总结

Apache Pulsar 能够作为现代数据基础设施的支柱,它使企业能够以快速且可扩展的方式搬运数据。Pulsar IO 是一个连接器框架,它为开发人员提供了所有必要的工具来创建、部署和管理与不同系统集成的 Pulsar 连接器。Pulsar IO 抽象掉所有样板代码,使开发人员可以专注于应用程序逻辑。

5. 延伸阅读

如果您有兴趣了解更多信息并构建自己的连接器,请查看以下资源:

查看 Pulsar 周边生态中所有 Pulsar IO 连接器[4]构建和部署 Source 连接器[5]为 Pulsar IO 编写自定义 Sink 连接器[6]监控和故障排除连接器[7]



译者简介

宋博,就职于北京百观科技有限公司,高级开发工程师,专注于微服务,云计算,大数据领域。

关注「Apache Pulsar」👇🏻,获取干货与动态

👇🏻 加入 Apache Pulsar 中文交流群 👇🏻

引用链接

[1] 社区增长: http://mp.weixin.qq.com/s?__biz=MzUyMjkzMjA1Ng==&mid=2247488107&idx=1&sn=384631ac80ee51a070c559369c5d9ef2&chksm=f9c5085cceb2814aa7182d530643bc6a23bcc931936b8f613d358864f06e1e5a754f756d454e#rd
[2] Pulsar Function: https://pulsar.apache.org/docs/en/functions-overview/
[3] 现有的 Pulsar IO 连接器: https://hub.streamnative.io/
[4] 查看 Pulsar 周边生态中所有 Pulsar IO 连接器: https://hub.streamnative.io/
[5] 构建和部署 Source 连接器: https://www.youtube.com/watch?v=w9xQyyoFds4
[6] 为 Pulsar IO 编写自定义 Sink 连接器: https://www.bilibili.com/video/BV1n44y147xr?p=7
[7] 监控和故障排除连接器: https://www.bilibili.com/video/BV1C64y197Db?p=3

点击「阅读原文」,查看 Apache Pulsar 干货集锦

本文分享自微信公众号 - ApachePulsar(ApachePulsar)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部