摘要:本文整理自大数据研发高级工程师唐尚文,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为三个部分:
应用场景
实践与优化
未来规划
一、应用场景
1.1 顺丰集团业务概览
顺丰除了大家所熟悉的个人快递服务之外,它还支持同城即时配送、冷链、仓配一体、供应链综合物流服务等等,它是一个能够为客户提供涵盖多行业、多场景、智能化的综合物流服务商。
在这些行业场景背后,主要是由三网进行支撑,分别为天网、地网、信息网。天网指我们所理解的货机和无人机的航空资源;地网指服务网络、网点、陆运、铁运等资源;信息网主要是由顺丰科技支撑。
1.2 顺丰科技业务全景
顺丰科技的主要业务可以分成两部分,数字化全流程和科技服务。
数字化全流程:通过提升数字化的科技能力,助力我们内部经营的智能化升级和管理效率的提升。
科技服务:顺丰也非常关注生态上下游的用户需求,希望通过多年累积的科技能力和数字化实战经验,去沉淀输出科技化的产品和服务。比如一些成熟的行业解决方案、标准的科技产品、数字化物流开放平台等等,助力用户产业的升级与转型。
1.3 实时计算在顺丰的应用
在顺丰的业务场景中,哪些场景使用到实时计算的能力了呢?简单列举了如下四个场景。
- 第一个场景,可视化监控及资源动态调度。我们通过经营热力图对件量、客户、产品、收派比进行可视化,对资源进行动态调度。
- 第二个场景,路径规划。依据收派标准、路线距离等进行路径规划、为小哥输出动态时效及最优路径。
- 第三个场景,运单/运力异常监控。对于异常晚到的快件,现场调度人员就可以联系司机确认车辆情况。对于时效性较高的快件,比如生鲜如果出现运力异常,就会导致货物损坏和赔付。这时我们就可以通过 Flink 做实时和准实时的监控,让一线人员能够快速干预异常情况,降低公司损失。
- 第四个场景,IOT 万物互联。顺丰有数以万计的传感器设备,它们无时无刻都在产生大量的数据。我们需要对这些数据进行收集、清洗、分析,及时发现问题或者优化业务流程。
除了以上场景,实时计算也在其他场景起着非常重要的作用,给业务带来巨大的价值。
二、实践与优化
2.1 实时数据接入实践
下面来看一下我们是怎样支撑这些场景在顺丰落地的。
首先是数据实时接入实践。在顺丰实时数据接入也经历了几个较大版本的迭代和演进。
2017 年,我们基于 JStorm + Canal 实现了第一版实时数据入湖方案。但这个方案存在一些问题,比如 JStorm 不能保障数据一致,且吞吐率较低,比较难维护。
2019 年,随着 Flink 社区不断发展,Flink 逐渐补齐了很多重要的特性。因此也在这一年,我们基于 Flink+ Canal 实现了第二版实时数据集成方案。但这个方案也存在着一些问题,这个问题会在后续章节里详细展开。
为了解决这些问题,我们经历了内部的调研和实践,在 2021 年全面转向 Flink CDC。
上图是 Flink + Canal 的架构。首先 Flink 在启动时,它会读取当前 Binlog 的位置,然后 Flink 通过全量采集的方式将数据发往下游。当全量数据读取完后,再从刚刚标记的位置里消费 Binlog 数据。数据采集完后,会将数据发往 Kafka,再由 Spark 程序消费 Kafka 中的数据写往 Hudi。
这个架构它存在以下三个问题。
- 第一个问题,数据一致性难保障。因为在全量采集过程中,不会进行锁表。如果发生数据变更,全量任务可能就会采集到这些变更的数据,那么就有可能和 Binlog 采集的数据存在一些重复。
- 第二个问题,架构复杂、链路长。因为需要两套计算引擎和一个消息队列 Kafka 才能将数据写入到数据湖中,所以这个方案涉及的组件多,链路长,消耗资源大,维护复杂。
- 第三个问题,存储端要求高。它需要下游 Upsert 或者 Merge 写入,才能去除重复的数据,确保数据的最终一致性。如果下游不支持这个能力,它的数据一致性就无法得到保障。
基于以上问题,我们整理出了对实时数据接入的一些需求,主要概括为以下三点。
- 能够保障数据一致。全量增量数据同步自动切换,并能够保障数据的一致。比如刚刚提到的 Flink+Canal,它能做到全量增量的自动切换,但不能做到数据的准确性。
- 最大限度降低对源数据库的影响。比如不能使用锁,也要能进行流控等等。
- 具有较好的同步性能,这是非常核心的需求。
在正常的实践过程中,我们往往比较关注同步任务的稳定性和处理的性能,因为它直接可能影响到实时性要求较高的业务下游。我们对比了很多开源的社区的同步框架,最终选择了 Flink CDC,主要有下面几方面的考虑。
- 它能很好的解决我们的业务痛点。
- 它的可扩展性和稳定性比较好,社区活跃度也比较高。
- 它的技术栈和我们的技术栈相匹配。
Flink CDC 的核心特性包括数据转换能力强,它能够复用 Flink 数据清理、数据转换的能力,同时也能利用 Flink 支持 Sink 的能力;支持全量增量的采集自动切换;能够保证数据的一致性;能够做到架构简单,这是之前的架构所不能达到的。
在引入 Flink CDC 后,用户怎样使用实时数据同步呢?
- 第一步,用户 A 需要向数据库管理员申请数据库 A1 的访问权限,这个数据库的相关信息是由管理员去维护的。
- 第二步,用户 A 得到相关的数据库访问信息后,他通过 SQL/JAR 模式,在实时数据计算平台上进行作业的开发、调试、上线,然后由用户 A 自己维护作业整体的生命周期。同时用户 B 可能也有类似的表的接入需求,他也需要走前面的流程。
- 第三步,用户可以选择将他的表注册到数据地图供其他人查询使用。
这个基本上也能够满足一些业务的需求,但是我们在接入过程中发现了一些问题。
问题一:使用门槛高、维护难。用户对实时数据进行接入的时候,他需要了解 Flink SQL、Flink DataStream API 的使用方式以及相关的参数。对小白用户来说,它的门槛比较高。而且用户只想做表的接入,并不想太了解相关的技术细节。另外,数据库这种分享链接的方式,很容易造成信息泄露,就导致数据源管理员工作量较大,同时也会造成一定安全性的问题。
为了解决接入门槛较高这个问题,我们首先对接入过程进行了产品化,使用 Flink CDC 做底座,实现了顺丰实时数据接入产品,实时数据直通车。通过实时数据直通车,用户可以以一种零编码的方式勾选待同步表的相关信息,然后自动生成对应的数据同步任务。同时也能够完成敏感字段自动加密的功能,无需了解 Flink 的相关配置就能实现数据快速入湖。
另外,通过数据源管理授权用户访问、避免密码泄漏,方便用户进行数据管理和数据共享。
上图是实现的大概架构,简要步骤如下:
-
第一步,数据源授权。用户申请数据源读取权限并获得管理员授权。
-
第二步,作业创建。直通车根据用户勾选的相关信息生成对应的同步作业。
-
第三步,元数据同步。直通车根据待同步的表信息在数据资产创建对应的元数据。
-
第四步,数据使用。用户根据数据资产上面的信息,通过查询引擎使用同步后的数据。
问题二:实时采集链路不稳定,有可能会对源系统造成较大的影响。以 MySQL CDC 为例,它本质上是伪造了一个从节点。当 MySQL 发生数据变更时,它就会把变更的一些 Binlog 同步给从节点。然后 MySQL CDC 需要解析出 Binlog 信息,获取出当前监听表的数据源。如果需要采取多个表,对应就会分成多个 CDC 任务,Binlog 就会被反复的去拉取,很容易造成机器带宽打满,同时也会造成数据库压力较高。
除此之外,Flink CDC 采集的过程中,也很容易出现 OOM 的问题,导致作业被反复拉起。
任务带宽打满的问题,其实就是做任务的合并。任务的合并我们也尝试了很多方案,最终使用了如下的方案。
比如每个表在采集全量数据的过程中,都会启动 Flink CDC 任务读取数据。当任务到达 Binlog 阶段时,Flink CDC 就会采集当前任务具体的 ID 位置汇报给后台的服务。后台的服务会定时检测每个任务,当有任务达到合并状态时,也就是都达到 Binlog 时,他就会把这两个任务合并成一个新的任务。
对于数据库系统压力的问题,我们实现了 Flink CDC 的数据限流。在全量数据读取的过程中,有可能因为突然采集的数据比较多,造成数据库压力较高的问题,数据读取限流就能降低数据全量读取数据时数据库的压力。
同时读取限流对增量读取也有一定的效果。比如当一个 CDC 任务挂掉很长一段时间后再次启动时,它会先去挂掉之前的位置重新消费,这个段时间就可能会产生大量的数据。如果没有限流,数据大量涌入会造成流量暴涨,程序的反压,甚至有可能导致程序内存不足而挂掉。
Flink CDC 在读取全量数据块时,有可能会在某一个高低水位中写入大量数据,然后把高低水位中的 Binlog 数据 Merge 到全量的数据块的时候,会导致程序的 OOM 的问题。我们优化了这个步骤,使用流式 Merge 避免了 OOM 的问题。
Flink CDC 在分配 Binlog 时,会默认分配到 Subtask-0 上,而 Flink 程序支持多实例采集。当采集几十个 DB 时,这几十个 DB 的实例都分配在 Subtask-0 上,会导致数据倾斜或者 OOM 的问题,因此我们支持了随机分配策略,即让 Binlog 采集随机分配到不同的 Subtask 上。
同时在顺丰内部,我们很多数据源都是分库分表的,分布在不同的实例上。因此我们支持一个任务采集多个实例上的表,每一个实例对应一个 CDC Source,多个 Source 通过 Union 的方式进行数据合并。另外我们也支持同一个表写入不同的存储后端,这样就不需要为同一个表的采集再起一个任务。
问题三:表结构变更无法同步。Flink CDC 支持 DataStream API 的方式获取 DDL 变更的数据,但不同的存储系统处理的 Schema Evolution 的方式是不一样的。如果在写入下游的时候没有处理 Schema Evolution,有可能就会造成数据丢失和原数据不一致的问题。
在表结构变更自动同步到 Kafka 的场景中,当我们选择写入的存储后端是 Kafka 时,为了支持 Kafka Schema Evolution,引入了 Schema Registry 组件,写入 Kafka 中的数据都是以 Avro 形式进行存储的,所以当原端系统发生 Schema 变更时,我们只需要把 Schema 信息注册到 Schema Registry 上,同时再把 Schema 同步到数据地图就能够完成 Schema Evolution 的流程。
那么我们如何高效的识别同步的数据是否发生了变更呢?
我们知道 Flink CDC 在采集数据到 Binlog 阶段的时候,它是单线程的,而且它采集的数据是严格有序的。我们的方案就是利用了这个原理,在单线程处理的过程中,实现了带 Schema 签名的 Source Record。
当遇到 DDL 时,会对这个 DDL 之后的第一条数据重新计算 Schema 的签名,且这个 Schema 的签名的有效期只到下个 DDL 为止。此时的数据都包含了 Schema 的签名信息,然后数据会发到不同 Subtask。在每个 Subtask 中,我们只要对比前后数据的签名就能够判断 Schema 发生了变更。
通过这种方式,去判断 Schema 是否一致,实现高效的 Schema 变更识别。
在表结构变更自动同步到 Hudi 的场景中,与原来设计的方案相比,Flink 写入 Hudi 的任务在启动之后写入的 Schema 是不能变更的。如果 Schema 进行了变更,需要停止任务,并以新的 Schema 进行启动,这种方式会造比较大的维护成本。而且 Flink 写 Hudi 时候,每一次 Checkpoint 都会触发一次 Commit,每一个 Commit 中的数据对应的 Schema 信息都必须保持一致。因此为了实现表结构同步而不终止任务,我们做了如下两件事。
当遇到 DDL 时,我会对数据进行截段,如上图所示。当我遇到 DDL 时,我就会把 DDL 和 DDL 之后数据存储到 Split Data 中,这样做是为了 Schema 的数据一致。直到下一次 Checkpoint 的时候,他会把 Split Data 中的 DDL 数据取出来,并发往下游的某个 writeFunction。writeFunction 接收到 DDL 之后会将它保存,然后在 Checkpoint 的时候将 DDL 数据发往 coodinator,由 coodinator 执行 DDL 变更。然后动态刷新对应的 Commit 的 Schema 信息,让 Commit 的 Schema 信息在接下来的数据 Schema 的信息里面保持一致。
我们通过表结构自动变更,能够解决大部分生产中比如像简单的类型修改、列增加等一些场景的 Schema 同步的问题,降低了维护的成本。
上图是场景化的整体架构。从下往上看,我们支持 MySQL、Oracle、PG 的采集,还有数据直通车、分库分表、敏感字段识别、表结构同步、数据加密、任务合并、读取限流、资产注册等功能。用户只需要在实时计算的实时数据直通车里做一些数据源的申请和表结构的确认,就能实现快速的数据入湖。
2.2 实时数据开发实践
首先介绍下实时数据开发的背景。如上图所示,对于离线开发,我们可以通过数据地图对数据资产进行查询和管理,同时资产打通了离线开发平台。然后在开发平台上使用数据地图的元数据,实现了查数据、管数据、用数据的统一。
但对于实时开发,我们如果需要使用实时资产,首先需要获取到对应 Kafka 的相关信息,这些信息是由每一个用户自行维护的,另外需要在实时开发平台上通过 create table 引用这个信息,然后再进行数据开发。这里会出现一些问题,我们没有对元数据进行维护,管理也会很不方便。比如 Kafka 有上百个字段,用户就需要写很长的 SQL,就会造成比较难维护。
为了解决查数据的问题,我们对实时数据资产和离线数据资产,在数据地图上进行了统一的管理。数据地图上有详细的字段描述信息以及数据的类型、安全等级等等,用户可以快速了解字段的含义。同时我们也支持数据的快速预览,通过预览数据用户可以快速知道数据的实际情况,以便后续的数据开发。
除此之外,我们还支持链路血缘追踪。通过血缘用户可以知道数据上下游之间的关系,对应的计算作业以及具体的影响范围。
在管数据方面,为了快速让数据进行实时资产的接入,在资产引入过程中,只要用户填好对应的信息,数据地图就会采集具体数据自动生成对应的 Schema,并让用户确认和补充完善相关的信息。
另外,在数据地图上还支持权限的授权管理,它会生成对应的策略到 Ranger 中做权限进行统一管理。
在用数据方面,我们实现了 KafkaCatalog 和离线类似一样的开发体验,就是说用户可以在左边栏通过点击的方式就可以很快对实时字段进行引用。这些数据都统一来自数据地图进行管理,同时还对 SQL 进行一些相关的权限校验。如上图右侧的两个截图,实时开发基本和离线开发保持一致,这样用户用起来就比较方便。
另外我们还支持调试功能。比如用户登录实时计算开发平台时,SessionManager 就会识别到这个登录,同时在后台创建一个 Flink Session DEBUG 的服务。这时用户将他需要调试的 SQL 发送到的调试服务,调试服务会根据它的 SQL 获取到对应的目标 Schema 信息。
然后通过获取到的目标 Schema,创建模拟的目标表,同时生成对应的 insert into 语句,再发送到 Flink DEBUG 集群。Flink DEBUG 集群会将调试结果实时写回调试服务。然后用户的前端会以轮询的方式查询这个实时的 DEBUG 数据。通过调试功能,能让用户清楚每一段 SQL 生成的数据结果,提高调试的效率。
接下来讲一下数据接入和开发全链路追踪对比。
以前,如果要接入数据和数据开发。首先要知道数据库的地址和 Kafka 的地址,同时申请权限和了解字段的相关信息。之后还要需要了解 Flink CDC 和 Flink SQL 接入等相关知识、配置参数使用,测试环境调试。之后在生产环境发布,验证数据。最后对资产在地图上的维护,同时在生产环境供其他人使用。
优化后,我们只需在直通车上申请权限,然后就可以直接在直通车上进行发布,同时验证数据。之后就可以开放给所有人使用了。发布时间从原来的五天到现在不到一天就能够完成实时数据的接入。
2.3 总结
我们从以下三个方面进行了优化。
- 实时接入方面,我们进行了实时接入的产品化,同时支持表结构实时同步和一些实践的优化。
- 实时开发方面,我们对实时资产进行了管理,同时我们支持 Flink SQL 权限校验和 KafkaCatalog。
- 实时调试方面,我们支持单步调试,对用户的 DEBUG 体验进行了优化。
三、未来规划
未来的规划主要分为以下三个方面:
-
资源的弹性伸缩。通过识别有瓶颈的 Flink 作业,为其适配合适的资源、并行度,保障业务时效。
-
统一的元数据管理。通过构建统一的元数据服务,并集成支持数据湖表管理等功能,推进数据湖在顺丰实时场景落地。
-
流批一体。基于 Flink、数据湖技术打造流批一体的计算平台。
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc