Apache Pulsar 摆脱 topic metadata 限制的改进

原创
2023/06/09 11:50
阅读数 153

本文整理自 Apache Pulsar PMC member & StreamNative 工程师李鹏辉在大会上的分享《摆脱主题元数据的限制,Apache Pulsar 的无限数据保留改进》。视频链接:https://www.bilibili.com/video/BV1kT411L7PH

本次分享的主题是 Apache Pulsar 如何摆脱限制,支持无限数据保留。本文将从 Topic Metadata 的一些限制出发,探讨摆脱 Topic Metadata 限制的解决方案。

作者介绍

李鹏辉,StreamNative 工程师,也是 Apache Pulsar PMC member。Apache Pulsar 是他第一个亲身参与的开源社区。从 2018 年开始,李鹏辉开始投身 Apache Pulsar 社区,与社区成员一同修复 bug 并参与开发。

进入StreamNative 前,李鹏辉在智联招聘工作。从 2018 年开始,他帮助智联招聘采用 Apache Pulsar;2019 年,智联招聘又将所有在线业务系统迁移到 Apache Pulsar 。2020 年,李鹏辉正式加入 StreamNative,主要负责 messaging 方向上的系统开发。

关于 StreamNative

了解 Apache Pulsar 的同学几乎都听说过这家公司。StreamNative 是一家开源商业化公司,由 Apache Pulsar 创始团队创立,致力于 Apache Pulsar 项目的推广和社区的构建,主要构建 cloud-native event streaming 的平台,帮助企业更容易地处理实时数据。

为什么 Pulsar topic 需要 metadata

先介绍下 Pulsar 相关的基本概念。

图片

第一,为什么 Pulsar topic 需要 metadata?这是因为在 Pulsar 上,计算层和存储层是两个不同的开源项目。Pulsar 的存储层由分布式日志系统 Apache BookKeeper 负责,充当 Topic 存储的服务。Pulsar 的计算层是 broker,它是一个无状态的组件,换句话说,用户可以平滑的扩容以及缩容 broker。

Pulsar 中的 topic 有一个 ownership 的概念,即 topic 的数据并不与某个具体的 broker 进行绑定,一个 topic 可以从 A broker 移动到 B broker 再移动到 C broker。因为 topic 中的消息数据实际上存储在 bookie 节点中,只需要修改 Zookeeper 的元数据,即可把 topic 的 ownership 转移到别的 broker 上。

那么为了让用户能够通过消息 ID 查询到具体的消息数据,就要依赖 topic metadata。Pulsar 中的一个消息 ID 是一个 (ledgerID, entryID) 的二元组,一个 topic 被分为若干逻辑分段(segment),也就是 Bookeeper 中 ledger 的概念。

简单来说,用户从一个 topic 中查询一条消息,需要先在该 topic 中用 ledgerID 查询对应的 ledger,然后再在该 ledger 中使用 entryID 寻找具体的消息数据。所谓的 topic metadata 就是记录者一个 topic 中存储的若干 ledger 的信息。

segment 实际上就是一个 topic 的逻辑分段,这样做的好处是每一个 partition 不会受到单个节点的资源限制,尤其是存储资源的限制。比如单个磁盘,如果在 Kafka 上,假设 broker 有 1TB 的磁盘,那么一个 partition 最多只能承载 1TB 的数据。如果这个 topic 想承载更多数据,那么必须增加 partition,然后把之前 partition 的数据重新分配到其他 partition 上。而有了 segment 这个中间层,一个 topic 可以承载更多的数据。

图片

每一个 topic 都可以有很多个 segment,而每一个 segment 都存储在多个 bookie 上。通常来说,需要几个 replica 数据副本,就会选择几个 bookie。

如果没有 topic metadata 的信息,那么没有办法找到这个 topic 下面所有的数据分段,即无法知道它们分别存储在哪个 bookie,进而无法读取具体的消息数据。所以,这可以解释为什么 Pulsar 的 topic 需要维护这个 metadata。

Kafka 并不需要维护这样的一个 metadata,因为 Kafka 中每一个 topic partition 对应到一个机器上的一个文件夹,segment 对应到该文件夹中的若干文件。而 Pulsar 的 topic partition 中的 segment 分布在不同的 bookie 节点上,需要 metadata 进行记录。

Ledger rollover 详解

图片

我们知道一个 partition 会有很多个 segment。因此,这里涉及到一个概念,即 Ledger rollover 或者 Segments rollover。当然,熟悉 Pulsar 的朋友知道更常见的名称是 Ledger rollover。

对此,解释一下,当数据不断地往一个 topic 里发送时,其实有一些规则。比如当一个 ledger 关闭后,新数据不会再写到这个 ledger 中,而是写到一个新 ledger 里。这样做的目的是因为 Pulsar 在删除数据时,是以 segment 为单位,而非以单条数据为单位进行删除的。

另外,当你切分成多个 segment 时,每个 segment 会被分布在不同的 bookie 上,这样能利用更多的 bookie 存储资源。

想知道 Pulsar 怎样工作的?可以查看 

https://jack-vanlightly.com/blog/2018/10/2/understanding-how-apache-pulsar-works

Metadata 的限制

那么,有了 metadata 后,会有哪些限制?

  1. 1. 如果数据想保存很久,如三年或更长时间,甚至想把数据永远保留,那么这里面要维护的一个 topic 会有很多数据分段,这是 metadata 可能产生限制的地方。如果把数据保存三年以上,segment 的数量可能超过 10 万,也可能达到 100 万。这取决于 rollover 策略设置。当 segment 数量太多时,要维护的 Topic Metadata 也会很多。因为每一个 Topic Metadata 都需要记录每个 segment 对应的 Ledger ID 是什么。有了 Ledger ID才能找到对应的 Ledger,进而确定数据存储在哪几个 bookie 上,这样才能正确地读取到数据。

  2. 2. 如果有很多 topic,比如 10 万个 topic 或者更多,那么这些 topic 也要保留很长时间的数据。整个 Topic Metadata 的大小相对来说会达到一个较大量级,其大小可能超过几百 GB,甚至到达 TB 级别。这可能超出很多 metadata service 能正常工作的数据大小区间。尤其是像 ZookKeeper 可能并不擅长做这样的事情。

Metadata 限制带来的三大问题

如果一个 Topic Metadata 变大之后,以及 topic 很多要保留的数据的时间变长,那么这会带来哪些问题?

问题一,metadata 写放大问题。

如果一个 Topic Metadata 的大小超过几十 MB,那么做 Ledger rollover 时,可能要往 metadata 中新增一个数据分段。这个时候,Pulsar 目前的实现是将整个 metadata 覆盖,即把这几十 MB 数据做一次 update,然后重新再写到 ZK 里。那么,Key 相当于 topic 的名字,它本身是一个 manageredLedger 的名字,与 topic 名字是相对应关系。这样,如果你同时有几十个 topic 或几百个 topic 做 Ledger rollover,可能会让你的 broker 到 ZookKeeper 的带宽成为一个瓶颈,甚至导致 ZookKeeper 本身的写入到达瓶颈。

问题二,metadata service 总的数据大小的限制,当 topic 较多时,容易超过 metadata service 的容量上限。

问题三,在 broker 内部对某个 topic 提供 messaging 服务,其实需要加载这部分 metadata 到内存,因为它需要找到对应的数据位置。当前的实现是把 topic 所有的 metadata 加载到内存中,只要这个 topic 被加载,那么它就需要加载所有 metadata。这时,如果 metadata 大小达到 TB 级别,那么单个 broker 能承载的 topic 数量就会非常有限,因为所需要的内存可能相对比较大。

Topic Metadata format 的定义

message ManagedLedgerInfo {
    message LedgerInfo {
        required int64 ledgerId = 1;
        optional int64 entries = 2;
        optional int64 size = 3;
        optional int64 timestamp = 4;
        optional OffloadContext offloadContext = 5;
    }
    
  repeated LedgerInfo ledgerInfo = 1;
 
     //if present,it signals the managed ledger has been
     //terminated and this was the position of the last
     //committed entry.
     //No more entries can be written
     optional NestedPositionInfo terminatedPosition =2;
     
     repeated KeyValue properties =3 ;
}

如上图是 Topic Metadata format 的具体定义。它在 Pulsar 里叫 ManagedLedgerInfo,因为 ManagedLedger 对应的是 Pulsar 里负责 Pulsar topic 数据写入读取的模块。

前面提到的所有 segment 的元数据,即存储在 LedgerInfo 里。因为它是一个 repeated,相当于是一个 list。每一个 LedgerInfo 中会记录这个数据分段的 Ledger ID 是什么。通过这个 Ledger ID 又能找到正确的数据,并记录一些其他的 metadata,比如这个数据分段有多少个 entry、有多大以及它的时间、offloadContext。

记录这些信息是为了让你读取数据或查找数据时,尤其是回溯消息过程中,它们能提供帮助。比如 size 字段说明这个 Ledger 的大小,如果不满足条件的话,可以直接跳过整个 Ledger。

接下来,详细分析下整个 Topic Metadata 变化的过程。当然,这里面只包含一些非常核心的变化。这样,不仅好分析面临的问题,以及可利用的规则,而且构思有什么样的办法应对问题。

给 topic 增加一个新 segment

图片

当去增加一个新分段时,它有一些特点,即我们永远是在尾部添加。旧的 segment 已经在那里了,永远是在新的 segment。这是可以利用的一个规则。

图片

第二,当去删除一个 segment 时,只会从前往后删除,在 Pulsar 里不存在跳着删除,比如 segment 0 和 segment 1 还没删除时,却要删除 segment 2 和 segment 3,不会有这样的行为。

Data offloading 的概述

图片

第三,这里谈 data offloading,是因为有一个问题,即 data offloading 会导致 Offload Context 的修改。

先简单介绍下 data offloading,Pulsar 里是支持这种二级存储,简单说把冷数据可以存储在更廉价的设备上。Pulsar 可以帮助用户把旧数据迁移到 AWS S3 或 Google 对象存储上。这样的话,存储资源的价格会降低,但是 metadata 还是维护在 Topic Metadata 里面,就是前面看到的 offload context。在 offload 完后,offload 的对象地址是什么、如何连接到对象存储服务上把数据读取回来,这些信息均包含在 context 里。

message ManagedLedgerInfo {
    message LedgerInfo {
        required int64 ledgerId = 1;
        optional int64 entries = 2;
        optional int64 size = 3;
        optional int64 timestamp = 4;
        optional OffloadContext offloadContext = 5
     }
     
   repeated LedgerInfo ledgerInfo = 1;
}

一旦数据被 offload 后,这个 offload contex 就会被修改。因为在一开始,数据没有 offload 的时候,这里是一个缺省状态。一旦数据被 offload 后,那么就需要去修改 offload 的 context。因此,除了之前提到的操作(心得 segment 只能添加在最后,删除操作只能从前往后删除),这里又出现一个更新操作。

图片

当然,这个更新操作也有特点,它从老的 Ledger 到新的 Ledger,即从老的数据分段到新的数据分段去更新 offload 的状态。所以,所有 segment 的删除和修改都是按照从老到新的顺序执行。只有添加 segment会从尾部添加。

图片

这样,可以利用的一个规则是,在一定条件下,有些 Topic Metadata 会变成一个不可变状态。

如果用户已经开启 tired storage 的特性,那么一旦这些数据被 offload 后,就不会再发生变化,这是我们可以利用的特点。

Metadata 限制带来的问题的应对方法

前面讨论的三个问题,一个是写放大的问题,一个是 metadata service 可能要承载很大量级的数据存储的问题,以及 broker 去 load 所有 Topic Metadata 会带来很多 broker 内存负担的问题。

基于前面阐述的 Topic Metadata 的一些特点,以及可以利用的规则,就有一个相对简单的解决方案:将 Topic Metadata 分成多个部分,比如一个 topic 中可能有 10 万个 segment,那么可以把 10 万个 segment 给分开,将 1 万个 segment 做成一个 metadata 的一个分段,这样,一个 topic 会有 10 个 metadata 的分段。

另外,我们可以把这些分段 offload 到 tiered storage 里,避免将这些冷数据对应的 metadata 维护在 ZookKeeper 里面,或者大家也可以用类似 ETCD 或 RocksDB 之类的数据服务,这样的话可以解决上面的几个问题。比如将一个大的 metadata 拆分成几个小部分时,这时你去更新时只需更新最后一个分段,不用管前面的分段。因为随着 Ledger rollover,谁也不会再去修改它们。对 metadata 来说,Ledger rollover 是发生频率相对较高的操作。

第二个,如何避免 metadata service 存储太大量的 metadata 呢?我们可以把这些 metadata 的分段去 offload 到 tiered storage 里面。如此,在 metadata service 里,它只包含那些相对来说比较新的数据。

这样的话会有一个限制条件,即,我们需要用户开启tiered storage 功能,如果没有 enable tiered storage 的话,那么没办法将 metadata 的数据挪到二级存储里,这样可以解决写放大问题,也能把一个 metadata 分成多个部分,但是不好减少总体积,因为没有其他地方可以存储。

拆分 Topic Metadata

图片

接下来,我们看一下具体的拆分过程。如上图,这可能是一个比较简单的示例,因为这个 ledger 的数量或 segment 的数量较少,即我们把这种已变成不可变状态的 metadata 的 ledger,按照数量分成不同的分段。

当然,还含有潜在的没转变成不可变的 segment,由于数据没有被 offload 或还处于 active 的 segment,它们是处于一个可变的状态。那么,可以根据这些特点去分开不同的段,在分开不同的段后,把它按照一段一段的 metadata 用不同的 Znode 存储。例如,上图左下角展示的一个例子,如果这个 partition 已经有好几万个这样的数据分段,那可以把它的这个数据分段分别记录在不同的 Znode。然后,在每一个 Znode 中再详细记录 segment 信息,比如 10000 或 5000 个 segment 的信息。这样,我们就能把一个大的 metadata 拆分成一个个小的分段。

图片

这时,我们再去 load 这个 Topic Metadata 时,不需要把所有 Topic Metadata 全部都 load 到 broker 内存里。因为在 Pulsar 里,每一个 subscription 的内部是可以知道读取的那个 segment 属于哪一个 metadata 的分段。这样,可以选择性的加载。换言之,如果你需要读取它,可以把整个 metadata 中对应的的 segment 加载到内存里。

图片

如果消费者读取到 segment2 的时候,可以把 segment0 和 segment1 逐出,因为它再被利用的概率不会太高。除非有新的订阅进来,并读取这部分数据,或者已有的要回溯到之前的历史数据,这时按需读取即可。

图片

再看没有变成不可变状态的 segment,可以把这些 metadata offload 到 tiered storage 里面,因为它们已经是一个不可变的状态,所以管理起来更简单。

删除 metadata segment

图片

我们再看,整个删除数据时,其实都是按照以 ledger 为单位去删除,即一个数据的 segment 去删除。因为我们的设计目的是尽量以一种不可变的方式去维护,这样,我们去删除一个 segment 时,相当于要修改 S0、S1 在一个 metadata 的一个分段里,这个时候怎么办?可以在那个 Topic Metadata 里维护一个类似 first_ledger 的字段,因为 first_ledger 这个字段其实本身维护在 ZK 里,它是不会被 offload 下去的字段。

如果第一个 segment 被删除,那么把它的 first Ledger ID 往后挪就可以了。

图片

如果第二个 segment 也被删除,那么把 first Ledger ID 指向 segment 2。这时,我们可以把前面的 metadata segment 直接删除掉,因为那两个已经没有用,此时整体删除它即可。

这样,对操作对象存储也会更简单,因为没有 update,就只有写入,直接写入一个对象,再删除一个对象就可以了。

总结

本次分享介绍了在 Pulsar 里面,Topic Metadata 怎样工作的,涉及 tiered storage,数据分段的删除,offload 的过程,以及修改 metadata。大概梳理了目前实现里关于 Topic Metadata 的限制,我们提出的思路是把一个大的 Topic Metadata 分成多个小的分段,然后把每一个 metadata 的分段 offload 到 tiered storage,从而减轻 metadata service 的存储负担。

总结一下,核心思路是尽量把 metadata segment 变成一个不可变的状态,让整个设计以及实现变得更简单。


本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部