Apache Pulsar 2023 动态展望

原创
2023/01/11 19:50
阅读数 167

作者介绍

李鹏辉,StreamNative 首席架构师,Apache Pulsar PMC 成员。拥有十余年消息系统与微服务工作经验。其近期工作方向和 Apache Pulsar 息息相关,曾于 2019 年推动 Pulsar 在智联招聘的落地,这是 Apache Pulsar 在中国的首个落地案例。

导语

新的一年已经开始,Apache Pulsar 在 2023 年可能发生怎样的进化呢?本文为大家介绍 2023 年 Apache Pulsar 预计的功能进展,包含 LTS 版本发布、元数据服务更新、负载均衡器更新、大规模延迟消息支持与 Lakehouse 集成等新特性,以及 Pulsar 在稳定性和 Metrics 系统的提升等等。

Apache Pulsar LTS 版本

Pulsar 在 2023 年会发布第一个 LTS (Long Term Support,长期支持)版本。当前版本存在的问题有:

  • • 每个版本维护周期较短,缺少长时间维护的版本:从 2.6 版本到 2.10 版本社区版本更新较快,近期还即将发布 2.11 版本,然而很多用户使用的版本依然停留在 2.7 和 2.8 阶段。用户如果进行大版本升级,比如从 2.8 升级为 2.10 版本,会获得 2.9 和 2.10 版本新增的特性与大大小小的 Bug Fix。而旧有大版本如果没有安全漏洞或者比较大的 Bug,很可能停止发版。

  • • 大版本发布周期较长:Pulsar 发版前因需要验证是否有新问题出现、新特性会不会和旧特性冲突破坏系统行为,发布经常延迟,社区会发起 RC 版本(Release Candidate,候选版本)供大家验证。一旦发现问题就需要重新发布 RC 版本,以此循环直至版本通过验证。大版本发布周期较长,从 Release Manager 开始操作到发布,需要一个月到三个月的时间。

  • • 用户升级很难跟进最新版本:无论对追求稳定性还是追求新特性的用户来说,都会面临版本升级的问题。追求稳定性的用户更期待 Bug Fix 和安全漏洞的修复可以持续更久的时间(2-3 年),如果对大版本新特性需求不大,用户可能一直使用当前大版本并专注于小版本的漏洞和 Bug 的修复。追求新特性的用户会面临新特性版本功能不稳定的问题。

    我们希望可以同时满足两类用户的不同需求。Pulsar 从 3.0 开始发布 LTS 版本(2.11 版本 后大概率进入 3.0 版本)并最长维护 3 年。社区计划每 18 个月发布一个 LTS 版本,Bug Fix 将持续 24 个月(两年内重要的 Bug Fix 都会持续在 LTS 版本中更新),此后的一年中 支持安全漏洞修复。LTS 版本支持周期较长,可以满足追求稳定性用户长期使用一个大版本的需求。

上图中的 3.1.x、3.2.x 等版本有新特性发布版本,近每三个月周期发布一次,但是社区不会长时间维护所有特性版本,可能仅维护一两个小版本,大部分版本维护时间大概持续 6 个月。追求新特性的用户可以根据需求进行频繁更新。

在上面的示意图中,0、3、6 等数字代表月份。假设 3.0.0 版本在 0 月发布,LTS 版本如 3.0.1、3.0.2 等发布周期为 1-2 个月,具体频次取决于 Bug Fix 重要程度。3.1.0 版本在 3 月发布,包含 3.0.x 没有的特性。这个规律会一直延续下去,为追求稳定性和追求新特性的用户提供不同版本的支持。追求稳定性的用户可以使用 3.0.x 版本,追求新特性的用户可以使用 3.x 版本。

有了 LTS 版本后,仅在相邻 LTS 版本间存在兼容性保障。基于同一个特性的 LTS 版本之间可兼容。示例如下:

  • • 3.0 - 4.0 - 3.0:兼容

  • • 3.2 - 4.0 - 3.2:兼容

  • • 3.2 - 4.4 - 3.2:兼容

  • • 3.2 - 5.0:不兼容

新元数据服务

社区近几年一直在将 Pulsar 中超出元数据的接口抽象出来。在抽象的驱动下可以进行不同的实现,比如支持 ZooKeeper、etcd 和 RocksDB(支持单机模式,Pulsar 新版本 standalone 使用 RocksDB 而非 ZooKeeper),虽然 Pulsar 目前支持 etcd 但是使用很少,因为它缺少 Java 客户端。

当前 Pulsar 使用 ZooKeeper 存在的问题有:

  • • 无法横向扩容:当集群增大、Topic 数量增多时,集群内需要保留的数据越来越多,Pulsar 上需要存储的元数据越来越多。这些元数据消耗 K-V(Key-Value)存储,如 BookKeeper Ledger 元数据和 Topic ManagedLedger 元数据等。ZooKeeper 无法通过添加节点提升性能。

  • • 无法支撑大数据集:

    • • ZooKeeper 不是为服务 K-V 存储而设计的,运维 ZooKeeper 会遇到很多问题,如 Snapshot、数据节点不一致等问题。

    • • ZooKeeper 数据最终加载在内存中,数据集增加,ZooKeeper 节点需要更大内存,对 GC 不友好。

    • • 数据存储在内存中,抓取 Snapshot 时需要把整个数据集周期性地刷到磁盘上。数据集大,磁盘开销大。


    StreamNative 正在计划引入新的元数据服务来支持大规模数据和横向扩展,预计在上半

    年将项目开源,目前该特性还未在社区进行讨论。新的元数据服务目标有:

    • 用于 K8s 环境:越来越多的用户在 K8s 上使用 Pulsar,元数据服务实现更简单可以帮助用户使用 K8s 支持的特性,如 CRD 和 etcd 等,来节省元数据服务选组所需的功能开发。

    • 横向扩展存储节点:Pulsar 内大部分使用元数据的场景是 K-V 存储,其余场景是分布式锁。

    • 支持增量 Snapshot:数据集大,全量 Snapshot 开销大。

    • 支持数据集大小至 100 GB。

    • 读写速率支持每秒可处理百万级 Operations 请求(1M ops/s):对百万级 Topic 集群友好,Ledger 轮询时会进行 Topic 元数据更新。

    • 读写延迟:读 P99 延迟小于 5ms,写 P99 延迟小于 20ms。

    该服务结构图如上图所示,从上至下分为控制节点和数据节点。 控制节点是 K8s Operator,数据节点维护所有 K-V 数据。 K8s Operator 作为控制节点会监控所有存储节点的健康状态,并且将数据分成多个分片(Shard)来进行水平扩容,分片分布于不同的存储节点每一个分片都有对应的 Leader 和 Follower。 K8s Operator 协调整个集群,安排分片的 Leader 和 Follower,协调 Leader 状态变化、重新选择 Leader、选择 Follower 以及选择分片拉取数据和增量日志,并将分片信息通过 Sharding 方式更新到 K8s etcd。

    K8s Operator 利用 K8s etcd 存储元数据,这些元数据包含分片对应的 Leader 和 Follower 信息。存储节点不需要设计选主功能,只需要用嵌入式数据库负责 K-V 存储。跨节点操作只需要数据复制,K8s Operator 会通知每个存储节点分片的变化,存储节点本身含有元数据信息,客户端连接到存储节点后发现新节点并连接分片 Leader 进行读写。客户端也可以获取 Leader 状态变化并切换到新的 Leader。

具体概念如下:

  • • Client:获取分片分配列表,以便它能够连接到 Leader 特定分片的 Storage Pod。

  • • 存储节点(Storage Pod):分片或是 Leader 或是 Follower,分片被分配给存储节点。存储节点不会相互执行健康检查。

  • • K8S Operator:主要负责执行错误检测并恢复数据,以及跟踪分片状态变更。

  • • CRD:K8s 自定义资源,通过 K8s API 进行操作,并由底层 K8s etcd 实例支持。CRD 负责将状态变更写入 etcd,用户配置变化比如修改节点数量和数据副本数等也可以由 CRD 写入 etcd。CRD 有两个目的:

    • • 集群管理员(Administrator)可以独立于集群更新定制资源。K8s Operator 读取 CRD 更新并在集群上执行。

    • • 操作人员通过 CRD 中的状态存储集群状态信息,如分片被分配给哪些存储节点等。

  • • 管理员:能够通过更新 CRD 与操作员进行交互,如更改存储节点数量和更改复制因子等。

新负载均衡器

此功能正在开发中,它的主要目的是解决负载均衡器透明和可运维性的三个问题:

  • • 目前在 Pulsar 内部,负载均衡器像黑盒子一样,在 Broker 内部运行来拆分和卸载 Bundle,可见性不高,难以解决 owner 分配问题。

  • • 负载均衡器的操作不便利,无法人为影响行为。

  • • 在上报负载数据时,有些节点之间需要同步负载数据,Bundle 负载数据复制通过 ZooKeeper 进行。在一些用户场景内,如果 Topic 较多,对应的 Bundle 也相应数量较多,同步 Bundle 负载数据会占用 ZooKeeper 70% - 80% 的流量。


  • 新的负载均衡器有以下更新:

  • • 停止依赖 ZooKeeper 传输负载数据:Bundle 负载数据仅对 Broker 可见,新机制下无需同步所有 Bundle 数据,只需同步 Top N Bundle 的数据,并存在少数 Broker 负载数据跨 Broker 同步的操作。

  • • 负载均衡器的决策透明化:引入新机制将决策写入 Topic,包含做决策的原因、指标变化、决策是什么等。数据保存在日志中并暴露相关 Metrics。对运维集群的人来说,可以更好地监控 Topic 数据来进行调优。

  • • 均衡集群利用率,减少延迟:在将 Bundle 从 Broker A 迁移到 Broker B 的场景中,重新为 Bundle 分配 Broker 需要先卸载 Broker ,然后利用客户端重连机制触发给 Bundle 选择 Broker 的机制。该过程依赖于客户端重连时间,因此延迟稍长。新机制直接将 Bundle 迁移信息通知客户端,Bundle 直接连接到 Broker B,无需 Lookup 操作。

  • • 覆盖系统决策:Pulsar 内部有很多关于负载均衡的决策,新机制直接将 Bundle 从一个 Broker 迁移到另一个 Broker。通过 API 操作可以快速人工干预达到负载均衡并分配 Bundle 分布。

  • 用户部署新机制后可感知的变化有:

  • • Broker 负载数据通过 System Topic 同步;

  • • Split Bundle 无需通过 Leader,可通过 Owner 操作;

  • • Top N Bundle 负载数据可通过 System Topic 同步;

  • • 所有 Broker 都可以进行 Bundle 分配,无需通过 Leader 做决策,因为负载数据对 Broker 可见,可以应用相同的策略;

  • • Leader Broker 执行 Bundle 卸载:一部分 Broker 负载过高,需要将 Bundle 迁移;

  • • 在卸载 Bundle 时增加 Bundle Transfer 选项,客户端可以更快地连接到 Broker,无需通过尝试的方式定位 Broker。

大规模延迟消息

目前 Pulsar 内延迟消息存在一些问题:

  • • 内存受限:延迟消息索引左侧的时间按照对应 messageid 的时间进行排序,可以以此判断消息过期的先后顺序。当前的索引通常维护在内存中,如果延迟消息数量多,内存开销会很大。虽然可以通过为一个 Topic 建立多个 Partition 并分布在多个 Broker 上来利用多台机器 Broker 内存,但是总内存消耗没有变化。

  • • 延迟索引重建消耗大:在延迟消息数量过大达到上亿量级的情况下,当Topic 迁移到不同 Broker 或者 Broker 宕机时,重建索引需要回放日志。一旦大量 Topic 迁移,在重建索引的过程中消耗资源,客户端和追赶读都会受到影响。


  • 新的延迟消息机制的目标是:

  • • 重新 Load 延迟消息索引时,无需回放历史数据;

  • • 不受限于内存大小,无需把所有的索引维护在内存中,比如半年后才会消费的消息没有必要 Load 到内存中,新机制允许仅 Load 最新的消息。

  • 下图为 Pulsar 社区正在开发的 PIP 示意图。该 PIP 已通过投票,社区正在审核相关 Pull Request。我们的思路是把 Pulsar 内延迟消息索引分为多个 Bucket,Bucket 结构不可变。假设 Topic 有 10 个 Ledger 和 2 个 Bucket,每个 Bucket 负责 5 个 Ledger 的索引,因此每个 Bucket 内都有时间靠前的消息和时间靠后的消息。除非 Ledger 删除,对应的 Bucket 也可删除,否则 Bucket 不可修改。

Bucket 用于 Snapshot,每一个 Bucket 都写入 Bookie 存储节点,负载时通过 Snapshot 恢复 Bucket。这样可以更容易地管理索引,消费延迟消息 Load Bucket 时(索引落入 Bucket 时已进行重排序,时间靠前的消息写入前面的 Entry,时间靠后的消息写入后面的 Entry)只会加载第一个写入 Entry 的索引数据,前面的 Entry 如果未被消费,后面的 Entry 不应该被消费,后面的延迟消息到达时间更晚,因此可以通过 Laod Bucket 内最早部分的 Entry 观察 Topic 哪些最近的消息将要到期。

一个 Bucket 已经被 Load 完才会 Load 下一个的整个过程被称为 Shared Delayed Message Priority Queue,里面包含多个 Bucket 混合后进行排序形成的 Bucket 堆。消费延时消息时,从 Priority Queue 内检查到期的消息。从不同 Bucket 到 Priority Queue 时,如果 Bucket 内第一个 Entry 已消费,接下来的 Entry 会进入 Priority Queue,以此来达到索引增量 Snapshot 并且 Priority Queue 内存可控。当然,随着时间的推移,可能 Bucket 内大部分数据到期,仅尾部未到期,会出现 Bucket 数量过多的情况,那么就会进行 Bucket Merge 操作,将相邻的、拥有处于延迟消息索引的 Bucket 合成一个 Bucket。

通过这样的流程,我们可以利用将延迟消息索引分为 Bucket 的方式做 Snapshot,Load 时仅需要 Load Snapshot 而无需回放历史数据,解决了延迟消息数据落入内存的问题。下面是实现要点:

  • • 延迟索引 Bucket Sanpshot 不可变

  • • 根据延迟时间将每个 Bucket 分为多个分片

  • • 仅 Load 最靠前的分配进入内存,靠前的分片先到期

LakeHouse 集成

这是 Apache Pulsar 周边生态的项目。在很多用户场景中,数据采集进入 Pulsar,Pulsar 将数据发送给 Flink 或 Spark 等进行消费,然后数据写入 Lakehouse。Lakehouse 产品帮助用户将数据存储在云上,降低成本。Hudi 等主要的数据存储格式是列式文件格式(后文简称为“列存”),用户可以只关心一列数据,对分析友好;而行式文件格式(后文简称为“行存”)读取产生有读放大的问题。

我们想做的集成会把整个 Pulsar 以及 Lakehouse 整合在一起,用户在此过程中完全无感,无需关心数据从 Pulsar 到 Hudi 的过程。即使数据写入 Hudi,Pulsar API 也可以进行读取。唯一区别取决于用户数据用行存还是列存的格式存储数据,比如用户希望 Streaming API 性能高,可以在 Bookie 里面增加数据保留时间,这样回放历史数据时性能更好,此时从列存读取历史数据再组成行存会产生性能开销。但是这样的场景比较少见,大部分场景都会对历史数据进行批注,比如搜索过去一个月符合哪些条件的数据有多少,都会使用更高效的列存的方式。

这样形成了分层的状态。Pulsar 内数据是最新的(达到秒级)、正在写的,卸载后列存的数据是分钟级的(大于或十分钟,取决于卸载周期)。如果用 Table API 分析,可以直接从 Hudi 内得到分钟级的实时性。如果我们可以将 Pulsar Topic 和表进行统一,就可以得到一体化的 SDK,已经卸载的数据可以直接从 Hudi 列存读取,开销更低并且适合大批量的数据读取;留在 Pulsar 内还未卸载的数据比较少量,可以通过回放日志直接读取。通过这样的方式可以达到非常低的实时性要求,将行存和列存数据整合在一起,为用户提供统一的视图。存储格式不同对用户查询数据和处理数据的结构没有影响。

在 2022 年,StreamNative 团队用 Connector 的方式将 Pulsar 和 Iceberg 和 Hudi 等集成。数据卸载后更为一体化,Pulsar Topic 可以完全同时存在行存和列存数据。

其他特性

由于篇幅的限制,还有多个重要特性的进展在此不会进行详解。简单为大家介绍一下其他特性:

Shadow Topic

一个 Pulsar Topic/Partition 只能由一个 Broker 提供服务。如果有非常多的订阅,出口带宽可能不够用,虽然可以通过跨地域复制解决问题,但是存储相应翻倍。解决思路是让一个 Topic 有多个 Shadow Topic,将 Shadow Topic 作为不能写入、仅可以连接 Topic 消费的只读视图。Topic 可以继续写入,并且这些写入 Shadow Topic 可见。这样可以将一个 Topic 的多个 Shadow Topic 分散于多个 Broker,每一个 Broker 可以负责一个实录的出口,因此 Topic 可以拥有多倍 Failout 的性能。

Direct IO

在 BookKeeper 内增加 Direct IO 支持。PageCache 无法给 Pulsar 提供太多的帮助,因为 BookKeeper 写的时候有 Write Cache,读的时候有 Readhead Cache,PageCache 的加入增加了合成、缓存和控制等流程,让工作更加复杂。Direct IO 绕过 PageCache 直接操作,可以提升性能。

Metrics 改进

在 Pulsar 内,Merics 相对不统一。因为历史原因,Metrics 系统不断迭代。从最早的版本更新为现在的 Prometheus Open Metrics 格式。Metrics 改进主要有两个方向,一是统一 Metrics 定义,比图如何设计 Metrics、如何使用 Metrics SDK、在 Namespace 级别和 Topic 级别的标准分别是什么;二是应对大量 Topic,支持用户进行分组和过滤等。假定在 100 万 Topic 下,每一个 Broker 需要承载成千上万的 Topic,如果开启 Topic 级别下 Metrics 会产生非常多的 Metrics,每拉取一次会产生几百兆带宽,很难有 Metrcis 系统能经受这样的量级。一秒钟写几 GB 数据并分析,即使有 Metrcis 系统能承受,开销也很可观。灵活支持控制可以尽量减少 Metrics 输出,用户可以在需要排查的时候让粒度变得更细,在不需要的时候可以让粒度更粗。

稳定性提升

我们一直在做 Chaos Test 相关工作,预计 2023 年开源,并向大家公开实验组,分享测试场景与结果。

客户端 Feature Catch Up (C++、Python、Node.js、Go、Rust、C#)

除了 Java 客户端 Feature 比较新,其他语言客户端特性相对落后,很多特性没有被支持。在 2022 年下半年,社区已经在着手更新 C++、Python 和 Node.js 客户端,其中 C++ 客户端面对比较大的用户群体,而 Python 和 Node.js 客户端基于 C++ 客户端。目前 C++客户端已经从 Pulsar GitHub 主仓库分离,可以更好地控制 C++ 客户端的发布周期并进行单独维护。明年社区会开始做 Go、Rust 和 C# 等客户端的 Feature Catch Up。

本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部