腾讯将 Apache Pulsar 应用于Apache InLong 的技术实践

2023/05/26 13:50
阅读数 125

本文由 StreamNative 整理自 ApacheCon Asia 2022 上腾讯工程师陈林的分享《腾讯将 Apache Pulsar 应用于Apache InLong 的技术实践》。本文将介绍什么是 Apache InLong,Apache Pulsar 如何应用于 InLong,腾讯数据平台团队针对 Apache Pulsar 做了哪些优化,并展示在腾讯安全、腾讯视频在集群运营等一线实践场景中遇到的问题及对应的解决方案。

作者简介

陈林,毕业于四川大学,目前就职于腾讯数据平台部,主要工作方向为 MQ,Apache Pulsar 和 Apache BookKeeper 贡献者。

关于 StreamNative

StreamNative(北京原流数据科技发展有限公司),由 Apache Pulsar 与 Apache BookKeeper 创始团队成员组建,致力于打造云原生批流融合数据平台,助力企业挖掘实时数据价值。

Apache InLong 简介

Apache InLong 是腾讯捐献给 Apache 软件基金会的一站式数据集成平台,目前应用于多个知名腾讯产品,如微信支付、腾讯会议、企业微信、腾讯大数据和财付通等。

下图为 InLong 的架构图。最上层为 Manager,它提供了统一的元数据管理、鉴权认证、Open API 和周期调度等能力。下面的五个模块分别为采集、汇聚、Cache、分拣和入库。

采集模块支持 SDK、File、HTTP 和 DB 等多种数据源。汇聚模块采用 DataProxy 来解决链路收敛并防止将缓冲器宕机。Cache 层是消息系统部分,目前 InLong 支持的消息系统有 Pulsar、Kafka 和腾讯自研的 TubeMQ。分拣模块可以直接从 Cache 实时消费离线和在线数据,也可以通过 InLong 自带的 Sort 将数据发送到 Iceberg、 HBase、Hive 和 ClickHouse。

图片

Apache Pulsar 如何应用于 Apache InLong

为何选择 Apache Pulsar

我们通过测试对比 Pulsar、Kafka 和 TubeMQ,发现 Pulsar 对比另两个系统有多个优势:

  • • 低延迟:TubeMQ 和 Pulsar 的系统延迟都非常低,达到 10ms,而 Kafka 延迟为 250ms。

  • • 多副本:Kafka 和 Pulsar 都有多副本功能,TubeMQ 不提供多副本特性。

  • • 高吞吐:TPS(每秒事务数)直接衡量系统的吞吐能力。TPS 越高,系统处理能力越强,系统性能也就越好。TubeMQ 和 Pulsar 的 TPS 可高达到 14W+/s,而 Kafka 为 10W+/s。

  • • 高一致、高可用:Kafka 和 Pulsar 同时具备高可用、高一致的特性,TubeMQ 仅高可用。

  • • 稳定性:Pulsar 和 TubeMQ 稳定性较好,而 Kafka 在 Topic 数量增加时系统不稳定。

  • • 丰富的语言客户端:TubeMQ 支持 Java 和 C++;Pulsar 官方提供多语言的客户端接入能力,支持如 Java、Go、C++、Python 等。



  • 接下来介绍两个 Apache InLong 内的概念,我将通过这两个概念展示 InLong 完成数据流程的示例,以及数据流向中 Pulsar 地位与作用。

  1. 1. Stream:数据流向,Stream 对应 Pulsar Topic。

  2. 2. Group:数据流组,包含多个数据流。Group 对应 Namespace,一个 Namespace 下有多个 Topic,说明 Group 下有多个 Stream。在下图中,Group 1 包含两个 Stream, People Stream 和 Stocks Stream,它们通过 Proxy 分别写入 Pulsar 集群内同属于 group1 Namespace 下的两个 Topic,people 和 stocks。通过 Sort 作业进行消费,两个 Topic 的数据分别消费并写入下游的两张 DB 表。这就是 InLong 中完整的数据流向。

图片

针对不同的应用场景,我们对 Pulsar 进行了对应的优化。

腾讯安全

腾讯安全负责海量业务安全运营及黑灰产对抗,保护上百万的服务节点。腾讯安全面临的挑战有:

  • • 海量数据:日写入消息达到 3000 亿。

  • • 客户端较多:一个订阅组最多拥有 1.6W 客户端,并且消费者使用了多种订阅模式,Shared、Key_Shared 和 Failover 模式。

  • • 客户端频繁重启:客户端部署在 K8s 上,使用了自动扩缩容策略,经常出现客户端自动下线或者扩容的情况。腾讯安全经常上线临时防御策略,会出现批量重启、删减的现象。Pulsar 在此场景中存在的问题与对应的解决方案如下:

  • • Tailing read cache 丢失:首先需要了解 Cache 在 Pulsar 读写流程中的作用。数据写入时,Broker 收到 Producer 请求后将数据放入 pendingQueue,然后写入 BookKeeper,最后放到 Cache。消费数据时,先从 Cache 中获取数据,命中则直接返回,否则从 BookKeeper 读取。

    图片

既然存在 Cache,就有对应的 Cache 驱逐策略。目前 Cache 驱逐条件有两个,一是基于 Set 和时间,二是所有已经被 Consumer 读取的数据会被从 Cache 中驱逐,即以读取最慢的 Consumer 的 Read Position 为准,这个位置前面已推送的数据都会被清除。

图片

因为场景数据量大、客户端多并且经常出现重启,数据推给客户端后还没有进行消费,系统可能出现数据重推。重推的数据会从 Cache 移除,尤其是在业务批量重启、升级时,这些数据产生大量的 Miss Cache 影响集群的稳定。我们的解决方案是新增 MarkDelete 驱逐策略,根据 Consumer 的 ACK 位置而非读取位置进行驱逐。数据重推时,只要 Cache Set 的时间条件不是太小,都会出现 Miss Cache,那么 Tailing Read 时就不会出现 Miss Cache。

  • • 频繁 unload:首先需要了解 Pulsar 如何将 Topic 映射到 Broker,以及 Pulsar 负载均衡机制如何运行。Pulsar 引入 Bundle 概念,Bundle 是哈希值的区间范围,Topic 通过哈希到对应的区间范围上完成 Topic 到 Broker 的映射。这里用到了一致性的哈希算法。0x00000000L - 0x40000000L 被划分为一个区间范围,对应一个 Bundle,上面存在两个 Topic。同样,0x40000000L - 0x80000000L 也是一个区间范围,对应一个 Bundle,存在两个 Topic。

    图片

将 Bundle 分配到 Broker 上默认 Leader 在均衡时让负载低的 Broker 承载 Bundle,实现了从 Bundle 和 Topic 到 Bundle 和 Broker 的映射,最终解决了 Topic 和 Broker 的映射。Pulsar 引入 Bundle 时减少了元数据信息,从而支持引入更多的 Topic。

图片

下图包含 Leader 选举模块、负载计算收集上报模块和负载均衡管理模块。Leader 选举模块主要借助 ZooKeeper 选出 Leader Broker,通过 Leader Broker 进行最终负载均衡。第二个模块会周期性采集、计算和上报 Broker 相应的负载信息到 ZooKeeper。负载信息包括 CPU 使用率、内存(堆内内存、堆外内存)使用率、网卡(入网卡、出网卡)使用率、Bundle 流量和 QPS 等信息,最终会被写入 ZooKeeper。

负载均衡管理模块在初始化时会 Watch 对应的 ZooKeeper 目录,一旦负载进入 ZooKeeper 就可以通过回调获取所有 Broker 负载信息,然后通过配置的负载均衡算法判断是否进行负载均衡操作。负载均衡算法的目的是选择负载高的 Bundle 进行卸载,此过程中 Bundle 上 Topic 客户端会主动关闭连接并重连。重连后 Leader 会对 Bundle 进行重分配,这样就完成了 Bundle 从卸载到分配的负载均衡过程。Bundle 会被分配到哪些 Broker 上取决于使用的分配策略,默认策略是选择负载最低的 Broker。

图片

Bundle Split 策略解决的问题是,当某一个 Bundle 流量比较大或该 Bundle 是热点 Bundle 时,无论分配到哪个 Broker 上都会产生大的负载,可以将大 Bundle 拆分为若干个小 Bundle,方便负载均衡时达到更细的粒度。

当发现 Pulsar 不停地进行负载均衡,即 Unload Bundle 时,会对 Bundle 上所有 Topic 关闭连接。频繁 Unload 会频繁导致客户端的断连与重连,最终会影响客户端读写。我们可以在下图看到对应的 Unload 日志。

图片

我们在日志中找到一个 Broker,它的资源使用率无穷大,超出平均资源使用率,卸载的流量也是无穷大的值。我们可以看出 Broker 的负载信息有误,因为不可能存在无穷大值的负载信息,因此怀疑有些 Broker 负载信息计算出现了问题,并且将错误的信息上报给 ZooKeeper,导致在负载均衡过程中,错误地判断该 Broker 负载过高,从而对其进行流量卸载。

通过监控查看每个 Broker 上报的负载值,我们发现有一个 Broker 上报负载载一段时间内无穷大,此时间内恰恰出现了 Broker 负 GC。我们来看一下负载计算方式,usage=100 * (usage - lastCpuUsage) / elapsedTimeSeconds。该计算公式用采集当前的 CPU 使用情况减去上一次 CPU 使用情况,并除以两次计算的时间间隔(以秒为单位),得到 CPU 资源使用率。这样的计算是周期性的,且速率固定。Broker 负 GC 与固定速率的计算会导致两次计算间隔为 0(时间间隔小于 1 秒),相除后得到无穷大的值。

解决方案很简单,在 Broker 周期计算资源复杂时,我们使用固定周期间隔 scheduleWithfixedDelay 替代 scheduleAtFixedRate,这样即使 Broker 负 GC 也会按照一定的周期间隔完整运行,不会出现两次间隔非常短的情况。

  • • OOM:我们通过抓取线程拿到了队列层。通过队列层我们可以看到一个变量叫做 pendingAcks,其数值大于 9GB。

    图片

    pendingAcks 变量的含义是,当客户端使用 Shared 或者 Key_Shared 模式消费时,消费者对消息进行单条确认,即 Ack 每一条消息,然后发送请求到服务端表示消息已消费。Broker 在把消息推送给消费者之前会把消息的标志信息记录到结构,表示等待消息的 Ack 确认。收到客户端消息确认后,Broker 会将信息从结构中移除。理想状态下,如果推送和确认都很及时,变量体量不会很大。然而,即便即时 Ack 仍然会出现 pendingAcks 大于 9GB 的情况。

这个结构使用的是 Pulsar 自定义 Map 结构,底层用数组实现,数组就存在扩容的问题。向 Map 结构增加数组元素时,使用率如果超过 75% 数组就会进行翻倍扩容;移除数组元素时,删除元素对应位置的使用标志位。由于时间上存在问题,移除数组元素时没有删除其对应位置的使用标志位,因此在增加元素时,系统误判使用率超过 75% 、需要扩容,然而实际上 Map 中可能只存在一两个元素。

我们的解决方案是在 Map 底层移除数组元素清理使用标志位,并增加 Map 自动缩容能力。当 Map 使用率超过 15%(默认值)就会对其进行缩容,这样可以避免扩容过大导致内存使用过多的情况。上线了解决方案后,我们的系统结构占用最低可达到小于 10 兆。

腾讯视频

腾讯视频面临的问题如下:

  • • 订阅组数量多:2,000+ 订阅组

  • • 客户端经常重启、回退消费

    图片

Pulsar 在此场景中存在的问题与对应的解决方案如下:

  1. 1. Ack 持久化,BookKeeper 写放大:此场景中只有十几个节点,数据写入 QPS 不高,在 5k/s - 8k/s 之间,然而 BookKeeper 写入 QPS 达到 7w/s,远远大于整个集群的写入。我们发现大部分 7w/s 是 Ack 持久化的数据。BookKeeper 写入 QPS 过高产生的问题有生产者处理耗时,超过 20s,偶尔出现生产者客户端超时的情况。原因是生产者客户端对每一条消息进行 Ack,并且出口流量大,Ack 请求多,写入 BookKeeper 的请求对应很多,导致 BookKeeper 出现瓶颈。我们在服务端调整了两个配置。一是限制 Ack 持久化一次写入 BookKeeper 消息的大小,二是限制写入 BookKeeper 的频率。

图片

managedLedgerDefaultMarkDeleteTateLimit=0.01,代表一秒钟写入 BookKeeper 0.01 次,即 100 秒钟会持久化写入 BookKeeper 一次,大大降低了写入 BookKeeper 的速率。原来的速率为 25w,经调整后降至 700,对应的写入时延从 20s+ 降至 200ms。

  1. 1. 负载不均衡:我们在前文介绍负载均衡时提到,如果一个 Bundle 承载的流量过多,就需要将其拆分,否则无论 Bundle 被分配到哪个 Broker 都会导致负载过高。目前 Bundle Split 算法有两种,一是基于哈希的均分方法,二是基于 Topic 数量的均分方法,但是两种方法都不够灵活。

    图片

上图为一个 Bundle,它的范围是 0x00000000L --  0x40000000L。如果我们使用哈希的均分方法,就要从 0x30000000 和 0x20000000 的位置进行拆分。但是 T13 和 T11 两个热点 Topic 哈希值分别为 0x32000000 和 0x25000000,如果按照哈希值均分的方法,这两个热点 Topic 会被分配到同一个 Bundle 上;如果按照 Topic 数量进行拆分,会在 T8 处均分 Topic 数量,两个热点 Topic 依然会被分配到同一个 Bundle 上。两个方法都无法解决问题。

我们认为 Pulsar 需要提供能按照指定位置拆分 Bundle 的能力。首先需要通过 Topic 获取哈希位置,其次 Pulsar 需要提供接口支持指定哈希位置进行拆分。我们可以通过监控找到热点 Topic,然后通过接口获取哈希位置,并进行指定拆分,这样可以把两个热点 Topic 拆分到两个 Bundle 上。

图片

此方法虽然非常灵活,但是需要人工介入。上图展示了拆分的过程。T11 的值为 0x25000000,落在 0x00000000L --  0x30000000L 之间,我们可以指定从 0x30000000L 进行拆分。

集群运营

目前我们使用 Pulsar Manager 对集群进行管理。为了避免重启服务来修改配置,我们对 Pulsar 的多个配置进行动态化。在 BookKeeper 升级过程中,我们遇到了一个问题——如果轮动重启 BookKeeper 节点会导致流量逐渐倾斜到少量 BookKeeper 节点,在升级过程中业务会受到影响,集群不稳定。

我们先看一下 Pulsar 在调用 BookKeeper 客户端时在其内的操作。BookKeeper 客户端内部会统计每个 BookKeeper 读写失败的次数,一旦达到阈值,就会对其进行隔离,在 30 分钟后重新放开。隔离的意义在于隔离后,不会再选择该 BookKeeper 进行读写,降低 BookKeeper 故障带来的影响,及时进行恢复。BookKeeper 在轮动升级过程中,也会重启节点,此时不应对 BookKeeper 进行隔离。重启即隔离会导致很多流量流向少量节点,整个集群不稳定。

图片

我们的解决方案是在 BookKeeper 客户端内部添加一个功能,首先通过 BookKeeper 客户端命令向 ZooKeeper 设置标志位,BookKeeper 客户端如果内部判断标志位存在,就会判断为升级过程,不对其进行隔离,在升级完成后删除标志位。这样可以保证升级过程足够平滑。

未来规划

前面介绍了 Apache Pulsar 在 Apache InLong 的实践。Apache InLong 是一站式数据集成平台,在对比 Kafka、Pulsar 和自研 TubeMQ 后,InLong 选择 Apache Pulsar 作为消息系统部分实时消费数据。文章介绍了多个场景下,包括助力腾讯安全处理 3,000 亿日写入消息、在腾讯视频管理 2,000+ 订阅组以及集群运营的 Pulsar 调优实践。

我们目前以及未来的工作会聚焦于几个方向:

  • • 负载均衡:我们目前主要在做两件事情,因为使用同构网络环境,所以团队在验证线上完全基于流量的负载均衡算法;我们也正在开发基于流量的 Bundle Split 算法,社区正在积极讨论中。

  • • 读写分离:虽然 BookKeeper 的架构读写分离,但是在线上场景中我们依然会发现读影响写的现象,初步判定发生在 Broker 端,因为 Broker 端会发生读写交叉的逻辑。我们后续的工作会分离这些读写逻辑。

  • • GC 内存优化:我们在线上场景中发现,当业务脱离历史数据时,Broker 会出现频繁 Young GC,时延受到影响。后续我们也会解决这个问题。


本文分享自微信公众号 - ApachePulsar(ApachePulsar)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部