Uber 大规模运行 Apache Pinot实践

引言

Uber 有一个复杂的“市场”,由乘客、司机、食客、餐厅等组成。在全球范围内运营该市场需要实时的情报和决策。例如,识别延迟的 Uber Eats 订单或放弃的购物车有助于我们的社区运营团队采取纠正措施。对于日常运营、事件分类和财务情报来说,拥有一个包含不同事件的实时仪表板是至关重要的,这些事件包括消费者需求、司机可用性或城市中发生的行程等等。

在过去的几年里,我们已经建立了一个自主服务平台来支持这样的用例,以及 Uber 不同部门的许多其他用例。该平台的核心构件是 Apache Pinot,这是一个分布式的在线分析处理(OnLine Analytical Processing,OLAP)系统,该系统用于对 TB 级数据执行低延迟的分析查询。在本文中,我们介绍了这一平台的细节,以及它如何融入 Uber 的生态系统。我们重点介绍了 Pinot 在 Uber 内部的演变,以及我们如何从少数用例扩展到多集群,全主动部署,为数百个用例提供支持,以毫秒级的延迟查询 TB 级规模的数据。

用例概述

上图描述了实时分析用例的典型需求。Uber 内部的不同用例可以分为以下几个大类:

  • 仪表板

  • 分析应用程序

  • 近实时探索

仪表板

Uber 的许多工程团队使用 Ponot 为各自的产品构建定制的仪表板。Uber Eats Restaurant Manager (餐厅经理)就是其中的一个例子:

这个仪表板可以让餐厅老板从 Uber Eats 订单中获得有关客户满意度、热门菜单、销售和服务质量分析的信息。Pinot 支持以不同的方式对原始数据进行切片和分片,并支持低延迟查询,从而为餐厅老板带来丰富的体验。

类似地,我们的城市运营团队已经构建了定制的仪表板,利用 Pinot 的实时和历史数据相结合的能力,获取供需、异常事件(例如,最近五分钟内延迟的订单)、实时订单等方面的指标。这是我们日常运营的重要工具,有助于及早发现问题。

分析应用程序

另一类用例源于作为许多后端服务的一部分执行分析查询的需求。这类用例的主要区别要求是数据的新鲜度和查询延迟,他们本质上需要是实时性的。例如,实时识别 Uber 乘客分组的地理热点对于良好的用户体验至关重要。同样,立即识别出司机取消或遗弃的 Uber Eats 购物车,可以快速采取纠正措施(以消息 / 奖励的形式)。

近实时探索

数据探索通常是在传统的批处理和仓库系统(如 Hadoop)上完成的。但是,有许多情况下,用户需要能够对实时数据执行复杂的 SQL 查询。例如,工程师经常需要通过加入微服务记录的各种事件来对事件进行分流。在其他情况下,实时时间可能需要与 Hive 中的批数据集连接。在 Uber 内部,我们在 Apache Pinot 之上提供了一个丰富的(Presto)SQL 接口,以开启对底层实时数据集上的探索。此外,该接口与我们所有的内部商业智能工具(如 Dashbuilder)无缝对接,这对我们所有的客户都非常有用。例如,下面是一张简单的 Sunburst 图表,显示了 Uber Eats 在五分钟内的订单明细,针对特定地区按工作状态分组。这是使用 Dashbuilder 通过在 Pinot 之上运行 Presto 查询在几秒钟之内构建的。

今天,生产中数百个关键业务用例由 Apache Pinot 提供支持。在过去的几年里,我们已经从一个 10 节点的小型集群发展到每个区域数百个节点。由 Pinot 管理的总数据占用空间已经从早期的几十 GB 增长到今天的几十 TB。同样地,每个区域的每秒查询量也增加了 30 倍(今天生产中的每秒查询量高达数千次)。

在下面的章节中,我们将详细介绍我们的平台,讨论 Uber 对 Apache Pinot 做出的独特贡献,并详细阐述在大规模运营该平台的过程中所学到的经验教训。

Uber 的 Pinot 平台

为了服务这样的用例,我们围绕 Apache Pinot 构建了一个自助服务平台,如下图所示:

该架构的不同组件可以分为三个阶段:

  • 获取

  • 存储

  • 查询

获取

这也成为数据准备阶段,负责使数据可供 Pinot 使用。一般来说,Pinot 可从流数据源(例如 Apache Kafka)以及批处理 / 脱机数据源(例如 Apache Hadoop)中获取数据(请参阅 Pinot 文档)。在 Uber 内部中,我们添加了更多的功能,如下所述:

  1. 实时源:数以千计的微服务不断地将它们的日志和事件写入 Apache Kafaka (发布 - 订阅系统)中的主题。Pinot 可以直接使用这些主题中的许多内容,以使数据可用于实时查询。例如,跨不同维度(如时间、位置或产品线)的用户需求指标可以很容易地从用户关注的 Kafka 流中获取的 Pinot 表中计算出来。

FlinkSQL

在某些情况下,我们需要对输入的 Kafaka 主题做一些额外的处理,然后 Pinot 才能处理数据。例如:将输入主题与另一个主题 / 表连接起来,或者对一些列值进行预聚合。

对于这种情况,我们依赖 FlinkSQL 的流处理平台(以前称为 Uber 的 AthenaX ,后来回馈给 Apache Flink 社区)。它提供了一个 SQL 接口,用于表示对输入流(Kafaka)的富处理,该输入流被编译成 Apache Flink 作业,并在我们的 YARN 集群上执行。这样一个 FlinkSQL 作业的输出是另一个 Kafaka 主题,它成为 Pinot 的数据源。下面是一个简单的 FlinkSQL 作业示例,它根据设备操作系统和特定的城市 ID 过滤输入的记录。

这些经过处理的数据现在可以提取到 Pinot 中,以进一步进行切片和分片。

  1. 脱机数据源:在 Uber 的数据生态系统中,大多数重要的 Kafaka 主题也被纳入 HDFS。这成为了 Pinot 历史数据的重要来源。Pinot 提供了一个强大的特性,可以将实时和脱机数据集组合到同一个表中,为用户提供一个逻辑视图(更多有关详细信息,请参阅此处)。出于各种原因,这很有用:

  • 数据更正:在许多情况下,来自 Kafaka 的实时数据可能会丢失或不正确。主题的所有者通常会在稍后的某个时间点更正这个错误,这最终会反映在脱机数据集中。然后,Pinot 可以使用管理好的脱机数据集并覆盖不一致的实时数据,从而提高我们分析的总体准确性。

  • 批量数据加载:在某些情况下,我们需要用至少 3 个月的数据引导 Pinot 表。直接从 Kafaka 那里得到这些将会花费太多的时间。有一种方法可以直接从脱机数据源获取这些数据集,这非常方便。

  • 脱机数据集的低延迟服务:在许多情况下,工程师和数据科学家需要对他们的 ETL 作业的输出执行实时分析查询(位于某个临时的 Hive 表中)。传统的查询引擎(如 Hive 和 Presto)非常强大,但通常不能实现低延迟(亚秒级粒度)。这使得人们转向 Pinot 来导入这些数据集。

  • 复杂处理:在许多与机器学习相关的用例中,我们需要运行复杂的算法来计算模型,而这些算法在 FlinkSQL 中难以表达。在这种情况下,工程师和数据科学家可以编写一个 Spark 作业来计算这些模型,然后将这些数据提取到 Pinot 中以供在线服务。

与实时数据源类似,脱机数据源可以按原样获取,也可以在提取到 Pinot 之前进行预处理。在 Uber 内部,我们依赖另一个名为 Piper(工作流调度系统)的平台获取脱机数据集。与 FlinkSQL 一样,Piper 作业允许用户指定一个 SQL 查询(在本例中为 Hive 查询),用于指定对原始数据所需的处理。在内部,它运行 Spark 作业来运行这个查询,从输出数据创建 Pinot 段(segment)并将其导入到 Pinot 中。如下图所示:

Piper 允许用户以给定的频率(例如,每小时或每天)安排这个作业,这反过来定义了将脱机数据集导入到 Pinot 的频率。

存储

下面是 Apache Pinot 核心存储引擎的放大视图:

这是 Apache Pinot 以对称配置部署在两个不同地理区域的视图。如图所示,每个区域都有完全相同的组件:

  • Pinot 集群

  • Kafaka

  • HDFS

Pinot 集群

每个 Pinot 集群由一个控制器(集群的大脑)、代理(查询处理节点)和服务器(数据节点)组成。Pinot 被设计为从头开始的多租户,它使我们能够将代理和服务器的特定组合分组到一个租户中:一个由特定用例拥有的隔离单元。例如,图中显示的是一个拥有两个租户的 Pinot 集群:Eats 和 Maps。在这个例子中,Maps 租户有两个代理和两个服务器,Maps 数据将均匀分布在这两台服务器上,查询处理将被限制在指定的代理中,从而将其与任何 Eats 的流量隔离开来。

多区域部署

在 Uber 内部,Pinot 表可以配置为:

  • 本地区域:在这种情况下,Pinot 表从本地 Kafaka 和 HDFS 实例中获取数据。换句话说,所有传入的数据都是由属于这个特定区域的服务生成的。这对于与区域本地数据相关的分析用例非常有用。例如,关于 Uber Eats 的实时订单或延迟订单的指标只对特定区域有意义。通常,当数据的新鲜度非常重要时,就会使用这种配置。

  • 全局:这对于对数据的全局视图感兴趣的分析用例非常有用。在这种情况下,数据分别由 Kafaka 和 HDFS 聚合。换句话说,每个区域的每个聚合 Kafaka 主题都将拥有来自各个区域的所有数据:本地 Kafaka 主题(这些数据反过来又被提取到 Pinot 中)。当我们想要深入了解全局范围所有行程(例如,收入、销售或事件等),可以使用这个配置。虽然根据 Kafaka 和 HDFS 聚合数据集方面设置起来比较复杂,但它会自动使 Pinot 处于活动状态。因此,任何一个区域的故障,都不会影响 Pinot 的可用性。

除了核心 Pinot 存储,我们还利用了另外两个组件。

模式服务

这是 Uber 使用的所有模式的集中存储库。在 Uber 内部,Pinot 大量使用这一点作为所有 Kafaka 模式的真相来源。我们添加了一个定制的 Pinot 解码器,用于在获取过程中获取所需的 Kafaka 模式,并生成相应的 Pinot GenericRow 对象,该对象反过来又用于段生成。接下来,我们还计划使用模式服务来管理 Pinot 模式。

段存储

Pinot 具有段存储的概念,用于对其不可变数据段进行归档。对于任何给定的实时或脱机的 Pinot 表,一旦数据段被密封(基于某些标准),它就变为不可变的。然后将该段存档到段存储中,以便在节点或复制失败期间进行恢复。最初的 Pinot 架构依赖于安装在 Pinot 节点上的符合 POSIX 的文件系统(比如 NFS)。通过添加使用任何通用存储系统(例如 HDFS、Ceph 或 S3)作为段存储的功能,我们对这一功能进行了扩展。有关更多详细信息,请参阅以下章节。

查询

目前在 Uber 内部中有两种查询 Pinot 数据的方式。

Pinot REST Proxy

Apache Pinot 有一个称为代理的组件,用于发出 REST 风格的查询。我们在代理上添加了轻量级层,称为 Pinot REST Proxy。这是一个简单的 Restlet 服务,为应用程序查询任何 Pinot 表提供了一种方便的方式。

正如前面提到的,每个 Pinot 表都有与一个租户相关联,该租户有一组唯一的代理。任何客户端应用程序都必须查询其中一个代理才能访问指定表。这就增加了一些复杂性,因为客户端应用程序需要知道其中的不同租户和代理。使用这个 Restlet 服务,客户端应用程序可以通过一些负载均衡器(在我们的例子中是 haproxy)到达任何一个 REST 代理节点。每个 Pinot REST 代理实例本地缓存 Pinot 路由信息(通过 Apache Helix 获得),它使用这些信息来标识租户、标识代理集,并以异步方式将客户端请求路由到其中一个租户。

每个 Pinot REST 代理实例中本地缓存的元数据在各种场景下都很有用。Piper(Spark)作业可以查询 REST 代理来获取表和模式信息,而不是 Pinot 控制器。这样可以减少控制器的负载,并将其与请求峰值隔离开来。

Pinot REST 代理目前正大量使用仪表板和分析应用程序的用例。

Presto

最近,我们在 Presto 和 Pinot 的集成方面做了很多工作,它允许我们的用户使用标准的 PrestoSQL 来查询 Pinot。我们最初将重点放在实时探索用例以及一些分析应用程序上。但是,经过多次优化和多个季度的生产经验之后,我们目前也在上马实时仪表板和应用程序用例。我们的长期计划是,用 Presto 取代 Pinot REST 代理。有关更多详细信息,请参阅下面关于完整的 SQL 支持的部分。

Uber 的贡献

Uber 的 Pinot 团队在提高整体可靠性和查询灵活性方面有四大贡献。

自助加载服务

在 Pinot 平台成立之初,加载新用例是一个非常手工的过程。我们的一位 Pinot 工程师必须与客户坐下来了解需求,提出一个模式、表配置,并估计服务该用例所需的容量。当然,客户开始使用 Pinot 的周转时间可能是三天到一周不等。为了让平台实现自助加载服务,我们在以下几个方面进行了投资。

  • 模式推断

我们添加了从输入 Kafaka 主题或使用 Avro 模式创建的 Parquet 数据文件自动派生 Pinot 模式的能力。在较高级别上,这个实用程序将 Avro 字段转换为 Pinot 列类型,并自动选择其中一个字段作为 timestamp 列。在某些情况下,它还将 Avro 记录展平为各个 Pinot 列类型。这种自动转换适用于超过 80% 的 Kafaka 或 Parquet 输入数据集,节省了大量的手工操作。

  • FlinkSQL 推送

我们与 FlinkSQL 紧密集成,使客户能够将 Pinot 视为“数据接收器”。客户创建一个新的 FlinkSQL 作业,定义一个 SQL 转换查询,定义 Kafaka 主题的输入和输出,然后“push”(推送)到 Pinot。

在本例中,Pinot 模式是从输出 Kafaka 主题推断出来的。一旦 FlinkSQL 作业开始执行,这将在 Pinot 登台环境中自动创建一个表。

  • Piper 推送

类似地,我们添加了从“Hive to Avro Converter” Spark 作业的输出派生 Pinot 模式的功能,并在推送数据之前在模拟 Pinot 集群中自动创建表和模式。

每个此类表都是在一个具有最小配置的登台环境中创建的。这允许用户在几分钟内开始向 Pinot 发出查询,或者构建一个定制的 BI 仪表板。在登台阶段,表会经历几轮迭代,例如模式演变、向相应的列添加专门的索引(例如,星型树、排序或倒排)以及用户查询的验证。登台环境中的内存和磁盘使用情况可以很好地指示生产需求。我们要求每个用例在升级到生产环境之前至少要进行 24 小时的审查。

完全 SQL 支持

如前所述,我们已经将 Pinot 和 Presto 集成在一起,以实现对这些数据集的标准 PrestoSQL 查询。这种组合效果很好,因为我们将 Pinot 的秒级数据新鲜度与 Presto 在执行复杂查询时的灵活性结合起来。此外,谓词下推和聚合函数下推使我们能够实现此类 PrestoSQL 查询的亚秒级查询延迟,这在标准后端(如 HDFS/Hive)上是不可能做到的。请阅读我们之前发表的文章了解这项工作的详细信息:

https://eng.uber.com/engineering-sql-support-on-apache-pinot/

用于 Pinot 段的 HDFS 深度存储

Pinot 最初的实时流获取设计(又名 LLC)要求在 Pinot 控制器上安装一个本地文件系统来存储 Pinot 段。如果 Pinot 服务器已经落后于其副本或在节点故障后正在重建的情况下,它允许服务器从中央存储下载段。最初的设计通过在控制器上安装一个网络文件系统(NFS)来解决这一容量问题。

与 Pinot 的许多其他用户一样,Uber 并没有 NFS,因此不能使用原始的 LLC 设计。为解决这一问题,我们和 LinkedIn 的工程师一起增强了 LLC 协议的分段完成阶段,使其可以与深度存储或 HDFS 或 Amazon S3 之类的外部存储服务一起使用。如今,Uber 的 Pinot 实时获取管道使用 HDFS 作为其深度存储,其中有数百个来自 Pinot 的段。

模式演变

我们的团队发现了 Pinot 模式演变中的一个重要问题。对于实时 Pinot 表(具体地说,从流数据源获取),并不完全支持向现有模式添加新列。尽管较旧的数据段准确地反映了这一点,但新列在最活跃的数据段中并不可见,从而导致查询失败。我们对这一关键问题的解决方法,可以在这里找到:

https://github.com/apache/incubator-pinot/issues/4225

经验教训

当我们在 Uber 内部扩展 Pinot 的用例时,我们学到了很多东西。本节中的许多经验教训来自于解决在操作、部署、内存管理和监控方面遇到的难题。

易操作性

易操作性对于扩大 Pinot 的使用范围至关重要。当更多的用例加入到 Pinot 集群时,我们希望将集群管理的开销降到最低限度。谢天谢地,Pinot 有几个现成的功能,使操作和管理变得更容易。

多租户 Pinot 集群

大规模运行分析系统的核心需求之一是多租户。Pinot 提供了原生的多租户支持,并且在操作中,特别是在减少停机方面显示了巨大的价值。借助一流租户的支持,这些表可以在裸机上组合在一个单一租户名下,并分配给该租户的主机。这提供了强大的隔离性,并避免了嘈杂的邻居问题。当出现问题时,如对服务器进行错误的查询时,我们可以限制对其租户的影响,而不会违反其他租户的 SLA。

易于扩展集群

Pinot 提供了段分配策略,因此段可以在主机之间均匀分配,包括新添加的主机。这大大简化了集群 / 租户扩展工作。我们所需要做的就是提供一个新的 Pinot 服务器主机,并将其添加到所需的租户。它将自动开始获得新的段。

Uber 在开源 Pinot 基础上改进的一个方面是增加了与租户相关的 JMX 指标。在 Uber 这样的规模上,我们的 Pinot 集群有用大约 1000 张表,数百台 Pinot 服务器或代理。这使得 Pinot 管理员很难进行监控;一个典型的场景是,一个表有查询性能问题,但这可能是由同一租户上的其他一些表引起的。使用 Pinot 租户指标,可以将服务器分组为十几个租户,以检查每个租户的资源使用情况和查询性能。它使 Pinot 服务器 / 代理和问题分类变得更加容易。

Pinot 提供的另一个有价值的特性是段存储(这里提到过),如远程 HDFS 集群或云存储。这一特性大大减少了更换服务器节点所需的操作工作量,有助于处理大型计算机池中发生的硬件故障。通过深度存储备份,新增加的主机能够在没有人工干预的情况下即可下载和恢复数据,并在段完全下载后自动为流量提供服务。

内存开销

通常,给定的 Pinot 控制器或服务器可能会经历垃圾回收(Garbage Collection,GC)暂停。根据垃圾回收暂停的严重程度,这可能会导致性能下降或宕机。例如,如果引导者控制器持续经历完整的垃圾回收暂停,它就无法创建新段,从而停止对该集群中的所有表的提取。如果 Pinot 服务器经历了完整的垃圾回收,它可能会导致查询延迟峰值和属于该节点的所有表中的查询结果不一致。以下是这些问题背后的典型原因。

大型扫描

在 Pinot 之上启用 Presto 查询后,我们注意到,错误的查询通常会使 Pinot 服务器不堪重负。例如,如果用户试图在很长的时间范围内(或没有任何时间范围谓词),这将导致 Pinot 服务器耗尽大量内存(我们的设置使用内存中的索引而不是 mmap),最终导致垃圾回收完全暂停。有几种方法可以将此类大型赛秒的影响降至最低:

  • 使用可能影响整个查询延迟的推外(off-heap)索引。

  • 限制 Pinot 查询调度程序配置中的查询并行性。减少这个限制还可以减少相应查询造成的内存压力。当然,这也会影响整体查询吞吐量。

通常,为了避免这类问题,我们将这种特殊查询用例分离到一个单独的租户。

Apache Helix 指标错误

Pinot 控制器是整个 Pinot 集群的控制平面。按照设计,它需要有限的堆空间,因为它不承载或提供数据。当我们的 Pinot 集群变得越来越大,有了更多的表或更多的段时,我们发现,Pinot 控制器的堆使用量大大超过了默认值 4GB。随后发生了主要的 Java 垃圾回收事件,这对整个集群造成了破坏。通过对 Pinot 控制器的堆分析,我们发现这个问题与 Pinot 控制器中使用的 Helix 库保持的直方图指标有关。直方图指标使用默认的 1 小时滑动窗口,这意味着对于繁忙的生产控制器来说,内存中将保留太多的事件数据点。我们对 Apache Helix 提供了一个补丁,以便可以配置滑动窗口长度来减少内存占用。

其他原因

随着各个 Uber 团队使用各种各样的 Pinot 索引和查询模式,我们还看到了内存开销的其他情况:

  • 涉及 Hyperloglog 对象的查询(用于近似的不同计数),对输入大小的变化非常敏感;尽管 Pinot 服务器可以提取这些对象,但由于垃圾回收活动,如果数据量增加 2~5 倍,查询时间性能会下降很多。

  • IN 子句中包含大量(数千个)值的查询也可能导致垃圾回收,因为这些查询在服务器上的多个表段上运行,从而增加了内存使用量。

  • 当查询流量达到正常流量的 2~3 倍或更多(通常是由区域故障转移引起的)时,可能会导致严重的垃圾回收。

段管理

由于 Pinot 集群的规模和整体数据量,到目前为止,我们在段管理方面遇到了一些挑战。

每台服务器的段过多

随着数据规模的不断扩大,我们也经历了由于段过多而导致的一些问题。用于集群管理的 Pinot 利用Apache Helix 而不是 Apache ZooKeeper。例如,当服务器从脱机转换为在线时,Pinot 将通过 Helix 传播状态转换消息以通知其他实例。这种状态转换消息的数量与服务器上的段数量成正比。当一个服务器承载太多的段时,Helix 上可能会出现一个状态转换消息的尖峰,从而导致大量的 Zookeeper 节点。如果 Zookeeper 阶段的数量超过缓冲区阈值,Pinot 服务器和控制器将会崩溃。为了解决这一问题,我们在 Pinot 控制器中添加了消息节流,以消除状态转换浪涌。

热点

我们面临的另一个挑战是,由于段分配策略对热点的潜在影响。默认情况下,Pinot 通过为分配最少的主机分配一个新段来平衡服务器之间的段。因此,在集群扩展的情况下,可以在新添加的服务器中创建最近的段。为环节这一问题,我们在集群扩展后运行表以重新平衡。

将 Pinot 与段存储去耦合

在段深度存储的操作过程中,我们发现当前的 LLC 协议存在两个主要问题:

  1. 深度存储是实时获取流的单点故障

  2. 所有段的上传和下载都通过 Pinot 控制器进行

第一个问题特别严重,因为我们的许多用户希望在数据先限度方面有较高的 SLA(对于第 99 个百分位数,少于 5 分钟)。在 Uber 内部,我们已经见过一些情况,HDFS 由于维护或宕机而无法使用长达一个小时左右的情况。这违反了我们所有重要实时表的 SLA。实际上,HDFS 有自己的 SLA,并且可以独立于 Pinot 发生故障。为解决这个严格的依赖关系问题,我们提出了对 LLC 的重大改进,以便即使深度存储停机长达几个小时,它也可以继续实时获取。在深度存储停机期间,该方案利用对等服务器存储来下载段。该方案已经得到了社区的批准,代码已经完成,目前正在测试中。

结论

总体而言,我们在使用 Apache Pinot 方面的经验非常棒。在 Uber 内部,它已经成为解决大规模实时分析用例的关键技术。高效的内存索引和列压缩有助于降低存储成本。内置的多租户特性以及节点和租户易于维护,运行成本低。此外,围绕着 Pinot 的 Apache 社区非常热情,参与度很高。我们将继续投资 Pinot,并计划在未来的项目中与社区合作,如 Pinot Upserts、联合段存储和查询、智能索引等。

本文分享自微信公众号 - 互联网后端架构(fullstack888)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部