博文推荐|Kafka-on-Pulsar 突破性进展:2.8.0 及更高版本的连续偏移量实现

原创
2022/04/21 19:00
阅读数 2.5K

本文翻译自 StreamNative 博客《Offset Implementation in Kafka-on-Pulsar》,作者 StreamNative 工程师徐昀泽,Apache Pulsar Committer,KoP maintainer。

协议处理器[1] 是 2020 年一月份发布的 Pulsar 2.5.0 所引入的新功能,目的是将 Pulsar 的能力扩展到其他消息领域。默认情况下 Pulsar Broker 仅支持 Pulsar 协议。而通过协议处理器,Pulsar Broker 就可以支持其他消息协议,包括 Kafka、AMQP 以及 MQTT(现已新增 RocketMQ)。这使得 Pulsar 可以与基于其他消息技术的应用进行交互,从而扩展 Pulsar 生态系统。

Kafka-on-Pulsar (KoP)[2] 就是一种协议处理协议,它将原生 Kafka 协议引入了 Pulsar,使得开发人员能够使用现有的 Kafka 应用将数据发布到 Pulsar 或从 Pulsar 读取数据,而无需更改代码。KoP 极大降低了 Kafka 用户使用 Pulsar 的壁垒,这让 KoP 成为最受欢迎的协议处理器之一。

KoP 解析 Kafka 协议,并通过 Pulsar 提供的流式存储抽象接口直接访问 BookKeeper。虽然 Kafka 和 Pulsar 有许多通用的概念,例如主题和分区,但在 Pulsar 中没有对应 Kafka 偏移量的概念。KoP 的早期版本通过一种简单的转换来应对这个问题,但这种转换不支持连续偏移量,同时也容易出现问题。 

为了解决这个痛点,KoP 2.8.0 引入了 Broker Entry Metadata,以实现连续偏移量。这个更新使得 KoP 可用并且生产就绪。需要特别注意的是,这个更新破坏了向后兼容性。本文将深入探讨 KoP 2.8.0 之前和之后分别是如何实现偏移量的,并解释该突破性变化背后的基本原理。

版本兼容性说明

Pulsar 2.6.2 版本之后,KoP 版本即随着相应的 Pulsar 版本而更新。KoP x.y.z.m 版本对应 Pulsar x.y.z 版本,其中 m 是补丁版本号。例如,最新的 KoP 2.8.1.22 版本与 Pulsar 2.8.1 版本兼容。本文中 2.8.0 同时指代 Pulsar 2.8.0 和 KoP 2.8.0。

Kafka 和 Pulsar 的消息标识符

Kafka 偏移量

在 Kafka 中,偏移量是一个 64 位整数,表示消息在特定分区中的位置。Kafka 消费者可以向分区提交偏移量。如果偏移量提交成功,那么消费者重启后就能够从已提交的偏移量位置继续消费。

Kafka 偏移量是连续的,遵守如下约束:

  1. 1. 第一条消息的偏移量为 0。

  2. 2. 如果最后一条消息的偏移量为 N,那么下一条消息的偏移量将会是 N + 1。

Kafka 将消息存储在每个 broker 的文件系统中:

  • • 每个分区切分为分片

  • • 每个分片是一个文件,存储特定偏移量范围内的一组消息

  • • 每个偏移量有一个位置,即消息的起始文件偏移量(文件偏移量是指字符在文件中的位置,而 Kafka 偏移量是消息在一个分区中的索引。)

由于每条消息的头部都记录了消息大小,所以对于给定偏移量,Kafka 可以很容易地找到其分片文件以及位置。

Pulsar 消息 ID

Kafka 将消息存储到每个 Broker 上的文件系统,而 Pulsar 则不同,它使用 BookKeeper 作为其存储系统。在 BookKeeper 中:

  • • 每个日志单元称为一个 Entry

  • • 日志 Entry 流称为 Ledger

  • • 存储 Entry Ledger 的单独的服务器称为 Bookie

Bookie 可以通过 64 位 Ledger ID 和 64 位 Entry ID 找到任何 Entry。Pulsar 可以在一个 Entry 中存储单条消息或一批消息。因此,Pulsar 的消息 ID 由 Ledger ID、Entry ID、 批索引(如果不是批量消息则为 -1)以及分区编号组成,Pulsar 可通过这种消息 ID 找到一条消息。

就像 Kafka 消费者可以提交偏移量来记录消费位置一样,Pulsar 消费者可以确认消息 ID 来记录消费位置。

KoP 如何处理 Kafka 偏移量

KoP 需要如下 Kafka 请求来处理 Kafka 偏移量:

  • • PRODUCE:当 Kafka 生产者生产的消息被持久化之后,KoP 需要告诉 Kafka 生产者第一条消息的偏移量。然而 BookKeeper 客户端只返回一个消息 ID。

  • • FETCH:当 Kafka 消费者想要从指定偏移量开始获取消息时,KoP 需要找到对应的消息 ID 并从 Ledger 中读取相应的 Entry。

  • • LIST_OFFSET:查找最早或最新的可用消息,或者按时间戳查找消息。

我们必须支持计算特定消息的偏移量,或通过给定的偏移量定位消息。

KoP 2.8.0 之前版本如何实现偏移量

实现细节

如前文所述,Kafka 通过 分区编号和偏移量 来定位消息,而 Pulsar 通过 消息 ID 来定位消息。在 Pulsar 2.8.0 之前,KoP 简单地在 Kafka 偏移量和 Pulsar 消息 ID 之间进行一个转换。将 64 位的偏移量映射为 20 位 Ledger ID、32 位 Entry ID 以及 12 位批索引。如下是一个简单的 Java 实现。

    public static long getOffset(long ledgerId, long entryId, int batchIndex) {
        return (ledgerId << (32 + 12) | (entryId << 12)) + batchIndex;
    }

    public static PositionImpl getPosition(long offset) {
        long ledgerId = offset >>> (32 + 12);
        long entryId = (offset & 0x0F_FF_FF_FF_FF_FFL) >>> BATCH_BITS;
        // BookKeeper only needs a ledger id and an entry id to locate an entry
        return new PositionImpl(ledgerId, entryId);
    }

在本文中,我们使用 (ledger id, entry id, batch index) 来表示一个消息 ID。例如,假设一个消息的 ID 是 (10, 0, 0),则转换后的偏移量为 175921860444160。在一些情况下这样的数值能正常工作,因为偏移量是单调递增的。然而当发生 Ledger 翻转,或应用程序想要手动管理偏移量时,就会出现问题。下面详细介绍这种简单转换方法存在的问题。

简单转换存在的问题

转换后的偏移量不连续,这会导致许多严重问题。

例如,假设当前消息 ID 是 (10, 5, 100)。如果发生 Ledger 翻转,则下一条消息的 ID 可能是 (11, 0, 0)。在这种情况下,两条消息的偏移量分别为 175921860464740 和 193514046488576,两者差了 17,592,186,023,836。

KoP 利用 Kafka 的 MemoryRecordBuilder将多条消息合并为一个批量消息。 MemoryRecordBuilder 必须确保批量大小小于 32 位整数的最大值 (4,294,967,296)。在上文示例中,两个连续偏移量的差值远大于 4,294,967,296。这将导致抛出 Maximum offset delta exceeded 异常。

为了避免该异常,在使用 KoP 2.8.0 之前版本时,我们必须配置 maxReadEntriesNum 为 1 (此配置限制 BookKeeper 客户端读取的最大 Entry 条数)。如此一来,每个 FETCH 请求只读取一个 Entry,会显著降低性能。

然而,即使使用 maxReadEntriesNum=1 这种变通方法,这种转换实现在某些场景下也不能正常工作。例如,Kafka 与 Spark 的集成依赖于 Kafka 偏移量的连续性。当消费偏移量为 N 的消息后,Spark 会寻找下一个偏移量 (N + 1)。但是偏移量 N + 1 可能无法转换为有效的消息 ID。

转换方法还存在其他问题。而在 2.8.0 之前版本,没有好办法实现连续偏移量。

自 KoP 2.8.0 版本的连续偏移量实现

实现连续偏移量的解决方案是将偏移量记录到消息的元数据中。然而,偏移量是由 Broker 端在将消息发布到 Bookie 之前决定的,而消息的元数据则是在客户端构建的。为了解决这个问题,我们需要在 Broker 端做一些额外的工作:

  1. 1. 反序列化元数据

  2. 2. 设置元数据的“偏移量”属性

  3. 3. 再次序列化元数据,包括重新计算校验和值

这会导致 Broker 端的 CPU 开销显著增加。

轻量级 Broker Entry 元数据

PIP 70[3] 引入了轻量级 Broker Entry 元数据。它是 BookKeeper Entry 的元数据,并且只在 Broker 内部可见。

默认的消息流如下图所示:

如果配置了 brokerEntryMetadataInterceptors,即配置一组 Broker Entry 元数据拦截器,那么消息流将会是:


可以看到 Broker Entry 元数据存储在 Bookie 上,但对 Pulsar 消费者不可见。

2.9.0 版本之后,可以将 Pulsar 消费者配置为可以读取 Broker Entry 元数据。

每个 Broker Entry 元数据拦截器都在消息元数据前面加上特定的元数据(称之为 “Broker Entry 元数据”)。由于 Broker Entry 元数据和消息元数据是独立的,所以 Broker 无需反序列化消息元数据。此外,BookKeeper 客户端支持发送包含多个 ByteBuf的 Netty CompositeByteBuf,而无需任何复制操作。从 BookKeeper 客户端角度看,只是将一些额外字节发送到套接字缓冲区。因此,额外的开销会很低且可接受。

索引元数据

我们需要配置 AppendIndexMetadataInterceptor (即 索引元数据拦截器) 来支持 Kafka 偏移量。

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

Pulsar Broker 中有个名为 “Managed Ledger” 的组件,它管理分区中的所有 Ledger。索引元数据拦截器维护了一个从 0 开始的索引。Pulsar 使用术语“索引”而不是“偏移量”。

每次将 Entry 写入 Bookie 之前,都会发生如下两件事:

  1. 1. 将索引序列化到 Broker Entry 元数据中。

  2. 2. 将索引自增 Entry 中的消息数目。

之后,每个 Entry 记录第一条消息的索引,相当于 Kafka 中的“基础偏移量”概念。

现在,我们需要保证即使分区的 owner Broker 宕机,索引元数据拦截器也能从某个地方恢复索引。

在某些场景下,Managed Ledger 需要将其元数据存储起来(通常存储到 ZooKeeper)。例如,当一个 Ledger 发生翻转,Managed Ledger 需要将所有 Ledger ID 归档到一个 z-node。这里我们不深入研究元数据的格式,只需要知道在 Managed Ledger 元数据中有一个属性映射。

在将元数据存储到 ZooKeeper (或其他元数据存储) 之前:

  1. 1. 从索引元数据拦截器中检索索引,该索引代表了最新消息的索引。

  2. 2. 向属性映射中添加一条属性,属性名为 “index”,属性值为索引值。

每次初始 Managed Ledger 时,都会从元数据存储中恢复元数据。那时,我们可以将索引元数据拦截器中的索引设置为“index”键关联值。

KoP 如何实现连续偏移量

让我们回顾一下 KoP 如何处理 Kafka 偏移量 一节,看看在如下 Kafka 请求中如何处理偏移量。

  • • PRODUCE

当 KoP 处理 PRODUCE 请求时,它利用 Managed Ledger 将消息写入 Bookie。相关 API 有一个回调可以访问 Entry 数据。

     @Override
     public void addComplete(Position pos, ByteBuf entryData, Object ctx) {

我们只需要从 entryData 中解析出 Broker Entry 元数据,然后检索索引即可。该索引就是返回给 Kafka 生产者的基础偏移量。

  • • FETCH

FETCH 是通过给定偏移量找到消息位置 (Ledger ID 和 Entry ID)。KoP 实现了一个回调,从 Entry 中读取索引并与给定的偏移量进行比较。然后将回调传给 OpFindNewest 类,该类使用二分查找算法来查找 Entry。

二分查找可能要花一些时间。但它仅发生在初始搜索中,除非 Kafka 消费者断开连接。当找到位置后,会创建一个非持久化的游标来记录该位置。随着 fetch 偏移量的增加,游标会移动到更新的位置。

  • • LIST_OFFSET

    • • 最早:获得 Managed Ledger 中的第一个有效位置,然后读取该位置的 Entry,并解析索引。

    • • 最新:从索引元数据拦截器中检索索引,并加一。需要注意的是,Kafka 中的最新偏移量(也被称为 LEO)是下一个将要分配给消息的偏移量,而索引元数据拦截器中的索引则是分配给最新消息的偏移量。

    • • 按时间戳:首先利用 Broker 的基于时间戳的二分查找找到目标 Entry,然后从 Entry 中解析出索引。

从 KoP 2.8.0 之前的版本升级到 2.8.0 或更高版本

KoP 2.8.0 实现的连续偏移量是有折衷的 —— 向后兼容性被破坏[4]。KoP 2.8.0 之前版本存储的偏移量无法被 KoP 2.8.0 或更高版本识别。

如果在此之前你还没有使用过 KoP,需将 Pulsar 升级到 2.8.0 或更高版本后使用相应版本的 KoP。

如果你在此之前已经使用过 2.8.0 之前版本的 KoP,则需要知道从低于 2.8.0 版本到 2.8.0 或更高版本有突破性变化。使用新版本前,你必须删除 __consumer_offsets 主题以及 KoP 之前使用过的所有主题。

KoP 中有一个最新的功能,可以通过启用配置来跳过这些旧消息。这个功能将包含在 2.8.1.23 或更高版本。注意:旧消息仍将无法访问,这个功能只是节省了删除旧主题的工作量。

总结

本文首先解释了 Kafka 偏移量的概念,以及 Pulsar 类似的消息 ID 概念。然后讲了 KoP 在 2.8.0 版本之前是如何实现 Kafka 偏移量的及其带来的相关问题。

为了解决这些问题,Pulsar 2.8.0 引入了 Broker Entry 元数据。基于此特性,通过相应的拦截器实现了索引元数据。之后,KoP 可以利用索引元数据拦截器来实现连续偏移量。

最后,由于这是一个突破性的变化,我们谈了从 2.8.0 之前版本到 2.8.0 或更高版本的升级。强烈建议直接尝试 KoP 2.8.0 或更高版本。

更多资源

译者简介

王中兴,就职于 eBay 消息中间件团队。社区昵称 AlphaWang。

引用链接

[1] 协议处理器: https://github.com/apache/pulsar/wiki/PIP-41%3A-Pluggable-Protocol-Handler
[2] Kafka-on-Pulsar (KoP): https://github.com/streamnative/kop
[3] PIP 70: https://github.com/apache/pulsar/wiki/PIP-70:-Introduce-lightweight-broker-entry-metadata
[4] 向后兼容性被破坏: https://github.com/streamnative/kop/blob/master/docs/upgrade.md



📣 
扫描下方 🤖️ Pulsar Bot 二维码,回复 KoP 加入 KoP 特别兴趣小组(SIG)。


 Apache Pulsar 

本文分享自微信公众号 - ApachePulsar(ApachePulsar)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中

作者的其它热门文章

打赏
0
2 收藏
分享
打赏
0 评论
2 收藏
0
分享
返回顶部
顶部