技术探究|基于 Apache Pulsar 构建实时数据仓库

原创
2023/02/02 18:00
阅读数 61

文章摘要

本文整理自 ApacheCon 2022 上 StreamNative 工程师杨子棵、刘德志的分享《Build One Real-time WareHouse Based on Apache Pulsar》。本文将介绍数仓架构的演进与挑战、Apache Pulsar 架构以及 Pulsar 作为云原生消息队列能否满足实时数仓的要求,然后分享基于 Apache Pulsar、Apache Flink 和数据湖构建实时数仓的方案,以及基于 Snowflake Sink Connector for Pulsar 构建实时数仓的解决方案。

作者简介

  • • 刘德志,StreamNative 解决方案专家,Apache Pulsar Committer。前腾讯计费平台技术专家。曾任职腾讯,负责腾讯计费平台架构与技术方案实施,并主导了腾讯云 TDMQ for Pulsar 产品落地实施,拥有丰富的消息中间件开发与运维经验。

  • • 杨子棵,StreamNative 软件工程师,Apache Pulsar Committer,主要工作围绕 Metrics 特性开发。

数仓架构的演进

相信大家对数仓这个词并不陌生。从数据诞生开始就有数仓的存在,业务层会通过数据状态的变更来记录业务流程状态,以更好地了解业务的指标,比如鉴别 DI 报表就需要一套数仓架构来满足分析的场景。根据数据量级的规模,可以将数仓架构分为两种阶段——基于传统数据库实现的数仓和基于大数据框架实现的数仓。

基于数据库的数仓在数据处理上有一定的限制,因为以前的数据量级比较小,数据库可能是单机模式,数据库本身可以处理数据的量级就存在限制。随着移动互联网的到来,数据爆炸性增长,诞生了 Hadoop 生态,即大数据解决方案。HDFS 能够低成本地存储海量数据,Hive 可以通过 SQL 的方式处理这些数据,企业的解决方案逐步地从数据库迁移到大数据架构上,解决了传统数据库无法对海量数据进行有效分析的痛点。

然而,以前的方式处理数据十分迅速,分秒内就可以计算出相应的指标;而基于 Hadoop 的解决方案更多的在离线模式下处理数据,得当结果的时间往往超出分钟级别,达到小时甚至天级别。为了减少计算时间,诞生了很多快速计算的引擎,比如 Spark。可是 Spark 等引擎仍然解决不了一个问题,比如用户想观察最近 5 分钟数据的需求在离线架构中就很难实现,这是典型的 t+a 的模式,按天或按小时才能计算数据。为了实时计算指标,Lambda 架构诞生了。

Lambda 架构分为两种。一种是基于消息队列或者流的计算框架,例如常见用于实时计算场景的 Kafka + Spark Streaming/Flink,实时上报业务数据到消息队列并基于小批次的时间窗口进行指标统计。另一种是基于离线 ETL 的架构,使用 Hive 或 Spark 处理数据,数据一般存储在HDFS 等系统上,日志定期入库经过一系列 ETL 处理得到指标数据。为什么会有两套 Lambda 架构?因为在实时处理中很难实现离线场景的计算要求,所以现在大家也在探索一种新的架构 —— Kappa 架构,尝试只使用 Lambda 架构中的实时路径来同时满足实时和离线的两个场景。Kappa 架构使用一套架构处理在线数据和离线数据,同一套引擎同时处理在线和离线数据,并将数据存储在消息队列上。

以上两种架构构建实时数仓的挑战是什么?我们下面将详细介绍。

构建实时数仓的挑战

Lambda 架构会面临以下挑战:

  • • 运维成本高:需要更多的运维人员来同时维护实时平台和离线平台两套引擎,熟悉非常多组件的成本也很高;

  • • 开发成本高:需要实时、离线维护两套框架不同的代码,给开发人员带来很大的挑战;

  • • 容易造成数据不一致:数据有两条链路,造成数据不一致的概率会增大。

Kappa 架构会面临以下挑战:

  • • 消息的回收能力较弱:目前大部分架构基于 Kafka 进行数据存储,而 Kafka 很难提供无限存储能力;

  • • 无法 OLAP 引擎直接分析数仓中间层的数据;

  • • 数据的时序性可能导致实时计算的结果不准确。

总结归纳一下,实时数仓存在的挑战有以下几点:

  • • 数据集成批流一体:离线与实时数据能否适应统一的数据采集的方式,如统一通过 CDC 或 OGG 将数据实时捕获到 Kafka,批与流再从 Kafka 中的消费数据载入到明细层;

  • • 存储层批流一体:离线与实时数据是否统一分层、统一存储。实时数据经过 ETL 梳理后,根据统一分层 ODS、DMD 和 DMS 等持久化到同一个数据存储中;

  • • ETL 逻辑批流一体:通常情况下数仓的使用者更希望通过 SQL 语句来处理数据,批流一体能否提供统一的 SQL 语法或者统一的 ETL 组件,再通过底层分别适配流和批的计算引擎;

  • • ETL 计算引擎批流一体:流和批使用同一套计算引擎才能从根本上避免同一个处理逻辑面对流批需要两套代码的问题。 我们希望一套架构可以同时满足实时和离线的场景,因此无论在 Lambda 架构还是 Kappa 架构里,都需要一个强大的消息队列。该消息队列的能力决定了实时数仓覆盖的业务场景。

实时数仓对消息队列的要求

  1. 1. 基础能力:高吞吐、高可用、高可靠。这也是消息队列最基础的能力。面对海量数据的写入如何读取,必须有高吞吐的能力。实时数仓在业务中的地位越来越重要,系统的高可用和高可靠必须得到保证。

  2. 2. 存储能力:无限流存储并且支持海量 Topic。一个 Topic 就代表一张实时或离线的表的数据,而表的个数会随着业务的快速增长而增长,因此需要具备支持海量 Topic 的能力来支撑业务发展。还需要具备 Schema 管理能力,在上报数据时需要定义好 Topic 的 Schema 并进行一定约束,这样可以从源头对数据格式进行治理。

  3. 3. 生态能力:作为统一的数据上报入口,需要具备完整的生态,包括更多协议的接入、周边存储系统以及数据的导出与导入,比如数据库中的业务数据和计算后的结果数据可以导入 Elastic Search 或者 HBase;其次,需要与周边大数据框架进行深度结合,比如与 Flink、OLAP 引擎等结合。

Apache Pulsar 是否符合构建实时数仓的要求?

  1. 1. 基础能力:Pulsar 满足如 RocketMQ 的在线业务场景,有丰富的业务属性,比如死信队列、重试队列、延迟消息以及高一致、高可靠;也能够满足高吞吐的 Kafka 场景,具备异步、批量、压缩等高吞吐模式,并且比 Kafka 有更稳定的性能和更高延迟的能力。

根据最新 Kafka 与 Pulsar 性能对比测试报告可以看出,Pulsar 可以实现的最大吞吐量是 Kafka 的 2.5 倍;Pulsar 提供稳定的消息发送延迟,比 Kafka 的 P99.99 (ms) 延迟低 100 倍;Pulsar 的历史读取率是 Kafka 的 1.5 倍,Pulsar 消费者能够比 Kafka 消费者快约 2.5 倍耗尽 Backlog,而不会影响相关生产者的性能。其中历史读取非常关键,在批读取通常需要回溯更久的历史数据,Pulsar 相比 Kafka 有更高的吞吐和更稳定的表现。

  1. 2. 企业级能力:Pulsar 原生支持多租户,并且支持多种协议的接入,如 Kafka-on-Pulsar、AMQP-on- Pulsar、MQTT-on-Pulsar、RocketMQ-on-Pulsar 可以直接将其他消息系统客户端接入 Pulsar;Pulsar 也支持企业级认证体系,可以满足不同的系统服务对安全的要求;Pulsar 提供统一的消费模型,满足不同的业务对消息订阅场景的需求。这些能力方便用户构建统一的消息平台。

  2. 3. 金融级特性:Pulsar 具有存储计算分离、高一致、高可靠、跨地域复制、可以快速扩缩容等金融级特性。

  3. 4. 存储能力:Apache Pulsar 的存储基于企业级流平台 Apache BookKeeper 来实现存储,分片存储为统一的数据存储提供了很好的基础,例如 Topic Partition 的数据会写入分片,意味着已经关闭的分片不允许修改,可以很方便地将分片卸载到二级存储,如成本更低的分布式存储系统如 HDFS 和 S3 中,在数据存储层可以将实时数据和离线数据统一存储。

在数据处理层,Pulsar 提供了统一的对外接口,用户不需要担心数据存储在 BookKeeper 还是二级存储中,这些对用户是透明的。在读取二级存储时可以同时读取多个分片来加快读取速度。同时,Pulsar 内置 Schema 注册能力,支持 Topic 级别消息验证和 Schema 版本更新,可以方便地从 Flink SQL 或 Presto 像操作表一样对 Topic 数据进行分析统计等操作。

  1. 5. 生态能力:生态能力分为 Message、计算和存储三个方面。在 Message 方面,Pulsar 支持主流的多语言客户端, 如 Go、C++、Java、Node.js、Websockets 和 Python。其次,Pulsar 在协议层做了很多适配来帮助用户迁移历史业务,比如 Kafka-on-Pulsar 可以帮助用户将 Kafka 的业务迁移至 Pulsar。

Pulsar 也提供丰富的与周边存储系统的对接,例如 Pulsar IO、Elasticsearch 的读取和写入、HBase 的读取和写入等。在非常重要的 CDC 场景中,Pulsar 可以捕获数据库中变化的数据,目前支持主流数据库 Oracle、MySQL 和 MongoDB 等。

在计算方向,Pulsar 与周边大数据计算框架进行对接,比如 Presto 可以直接读取 Pulsar 数据,在流和批的计算框架中支持 Flink 和 Spark 等。Pulsar 本身也提供了轻量级函数计算框架 Pulsar Functions,可用于简单的 ETL,特别适合 IoT 边缘计算场景。

  1. 6. 分层存储:Pulsar 的历史数据可以卸载到二级存储,比如常见的 S3、HDFS 以及公用云上提供的分布式存储系统,而不必担心数据一致性的问题。 鉴于 Pulsar 以上的能力,我们可以基于 Pulsar 和 Flink 构建实时数仓。

基于 Apache Pulsar 和 Flink 构建实时数仓

Apache Pulsar 和 Apache Flink 构建实时数仓是否满足前文提到的几点挑战?

  • • 数据集成、流批一体、离线与实时数据采集方式统一:Pulsar 可以满足这几点需求。所有元数据上报给 Pulsar,Pulsar 拥有业务最前的数据。

  • • 存储层流批一体,离线与实时数据统一分层存储:Pulsar 的分层存储能力提供了无限存储模式,海量 Topic 可以支撑复杂的业务表逻辑。

  • • ETL 逻辑流批一体,流与批的处理使用统一的 SQL:Flink SQL 提供了很好的基础,结合 Pulsar Flink SQL Connector 可以采用统一的 SQL 编写业务逻辑。

  • • ETL 计算引擎流批一体,流与批使用同一套计算引擎:Flink 支持流批一体模式。 基于 Pulsar 和 Flink 构建实时数仓是可行的方案。下图示例采用了 Pulsar + Flink 在实时 ETL、数据指标计算和数据模型中的 A/B Test 场景中构建实时数仓平台。

值得参考的是,借助 Flink 与 Pulsar,BIGO 打造实时消息处理系统支持下游业务场景的实时数仓、实时 ETL、实时数据分析与实时推荐等。Pulsar Flink Connector 已正式发布,欢迎使用[1]

Pulsar 与数据湖打造实时数仓

在上面的方案中,数据可以卸载到分层存储中。但是这个方案存在一个通用的问题,卸载数据的格式来自于 Pulsar 的体系,只有 Pulsar 才能对数据进行正常的读取。Apache Pulsar 社区正在致力于将卸载的数据按照数据湖的格式进行卸载,如 Hudi 和 Iceberg 等流行的数据湖格式,数据卸载后的格式更加通用访问。这样的方式打通了 Pulsar 和数据湖的生态,为下游对接的数据计算框架、分析框架或者 OLAP 引擎都能提供更好的数据对接方式,充分利用数据湖生态扩展系统的能力。

这个方案采用了实时卸载模式,在之前的模式中,我们只会在分片关闭后进行卸载,而现在我们可以对正在写的分片进行卸载,提供了更及时的数据卸载能力。将 Pulsar 结合数据湖生态是更加优雅的实时数仓解决方案。

基于 Snowflake Sink Connector for Pulsar 构建实时数仓

Snowflake 架构

Snowflake 是基于云计算的数据仓库,提供多种数据存储和计算的功能。我们可以把 Snowflake 看成三层的数据存储结构。最底层是存储层,可以将不同格式的数据(包括结构化和半结构化的数据)存储在云平台、云供应商或者数据中心上,如 Google Cloud、AWS 等。同时底层提供了数据安全性和一致性保障。中间层为虚拟数仓层,可以很方便地根据用户需求构建虚拟数仓,提供了动态扩缩容和按需计费等功能。最顶层为服务层,Snowflake 提供了多种用于数据分析的服务,包括安全服务、优化器和数据湖等。

Snowflake 是一种多租户、事务性、安全、高度可扩展的弹性系统,具备完整半结构化和 Schema-less 数据模式支持。Snowflake 还提供了很多关键特性,比如数据安全保障、数据的治理加密和 SQL 语句运行的支持,并且提供了方便用户操作的格式化界面和工具。它也具备各种模式的生态,方便用户将来自不同的中间件数据库的工具与 Snowflake 对接。底层支持数据的复制和容错,保障数据存储的安全性。

Snowflake Sink Connector for Pulsar 原理

Snowflake Sink Connector for Pulsar 是由 StreamNative 提供的用于将用户存储在 Pulsar 中的数据实时导入到 Snowflake 数据仓库的 Pulsar Connector。它所做的事情很简单,即将 Pulsar Topic 中的数据转移到对应 Snowflake Table 中,每一个 Topic 对应一个 Table。但是转移的过程并不简单,Snowflake Sink Connector 采用了许多方案来保证数据转移的过程中不会产生数据丢失或数据重复,同时保障数据传输的性能。

在 Pulsar 中,一个分区 Topic 包含多个 Partition,每个 Partition 的数据处理独立分开。Snowflake Connector 使用了 Snowflake Internal Stage 功能,在将 Pulsar 导入到 Snowflake 过程中首先将数据导入 Internal Stage 进行聚合,再调用 Snowflake Pipeline 功能,每个 Pipe 都对应一个 Partition 的数据,经过 Pipe 并行分布的处理将 Internal Stage 中的数据文件通过一连串的复制导入 Snowflake 中。每个 Pipe 都可以进行并行处理来保证数据安全性。

Pulsar Topic 和 Snowflake Table 映射关系

Pulsar Topic 和 Snowflake Table 的映射关系也是使用 Snowflake Sink Connector 需要注意的地方。在使用 Connector 的过程中,用户可以通过自定义 Table 模式来指定 Connector 将 Pulsar 中的数据转移到 Table 中。但是自定义 Table 需要符合相应规范,包括 Snowflake Table Name 规范、保证 Table 格式兼容 Connector、需要定义两列数据项(Metadata 和 Content)等对应关系。

Snowflake Sink Connector 还支持自动映射 Table 关系。下面是主要的三条自动映射规则:

  1. 1. 将 Pulsar Topic 名称中所有小写字母转换为大写字母。

  2. 2. 如果首个字符不以字母开头,Connector 在转换过程中会在名称前加下划线。

  3. 3. 对 Table Name 进行限制,不能包含 Snowflake 规定的非法字符,Connector 在转换过程中会自动将非法字符转换为下划线。

Schema 支持

Snowflake Sink Connector 提供了多种 Schema 支持。Pulsar 的消息数据和 Snowflake 数据存储的格式不同,Pulsar 的消息可以使用多种 Schema,如 Avro schema、Json schema 和 KeyValue schema。每条消息分为两个部分,一部分是元数据,用来存储消息处理过程中用户赋予元数据的信息,如 Event Time、Key 和 Properties 等。另一部分是消息体 Payload,存储消息具体的数据内容,包含多种 Schema 格式(Avro schema、Json schema 和 KeyValue schema),同时在 Snowflake Sink Connector 进行数据转移传输的过程中可以获取 messageid 信息。用户在使用 Connector 时可以指定元数据将哪些字段落入 Snowflake Table,如 Event Time、Key、Properties 和 messageid 等,其中 messageid 会直接复制到 Snowflake Table 元数据列中。

Snowflake Table 主要分为两列,元数据和 Content,都是半结构化数据存储。Snowflake 已经对半结构化数据存储进行了优化,不会对性能造成影响。用户在访问半结构化数据时,其性能和访问结构化数据近似,所以 Snowflake Sink Connector 采用半结构化数据存储,好处是转换 Schema 的过程更方便。创建 Snowflake Table 的 SQL 语句如下:

create table if not exists MYTABLE (metadata variant, content variant)

Schema 转换

Pulsar Message 支持多种 Schema,但是在 Pulsar 将数据转移到 Snowflake 时会将其转换为 Generic Record 形式。Generic Record 是带有 Schema 信息的 Record,Snowflake Connector 读取信息时读取的即是一条条 Generic Record,通过一层 Schema convert 转换,不论是哪种 Schema 格式,在落入 Snowflake Table 前都会将 Generic Record 转换为 Json 数据文件。

Json 文件主要分为两个字段,metadata 和 content,对应 Snowflake Table 的两个列,两个字段中的数据都是半结构化数据。其中 metadata 包含元数据字段如 _message_id_ 和_event_time_ 等,content 中的数据是 Json 转换后的结果。如上图所示,用户使用 Avro Schema 定义多种字段进行转换,其中 IDPSIdealership 和 etc 是用户在 Pulsar Message 定义的 Avro 字段。

数据转换到 Snowlake table 后,用户也可以通过 Snowflake 半结构化数据的解析功能访问半结构化数据内容。在上图 SQL 语句中,展示的是从 car_sales Table 中访问 Content dealership 字段的值,操作类似于访问结构化数据。

容错机制

Snowflake Sink Connector 支持精确一次(effectively-once)语义,保证数据传输过程中的不重不丢。Snowflake Sink Connector 首先将多个 Message 写入 Table,并将多个 Message 聚合到一个 Uncommitt 文件(还没有被提交到 Snowflake Table 的文件)中,Connector 触发 SnowPipe 读取文件,并通过一连串的 COPY 命令,将数据复制到 Snowflake Table。在此过程中,文件状态转化为 Committing。过程结束后,文件状态为 Committed(已提交)。

在 Snowflake Sink Connector 中也维护 File Cleaners 组件。在每个数据写入文件中时,File Cleaners 会记录各个文件的状态和信息,当它检测到文件已提交(Committed File)就会对文件进行删除,实现了 Internal Stage 的数据清理功能。

在数据写入 Uncommitted File 过程中,Snowflake Sink Connector 可以根据消息 ID 判断是否重复写入并去重,保证写入文件的消息不会发生重复。在 SnowPipe 转移过程中,Snowflake 保证数据传输不会丢失。

总结

本文介绍了使用 Apache Pulsar 构建实时数仓的几种方案,如基于 Pulsar 和 Flink 构建实时数仓、基于 Pulsar 和数据湖打造实时数仓,并介绍了 Snowflake Sink Connector for Pulsar 的原理,以及具体如何使用 Snowflake Sink Connector 构建实时数仓。欢迎大家关注 Snowflake Connector for Pulsar 的技术进展并分享使用 Pulsar 构建实时数仓的新方案与新技术。

引用链接

[1] 使用: [https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/)


       
点击阅读原文观看视频

本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部