Apache Pulsar 的湖仓一体方案:Pulsar-Lakehouse 分层存储集成详解

2023/02/08 18:00
阅读数 652

文章摘要

本文整理自 ApacheCon Asia 2022 上 StreamNative 工程师陈航的演讲《Make Apache Pulsar as Lakehouse: Introduction Lakehouse Tiered Storage Integration for Pulsar》。本文将介绍将 Apache Pulsar 二级存储与 Lakehouse 集成的湖仓一体解决方案。

作者简介

陈航,StreamNative 专家工程师,Apache Pulsar PMC 成员,Apache BookKeeper Committer。陈航曾任 BIGO Staff Engineer,他将 Apache Pulsar 引入 BIGO 消息平台,并与上游和下游系统如 Flink、ClickHouse 和其他内部系统集成,进行实时推荐和分析。在 BIGO 期间,陈航专注于 Pulsar 的性能调整,新功能开发和 Pulsar 生态系统的整合。其对 Pulsar 的主要贡献有 Pulsar Node balance、性能调优、KoP、Pulsar Flink connector SQL 等。

导读

Apache Pulsar 是一种用于缓存数据并在不同系统之间解耦的消息总线。为了支持长期的主题数据存储,Pulsar 内引入了分级存储,将冷数据 Offload 到分级存储中,例如 GCS、S3 和 HDFS 等。但是,当前 Offload 的数据是由 Pulsar 管理的非开放格式数据,是原始的数据格式,且只有 Pulsar 可以访问,因此很难将其与其他大数据组件集成,例如 Presto、Flink SQL 和 Spark SQL 等。

为了解决这个问题,StreamNative 引入了 Lakehouse 来管理 Offload 数据,并与当前的主题冷数据 Offload 机制集成。用户可以使用 Lakehouse 提供的所有功能,例如事务支持、Schema 演进和 BI 支持等。集成后,Pulsar Topic 中的冷数据能够以流模式持续 Offload 到 Lakehouse 中,经 Schema 解析后存储为 Lakehouse Table 字段。

为了支持 Streaming Offload 并使 Offload 机制更具可扩展性,StreamNative 引入了按 Reader Offload 机制从主题中读取数据并写入分级存储。此外,用户还可以通过 Offloader 提供压缩服务后端,并将主题作为表,键的每个更新操作都被转换为表的 Upsert 操作。

本文分为四个部分,首先为大家简介 Apache Pulsar 及其二级存储功能和 Lakehouse 特性,然后分享将 Pulsar 二级存储与 Lakehouse 集成的方案,最后为大家展示相关未来规划。

Apache Pulsar 简介

Pulsar 在官网上的定义是“云原生分布式消息流平台”,在类似场景中,用户使用的其他消息系统有 Kafka、RabbitMQ 和 RocketMQ 等等,那么 Pulsar 相比于其他消息系统有哪些区别?Pulsar 的哪些功能吸引了社区爱好者和用户?为什么用户选择从其他消息系统切换到 Pulsar?

历史

首先回顾 Pulsar 历史。Pulsar 诞生已有 10 年时间,在此期间,Pulsar 经历了很多大公司大场景的考验。Pulsar 诞生于 2012 年,最初的目的是为了在 Yahoo 内部取代其他消息系统,构建统一逻辑大集群的消息平台。经历三年的开发与测试,在 2015 年,Pulsar 完成了在 Yahoo 内部的部署和替换,大规模部署在 Yahoo 内部,服务于 Yahoo mail、 Yahoo Finance、 Yahoo Sports/ Flickr 及广告平台等大规模的场景。2016 年,Yahoo 正式将 Pulsar 开源。2017 年 6 月 Pulsar 被捐赠给 Apache 基金会,2018 年 9 月毕业成为 ASF 顶级项目。

从毕业至今的四年时间内,Pulsar 从社区到 StreamNative 进行商业化开发发展至今,社区贡献者数量从几十人增长至近 600 人。Pulsar PMC 成员和 Committer 数量不断增加,每月活跃贡献者数量和日合并 Pull Request 数量都一直维持在很高的水平。Pulsar 经历了 10 年的生命周期,并且现在处于蓬勃发展的阶段。

架构

Pulsar 很多特性吸引大量的企业和开发者选择从其他消息系统切换到 Pulsar,其中最重要的特性是存储计算分离。作为 Pulsar 用户期间,我将消息系统从 Kafka 切换到了 Pulsar。当时我们面临的痛点是消息量增大,Kafka 运维压力非常大。我们发现 Pulsar 计算存储分离的架构在设计之初就解决了运维的痛点,深深地吸引了我逐渐深入 Pulsar 社区。

在存储计算分离的架构中,上层的数据服务层(即 Broker 层)不存储任何数据,所有的数据都交给底层的数据存储层 BookKeeper 进行存储。Producer 将消息发送给 Broker,Broker 以 Pipeline 的形式将数据的副本同时写入 Bookie。Bookie 和 Broker 是两个独立的系统,好处是 Broker 无状态、可以使用扩展,Bookie 也可以单独扩展,这样可以解决 Kafka 在扩缩容时需要不断搬运数据的痛点。

底层 BookKeeper 可以理解为一个特殊设计的 WAL log 存储系统,支持以下几个特性:

  • • 多副本并行写入

  • • 强一致性

  • • 读写高吞吐

  • • 低延迟

  • • I/O 隔离

大家可以通过 Apache Pulsar 公众号博客、BookKeeper 官网等渠道更多地了解 BookKeeper,此处不做详细展开说明。

周边生态

Pulsar 系统分为三个部分,Message、Compute(Processing,计算) 和 Storage。Message 部分首先提供了 Pub-Sub 功能,这也是所有消息系统功能的基石。Pub-Sub 上涉及对不同客户端的对接,如 Go、Java、C++、Python、Node.js、Rust、C# 等。Pulsar IO 对接多种类型 Connector 来保证其他系统的数据可以方便轻松地和 Pulsar 进行整合,如 Kafka Connector 等。Message 也会对接数据库系统,如 Oracle、MongoDB 和 MySQL 等,收集 CDC 日志写入 Pulsar Topic 并进行消费。

在上图右侧的计算部分,Pulsar 支持通过 SQL 的方式读取 Topic 数据,这个功能对数据分析与问题定位中非常有帮助。目前的 Pulsar SQL 是 Presto Connector,可以通过 Hive 查询的方式查询 Pulsar Topic 数据并展现出来。第二个功能是支持批流一体,Pulsar 已支持 Storm、Flink 和 Spark 等 Connector 读取并写入 Topic 数据。第三个功能是 Pulsar Functions,用以解决轻量化计算的问题。假定某一个 Pulsar Topic 的数据需要筛选字段或者整合多个字段来写成另一个字段,此类轻量级转换不适合交给 Flink 等系统,并且数据需要从一个系统流向另一个系统,Pulsar Functions 即可很方便地在 Pulsar 集群内进行轻量级计算。Pulsar Functions 也支持进行多次转换,StreamNative 提供 Function Mesh 功能将多个 Function 组合成网络,用户定义数据流向后并部署后,数据从一个 Function 中处理后流向下一个 Function。

在上图下侧是本文将重点介绍的存储部分。横向的箭头是时间线,BookKeeper 是最新的分级存储,Hadoop 和 HDFS 是最早的分级存储。最新的数据在 BookKeeper 上,最早的数据可以 Offload 到二级存储,并且可以在需要时从二级存储中读取。Pulsar 在设计存储时原生考虑了对批和流的统一。首先我们需了解批流的运行。

Pulsar 将 Topic Partition 数据分为多个分片(Segment)对应多个 Ledger。Ledger 是 Pulsar 内部的概念,可以理解为分片。生产者将数据发布到 Topic 时依次写入各分片,Segment 0 写满后才会写入 Segment 1,以此类推,当最新的分片写满后对分片进行轮询后写入下一个分片。消费者在读取时,假如读取位置是 Segment 1,那么消费者会在读取完 Segment 1 后再读取 Segment 2,以此类推,这样的流读保证生产和消费顺序的一致。

流读的特性是保证顺序,并非是并行的,因此吞吐可能受限。批读不需要保证顺序,可以同时对分片并行读取。在通过 Presto/Pulsar SQL 读取 Topic 数据时,首先会解析 SQL,然后划分分片,比如在读取 Segment 1、2、4 的数据时,三个读取请求分别分配给三个 Worker 同时并行读取,因此批读可以达到很高的吞吐。上图是 Pulsar 支持批流一体的最原始的设计,本文涉及的二级存储也是基于分片实现的。每一次 Offload 下来的单位一定是分片,Segment 0 Offload 后会依次 Offload Segment 1、Segment 2 等。

二级存储

Pulsar 分级存储架构第一层是 Cache,主要用于存储热数据,Cache 需要的内存最多,开销最大。BookKeeper 可以为分布式 Cache,用于存储温数据。由于 BookKeeper 需要达到高吞吐和低延迟,所以一般在生产环境中对 BookKeeper IO 性能有要求,如果磁盘性能太差,就没有办法保证 IO 的高吞吐。在大部分场景下,BookKeeper 的 Journal 盘都是 SSD,条件允许的话,Ledger 也使用 SSD 可以达到更好的性能。

冷数据长期留在 BookKeeper 中是对资源的浪费,因为冷数据大概率不会被经常访问,且访问频率低于温数据,于是我们更加倾向于将冷数据搬运到二级存储上。这样可以解决其他消息系统存在的问题,比如在 Kafka 中需要配置 Retention 让数据过期,否则磁盘无法承受系统压力,如果不删除数据就需要加磁盘,然而冷数据使用概率很小,为其增加磁盘成本太高;而 Pulsar Topic 的数据可以作为存储,不需要让数据过期。在 Pulsar 内无需配置 Retention,可以将冷数据 Offload 到更加廉价的分级存储,如 S3。用户依旧可以使用同样的接口来读取冷数据,无需关心数据的位置,比如在需要流读 30 天之前的数据时,Pulsar 会将数据路由到分级存储上来读取,如果需要进行批读、数据已经被 Offload 到分级存储上,Pulsar 也会并行地从分级存储上拉取数据。

当前分级存储存在的问题

当前分级存储的问题在于数据很难永远保存在 Pulsar 内,主要原因如下:

  • • 分级存储是 PIP-17[1] 引入的特性,当前的 Offload 系统以分片(Ledger)为单位,不支持 Streaming Offload:假如系统 4 个小时都不轮询,那么系统至少需要等待 4 个小时才能触发 Offload,无法支持 Streaming。目前社区有一个支持 Streaming 的提案仍在优化中。

  • • Offload 自定义格式很难与其他系统集成:比如通过 Hive 将 Pulsar Offload 到 S3 上的数据用 SQL 的方式读取不可行,因为缺少相应 Connector 解析 License。

  • • Offload Raw Data 对数据分析不友好:假如 Topic Schema 数据有 100 个字段,统计某一个字段出现的次数最理想的方式是抽取一个字段做 Offload Raw Data 对数据分析不友好:假如 Topic Schema 数据有 100 个字段,统计某一个字段出现的次数最理想的方式是抽取一个字段做 Distinct Count,但是目前 Pulsar 不支持这种方式,在 Offload 时需要读取搜索区间内所有符合条件的数据,然后在内存中进行过滤,非常消耗资源。

  • • PIP-17[2] 设计之初没有考虑 Lakehouse,架构上很难和 Lakehouse 集成。

  • • 读吞吐性能不高:社区正在修复这个问题,读吞吐性能将提升,但是从整体设计上看,读吞吐并不友好。

Lakehouse 简介

下面为大家介绍 Lakehouse 吸引我们的特点:

  1. 1. 以 Streaming 的方式集成、迭代数据。

  2. 2. 可以很好地处理 Schema 和 Schema Evolution:Pulsar 和 Lakehouse 都有 Schema,将二者 Schema 进行集成可以支持很多功能特性。

  3. 3. 对元数据和大规模数据管理友好。

  4. 4. 存储格式是 Open-Format:只要把数据写入 Lakehouse 就可以供所有分析系统读取。

Lakehouse 目前主要有三个产品:Apache Hudi、Iceberg 和 Delta Lake。下图展示了我们最终希望 Pulsar 和 Lakehouse 集成实现的效果。首先各种类型的数据包括 Event、Log、Image、CDC、Connect 和 Pulsar IO 相关的数据都会被发送给 Pulsar Topic,Pulsar Offloader 将数据实时地 Offload 到 Lakehouse Table,每次 Offload 都会进入 Snapshot。Snapshot Commit 之后在 Lakehouse 内可见。如果进行流读,已经 Offload 到 Lakehouse Table 的数据会被读取回来并提供给 Broker,由 Broker 提供给上游的流读接口。假如我们通过流读 API 读取 0 - 9,7 已经被 Offload,那么 0 - 7 会在 Lakehouse 读取出来;8 和 9 还在 BookKeeper 内,从 BookKeeper 读取。

数据分析型业务也需要流读和批读,流读会从 0 一直按顺序读到 9,批读则会先观察哪些数据在 Lakehouse、哪些在 BookKeeper 内。0 - 7 在 Lakehouse 内,8 和 9 在 BookKeeper 内,批读会对此进行区分,将 0 - 7 通过 Table API 读取,8 - 9 通过流读 API 读取。最终,用户将看到数据全集。

如何将 Lakehouse 与 Pulsar 集成

将 Pulsar 和 Lakehouse 集成有两种方案,第一种方案是以 IO Connector 的方式集成,第二种方案是通过 Offload。从下图可以直观地看到两种方案的区别。

如果以 IO Connector 的方式集成,数据是单向的,仅仅将 Pulsar Topic 数据写入 Lakehouse Table,数据的整个生命周期由 Lakehouse 管理。如果通过 Pulsar Offloader 集成,数据 Offload 后通过 Streaming API 读回 Pulsar,并通过 Lakehouse API 支持批读,所以这种方案同时支持流读和批读,数据的整个生命周期由 Pulsar 管理。这样可以存储任意长时间的数据。以上两种方式 StreamNative 团队都在开发,第一种方案已经完成,大家可以在这里看到 Pulsar IO 与 Lakehouse 的集成,目前已支持 Delta Lake 和 Iceberg。未来我们会与大家分享第一种方案的具体实现,本文主要介绍第二种方案 Offloader。

Offloader 会抽取 OffloadService 来简单化 Offload 读写,并且 OffloadService 是可插拔的模块化事件。OffloadService 设计分为三个模块:Message Container、Message Formatter 和 Reader & Writer。

Message Container 工作原理如下。每一个 Topic 都拥有 Container Group,Pulsar 内通过 Raw Reader 来观测是否有新数据产生,如果有,就消费新数据并填充到当前的 Container。Offload 顺序地从 Container 向 Lakehouse 写入数据,比如如果需要将 Container 1、2、3、4 的数据写入,那么会首先将 Container 1 的数据 Offload 到 Lakehouse,然后依次处理 Container 2、3、4。此处我们需要注意,用 Raw Reader 消费的数据的格式是 Raw Message,而不同的 Lakehouse 可接受的数据格式不同,因此此处需要进行 Format Encode 后再写入数据。用 Raw Reader 来集成 Pulsar 和 Offload Service 可以同时运行在 Pulsar Broker 的内部和外部(当前 Offload 实现只能运行 Pulsar Broker 内部)。

Message Formatter & Writer 模块可以进行多元化处理。数据写入不同的目标存储就需要不同的 Formatter,向 HDFS、Hudi、Delta Lake 和 Iceberg 等写入数据都有各自的 Formatter,将 Raw Message 转换为各自的存储格式。

Message Formatter & Writer 和 Reader 对应的是系统内 Reader 和 Writer 的触发实例。Message Formatter & Reader 读取和写入远离相近。Reader 将数据从 Storage 读取后,最原始的数据格式是目标存储各自的格式,那么就需要用 Formatter Decode 将数据转换为 Raw Message。

数据转换为 Raw Message 后会回填到相应的 Container,由 Container 呈现给 Pulsar Streaming Reader。从上向下看整个读取流程,比如从 0 - 1000 读取 Pulsar Topic 里某一个 Ledger 的一个 Entry,首先会将读进行划分,0 - 300 分给 Container 1,301 - 600 分给 Container 2,601 - 1000 分给 Container 4,几个 Container 分别发送请求给存储回填数据,这样读取的时候存储可以知道 Container 1、2、4 有数据,依次读取即可。此过程必须在 Pulsar Broker 内进行。

未来规划

本文介绍了 Lakehouse 分级存储和 Pulsar Offload Service 如何集成。接下来,我们将继续将 Pulsar 和 HDFS、Apache Hudi、Delta Lake 和 Apache Iceberg 单独做具体的集成,此项工作已完成大半,期待完整版在 2023 年与大家见面。

其次,我们计划把 Lakehouse 作为 Compaction Service。Pulsar 的 Compaction 基于 BookKeeper 来实现,我们可以利用 Lakehouse Table 的 Upsert 功能来实现 Pulsar Topic Compaction。

引用链接

[1] PIP-17: https://github.com/apache/pulsar/wiki/PIP-17:-Tiered-storage-for-Pulsar-topics

[2] PIP-17: https://github.com/apache/pulsar/wiki/PIP-17:-Tiered-storage-for-Pulsar-topics

本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部