可视化任务编排&拖拉拽 | Scaleph 基于 Apache SeaTunnel的数据集成

原创
07/04 19:59
阅读数 2K

这次在 6月 Meetup 为大家带来的是Scaleph 基于 Apache SeaTunnel (Incubating) 的数据集成介绍,希望你有所收获。

本次演讲主要包括五个部分:

  1. 关于Scaleph

  2. Scaleph架构&功能简介

  3. SeaTunnel社区贡献

  4. 系统演示

  5. 开发计划

Apache SeaTunnel (Incubating)

王奇

Apache SeaTunnel Contributor

搜索推荐工程师,大数据 Java 开发

01 Scaleph的缘起

我最早是从事搜索推荐工作,在团队里面负责维护Dump系统,主要是为我们的搜索引擎提供喂数据的功能,先给大家介绍在维护过程中主要的5个痛点问题:

及时性和稳定性

搜索推荐是电商平台的核心在线系统,尤其是对数据的及时性和稳定性要求非常高。由于搜索推荐会接收整个电商平台 C 端的绝大部分流量,所以一旦服务出现波动的时候,可能就造成服务受损,导致用户的体验大打折扣。

业务复杂/大宽表设计

Dump系统会将电商平台的商品、类目、品牌、店铺、商品标签、数仓的实时/离线数据及模型数据会经过一系列的预处理,最终输出成一张大宽表,在这个过程中,业务的复杂性和多变性,会侵入到Dump系统中来,所以应对的技术挑战相对就更高了。

全量+实时索引

全量索引每天跑一次,主要目的是更新 T+1 频率更新的数据。当全量索引结束之后,我们会通过实时索引去刷新需要实时更新的数据,比如说商品的价格、库存变动相关的信息。

我们的上游数据来源非常多,有消息队列、数据库、大数据相关的存储以及 dubbo 接口,由于是大宽表设计,以商品索引为例,大宽表会以商品为主,如果是店铺索引,会以店铺为主,根据数据的不同,上游的数据变动不一定是商品或店铺维度的,数据也会产生一定的联动更新。

搜索推荐服务当时也承担着C端绝大部分的流量,当公司其他团队的性能跟不上的时候,他们一般会把数据通过Dump系统送到搜索引擎,然后我们团队代替他们返回给Web页面,避免后续对他们发起二次请求调用。

同时,如果其他团队的业务系统产生了脏数据,也需要Dump系统做数据保护,防止数据外泄给C端用户造成不好的影响,所以开发维护中的时候,也有很大的难度。

02 为什么引入Flink?

作为国内 Flink 的早期使用者,阿里巴巴在搜索推荐领域拥有悠久的历史和成功的经验,在搜索推荐团队开发维护 Dump 系统的职业经历促使我开始关注使用Flink做A/B实验的报表、数据实时流之外的相关工作,主要也就是用Flink来实现Dump系统为搜索去提供Dump平台的工作,使用Flink做数据集成有5个优点:

  1. 天然的分布式支持:Flink支持多种部署和运行方式,单机、yarn、Kubernetes;

  2. 低延迟、海量吞吐:在众多大厂中应用广泛;

  3. 生态支持:Flink提供了众多开箱即用的connector,支持csv、avro数据格式,kafka、pulsar等消息系统以及众多的存储系统,和大数据生态紧密结合;

  4. 基于分布式轻量异步快照机制实现exactly-once语义,为任务的失败、重启、迁移、升级等提供数据一致性保障;

  5. metrics。Flink除了自身提供的 metrics 外,metrics 框架可以让用户为任务开发自定义的 metrics,丰富监控指标;

03 为什么选择SeaTunnel?

后来接触到 SeaTunnel 的时候,很喜欢 SeaTunnel 的设计理念!SeaTunnel 是运行在 Flink 和Spark 之上,高性能和分布式海量数据的下一代集成框架。

重要的是它是开箱即用的,并且针对现有的生态可以实现无缝集成,因为运行在 Flink 和 Spark 之上,可以很方便地接入公司现有的 Flink 和 Spark 的基础设施。另一方面 SeaTunnel 也有很多的生产案例,在进入 Apache 基金会孵化之后,社区非常活跃,未来可期。

04 关于Scaleph

项目出发点

我们最开始的想法就是为 SeaTunnel 提供 Web 页面,能够做一个数据集成的开源系统。目前我们最主要的目标还是想为 SeaTunnel 做一个开源可视化的数据开发和管理系统,后面期望 Scaleph 能够最大程度的降低实时和离线数据任务的开发门槛,为开发人员提供一站式的数据开发平台。

项目亮点

在真正的生产应用中,进行数据集成的时候,以可视化任务编排或 SQL 开发为数据集成的主要形式,我们认为 Drag and Drop 可视化任务编排可以最大程度减轻用户做数据集成的负担;

另外就是实现对作业进行多版本管理,数据源的支持;

  • Flink集群支持多版本/多部署环境;

  • 实时/周期任务也有相关的支持。

上面是我们系统的架构图,用户主要使用 Web UI,通过作业管理功能封装的 SeaTunnel 算子,用户在页面进行拖拉拽配置,系统自动生成SeaTunnel的配置文件,最后通过资源管理中用户上传的资源 jar 包一起通过 Flinkful 库提交到 Flink 集群中。资源管理的资源 jar 包的存在目的是支持用户可以上传自已研发的相关 jar 包,补足SeaTunnel 相关的缺陷,或对SeaTunnel和Flink本身的功能进行增强!

我们用 quartz 开发了一个调度任务,当任务提交到 Flink 后,任务会定时去 Flink 集群将任务信息拉过来,存储到 MySQL 里面,最终用户在 Web UI 页面可以看到任务相关运行信息。

Scaleph功能简介(数据开发)

01 项目管理

主要是用户创建数据同步任务的时候,能够按照不同的业务维度进行相关的管理工作。

02 作业管理

通过拖拉拽的操作可以创建SeaTunnel的数据任务,然后进行相应的提交运行。

03 资源管理

SeaTunnel 是以 Apache2.0 开源证书进行开源的,与 MySQL 的 JDBC 驱动包开源协议不兼容,SeaTunnel 的 jdbc connector 是不提供相关的 JDBC 驱动依赖的。当用户使用 jdbc connector 时,**需要自行提供 JDBC 驱动包。**我们在这里提供了资源管理的功能,用户可以自己上传驱动包,然后再把 SeaTunnel 任务和 MySQL 驱动一起提交到集群中以保证任务的正常运行。

04 集群管理

主要是提供Flink集群信息的录入,目前可以支持Standalone Session 集群录入,用户录入后,提交SeaTunnel作业时就可以选择集群,任务就会在集群运行。

05 数据源管理

支持用户提前录入一些数据源信息,这样就不用每个任务都把数据源信息输入一遍。同时,还可以去实现数据源的共享和权限限制,防止数据源信息明文泄露。

Scaleph功能简介(运维中心)

运维中心是一个实时任务和周期任务的运行日志,用户提交任务的时候看到任务相关的信息,我们还提供了链接跳转操作,用户点击可以跳转到Flink的Web UI上面去,通过Flink官方的Web UI页面,可以看到任务具体的执行信息。

Scaleph功能简介(数据标准)

01 数据元

数据治理是个大的体系,大家比较关心元数据、数据血缘、数据资产,但是数据标准也是数据治理的重要一环,我们把公司自己内部使用的标准系统开源出来,给大家分享数据标准的相关知识。

在很多数仓的开发过程中,由于是多人协作的,同样一个含义的字段,在不同的模型表中,开发会定义不同的字段来表达同样的含义和业务。数据标准希望能通过数据元,来统一数仓开发人员的模型字段定义。

02 参考数据

数仓中的数据是通过数据集成工具从业务系统中拉过来的,会不可避免地出现同样含义的字段在不同业务系统中有不同的定义,而这些含义相同定义不同的字段就需要数仓人员去进行维护,而且维护的过程以线下文档为主,可能存在维护过时的情况。

同时也会出现业务知识无法直接映射为数仓模型信息的问题,数据标准让用户可以在 Web 页面中对这些业务知识进行维护。

上图是一个具体案例。这里是定义的两个业务系统,一个是系统A,一个是系统B,它们分别有不同的性别枚举值,同时A/B系统的枚举描述也都不一样,那怎么办?

这个时候,我们通过数仓开发人员可以定一套统一的标准,比如把编码统一定为**0,1,2,**相应的描述也定义好,通过中间的一个参考数据映射,用户就可以方便的去看。

03 后续设想

是否能在数据集成过程中,直接通过数据标准进行Transform 操作,实现知识和模型自动维护和映射。

04 Scaleph功能亮点

**数据的可视化开发。**我们认为在数据同步领域,可视化拖拉拽,可以帮助用户快速创建数据集成任务,用户拖拉拽出两个算子,填写相应的参数就可以创建数据集成任务。

Flinkful是我们为 Flink 开发的一个Java客户端。

Flink 作为一个流行的计算引擎,提供了很多方式让用户使用,比如说命令行接口、HTTP 接口等,通过命令行接口用户可以提交任务、创建任务及取消任务;HTTP 接口主要是用于 Web UI 界面。

在对接 Flink 的过程,我们发现 Flink 作为一个运行在 JVM 之上的一个应用与同样运行在 JVM 之上的 Scaleph 应用,二者的集成却要通过 shell 脚本,很不合理。所以我们开发了 Flinkful,打开 Flink 在 Java 生态的开放能力,让用户通过 Flinkful,可以直接对 Flink 集群和任务做管理。

我们认为 Flinkful 对 Flink 基础设施维护人员是比较有意义的,所以从 Scaleph 仓库中剥离出来,单独开源。

插件体系。我们希望通过定义插件,提供系统扩展接口,用户和 Scaleph 开发者可以通过这些接口快速增强 Scaleph 的功能和特性。目前我们定义了两个插件,分别是数据源插件和 SeaTunnel 插件,通过数据源插件可以快速扩展出 JDBC、ES、Kafka、Clinkhouse之类的数据源,把这些数据源集中到 Scaleph 系统进行统一的配置和使用。

目前 SeaTunnel 里面提供了很多 connector 和 transform 插件,如果逐一去开发页面的话,是比较耗时的一个事情,我们就想着用一种简单、声明式的方式,把 SeaTunnel 相关的参数定义出来,能快速的把 SeaTunnel 相关的能力完整的迁到 Scaleph 项目上来。

问题分析

Flink-jdbc-connector 功能增强

SeaTunnel 官方文档中的很多案例,都是以 FakeSource和 ConsoleSink 实现的,而我们在开发中是以 jdbc-connector 为主的。在集成过程中,我们发现 flink-jdbc-connector 插件的 JdbcSink 只支持 Stream 模式运行,后来我们就给它实现了 Batch 模式。

JdbcSource 需要用户提供 sql,程序在内部通过正则表达式获取到 sql 的列、表信息,以生成 JdbcSource 的 RowTypeInfo。**但是在定义复杂 sql 的时候会出现别名、子查询之类的情况,正则表达式难以覆盖所有场景。我们使用 Jdbc 的 Connection 获取到 sql 的 ResultSet,从 ResultSet 直接获取 sql 的列信息,以生成 JdbcSource 的 RowTypeInfo。

Seatunnel-core-flink.jar 瘦身

SeaTunnel 是运行在 Flink 和 Spark 之上,二者会分别打成两个 jar 包,seatunnel-core-flink.jar 就是 Flink 对应的实现。在2.1.1版本中,Seatunnel 会把基于 flink 实现的 connector 都打进这个 fat jar 包中。

而真正去使用的时候,数据同步任务,可能只会使用其中的 1-2 种 connector。Seatunnel 任务提交的时候会有一定量的额外网络开销。

我们想实现这种效果:**有一个比较 thin 的 core jar包,然后再加上相关的 connector 的 jar 包。**提交的时候,以core-jar包为主,加上相关的 connector 的 jar 包。同时前面介绍过的资源 jar 包上传,如 SeaTunnel 的 jdbc-connector 缺少的 JDBC 驱动包,携带资源 jar 包和 connector jar 包的任务提交都是同一种处理方式。

后来社区在开展 connector 拆分的时候,我们也积极在相关 issue 下分享了相关经验,当 Seatunnel 2.1.2 发布时,我们的系统也是很轻松地就适配了 seatunnel-core-flink.jar 和 connector jar 分离的发布形式。同时用户没有在 Flink 集群提前准备 JDBC 驱动的情况下,也可以通过资源管理的功能,上传驱动包,在提交 SeaTunnel 任务时,带着驱动包一起提交。

Flink jobId 获取问题

Flink 任务提交这一块的最核心方式是以命令行接口的形式去实现的,因此用户需要通过 shell 脚本去提交 Flink 任务。Flink 任务提交后,命令行客户端会把对应的任务 id 输出到控制台日志中,用户就需要捕获输出到控制台上的日志,从中提取出任务 id。

因为我们这个项目和 Flink 的所有交互全是通过 Flinkful 库实现,Flinkful 可以把这样一个 jobId 直接作为接口调用的返回值给发回来。所以我们的实现相比捕获控制台日志提取 jobId 还是比较优雅的。

SeaTunnel 调用 System.exit() 问题

SeaTunnel 任务在去执行的时候,先会对用户编写的配置文件进行检查,如果检查失败,会直接调用 System.exit(),然后这个时候 JVM 也就退出了。SeaTunnel 本身的提交方式是以 shell 脚实现的,因此 JVM 退出是没有问题的。

但是当 Scaleph 系统,把它集成到我们应用里面的时候,在调用这个方法,就会导致我们 Scaleph 这样的一个应用会直接挂掉,导致我们服务的一个不可用。因此,我们也是对任务提交的这一块代码,通过 SecurityManager,增加了相关的一个权限限制,然后规定 SeaTunnel 相关的提交任务程序,禁止调用 System.exit() 方法。

05 SeaTunnel 社区贡献

和我一起开发 Scaleph 一个朋友,这里是我们俩的一些提交的 pr,比如上面说的 jdbc-connector 的功能增强。还有就是 jdbc-connector 的 upsert 功能的实现。flink-jdbc-connector 的 JdbcSink 的一个很大的缺陷是**只支持 insert 功能,无法实现 update,这会相当限制这个 connector 的功能。**我们也是开发了 upsert 语义的支持,支持数据的重复同步。

01 系统演示

这个项目时间充足的话是可以进行 Docker 环境和 IDE 环境演示的,这里时间有限就选择 Docker 环境给大家进行演示,演示视频(直接跳转23'18s):

02 后续开发计划

目前我们还是会尽快把 SeaTunnel 相关的 connector 和 transform 插件,全搬到我们的可视化拖拉拽的页面上去,能够让用户完整的感受到 SeaTunnel 的一个强大。另外一个就是随着 SeaTunnel-connector 的相关插件丰富,也要把 connector 对应的数据源种类给它丰富上去。

我们也希望能为数据开发和数据集成做一些 DAG 相关的编排调度,同时也希望能够在数据开发方面支持 SQL 的任务开发。

Apache SeaTunnel

//  保持联络 //

小助手 : Seatunnel1 备注oschina

来,和社区一同成长!

Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。

**仓库地址: **

https://github.com/apache/incubator-seatunnel

网址:

https://seatunnel.apache.org/

Proposal:

https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

Apache SeaTunnel(Incubating) 2.1.0 下载地址:

https://seatunnel.apache.org/download

衷心欢迎更多人加入!

能够进入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的路程才刚刚开始,但社区的发展壮大需要更多人的加入。我们相信,在**「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」**(精英管理)、以及「**多样性与共识决策」**等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:

https://github.com/apache/incubator-seatunnel/issues

贡献代码:

https://github.com/apache/incubator-seatunnel/pulls

**订阅社区开发邮件列表 : **

dev-subscribe@seatunnel.apache.org

开发邮件列表:

dev@seatunnel.apache.org

加入 Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-123jmewxe-RjB_DW3M3gV~xL91pZ0oVQ

**关注 Twitter: **

https://twitter.com/ASFSeaTunnel

展开阅读全文
加载中

作者的其它热门文章

打赏
0
2 收藏
分享
打赏
0 评论
2 收藏
0
分享
返回顶部
顶部