实战案例 | spark load 实战之 hive 数据导入 doris 提速9倍

2022/09/30 14:23
阅读数 790

本文目的通过介绍 spark load 一个实战案例来分享下遇到的问题,踩过的坑,希望提供一些参考和借鉴意义, 案例使用版本为 doris-1.1.1

 

   背景


知乎 DMP 平台用户圈选、画像洞察功能是基于 Doris 建设的。其中标签数据更新分为:

  1. 离线批量更新:数据源 -> Spark -> Broker Load -> Doris。

  2. 实时流式更新:数据源 -> Flink -> Routine Load -> Doris。

其中离线批量更新的数据量较大,每天 1100+ 亿条,更新时间为 8+ 小时。随着业务的发展,离线批量更新的数据量不断增加,更新时间也在不断延长,导致每天下午离线批量画像才更新,影响运营投放,用户体验不佳。因此百度 Doris contributor 陈林忠与知乎用户理解&数据赋能研发 Leader 侯容合作,共同推进基于 Spark Load 的离线导入方案,解决现有性能瓶颈的问题。

   为什么用 spark load


客户数据存储在 hive 表中,用 doris 来做实时数据分析工具, doris 本身支持 hive 外表,但性能存在瓶颈,为了提升 doris 分析性能,初期的方案是定期把 hive 表的数据,通过 brokerload 把数据从 hdfs 导入到 doris 中,过程如图:

这个方案开始运行还算平稳,后期随着数据量的增加和导入任务数变多,问题逐渐出现。直接表现就是:

    • 查询变慢:很多业务的查询对延迟敏感,影响使用体验

    • 导入变慢:很多耗时在小时级别,时效性满足不了业务的要求


观察监控可以发现:导入任务运行时 cpu 下降的比较明显,同时磁盘 I/O 保持在低水平( SSD 盘),说明在导入时 cpu 算力是个瓶颈,因为导入时需要排序、解压缩、分桶计算,这些都是 cpu 密集型的操作。



同时客户的 spark 集群算力比较空闲,所以就想到把导入过程的计算卸载到 spark 上, 充分发挥各自的优势

概括:broker load 在对大量数据导入场景的表现"费时费力"满足不了业务侧时效性的要求,所以通过 spark load 来提速


   spark load提速原理

spark load 把导入拆成计算和存储两部分,把分、排序、聚合、压缩等计算逻辑放到 spark 集群,产出结果写到 HDFS,doris 再直接从 HDFS 中拉取结果文件写到本地盘

    • broker load :BE 节点负责计算,算力取决 BE 节点个数及配置,

    • spark load : spark 集群负责计算,算力取决于集群配置,且弹性强

概括:把导入涉及的聚合排序等计算卸载到 spark 集群,充分利用 spark 强大的计算能力


   sparkload 导入过程遇到的关键问题


以导入 hive 表 hive_tag_base_v2 到 doris 中为例,来说明下整个导入过程中遇到的一些关键问题和解法,doris 表和 hive 表结构定义如下,聚合类型为 bitmap_union ,两个定义如下表



对比之前用 broker load 时导入的耗时,1100亿+行数据导入,耗时8小时+ 



接下来对比下 sparkload,使用 sparkload 前需要额外配置相关依赖,主要有3个:

    • spark 客户端:   主要用到 spark-submit ,用来提交任务到 spark 集群

    • hadoop : 主要用到hdfs组件,用来操作 HDFS

    • spark-dpp:  计算任务,把导入涉及到的分桶,聚合,排序逻辑封装到 dpp jar包中,给到spark

具体配置方法参考: https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/spark-load-manual


配置好后提交任务


问题1:  低版本hive,元数据失败

建hive表时提示报错 "Invalid method name:'get_table_req'"

原因: doris支持hive 2.3.7及以上版本,客户用是hive2.1.1版本过低,接口不兼容导致拉取元数据失败


解法: 客户对低版本hive meta的接口进行适配后,把doris默认用的hive-metastore-2.3.7.jar包手动替换成了适配后的jar后



问题2: 提交任务时报权限失败


我们在提交任务填了 broker.username,hadoop 获取的是 user 是 root(doris) 进程用 root 启动的),说明spark在提交任务的时候没有拿broker的信息,但是文档中又没有说哪里填user的信息,问题出在哪?后续跟开发沟通了解到 sparkload 最初由美团贡献,美团的 hadoop 环境是认证方式不是 sample 方式(用户名和密码),后对 sample 方式没有去支持。


spark-submit在提交任务时候拿的是环境变量HADOOP_USER_NAME,HADOOP_USER_PASSWORD 作为用户名和密码,而我们的诉求是不同的任务要支持使用不同的账号来提交,所以我们为 spark-load 新增了动态设置环境变量的功能,相关pr也合入到了社区,之后我们把导入语句改成这样,问题得到解决。




问题3:两边任务状态不一致,doris 显示状态 killed,spark 显示 runing

提交提交到spark集群后,发现两边的状态不一致


原因:任务提交到spark后,doris会通过yarn工具定期不断获取任务状态,如果持续5分钟任务还是处于accpeted状态则会自动kill

spark从接到任务到实际运行任务间隔有7分钟超过了最大超时时间

解法:把超时时间调大,而且用户协调更空闲的spark队列来跑任务



问题4:sparkload分隔符不支持字符串,只能支持单个ascii字符


sparkload 分隔符不支持字符串,只能支持单个 ascii 字符,brokerload 没有这个限制,使用时需要注意




问题5:doris拉取spark产物速度上不去,限速



spark ETL 耗时 10 分钟,doris 拉取产物 44 分钟,从监控上看速度限制在 50MB/s



原因是BE默认有限速,默认单个BE最大导入速度为10MB,控制参数名push_write_mbytes_per_sec。


把 push_write_mbytes_per_sec 调大到 300MB 后,观察监控发现速度确实上来了,同时也发现新问题:"之前虽然低,但中间不会中断,这会儿虽然速度快,但是断断续续"


通过分析代码,可以知道原因是:


断断续续跟统计代码的位置有关, 因为统计的值更新是在整个文件加载完才更新 be_push_bytes 。而当文件很大时比如需要几分钟才能加载完,我们 promethures 监控的更新时间为1分钟一次,意味着这个值在如果单个文件加载时间超过1分钟的话,不会更新反应出来就是0,但实际导入速度是不影响的。



同时观察 BE的网卡接受速度发现并没有提升,还是50MB/s,也就是说上面那个参数值并没有提升加载速度,问题出在哪?



通过进一步分析,发现瓶颈是push任务少了,ETL 任务完成后,FE 获取 spark 预处理过的每个分片的数据路径,并调度相关的 BE 执行 Push 任务。默认这个任务数是 3个,调大到 9 后验证下

    • push_worker_count_high_priority:改为9

    • push_worker_count_normal_priority 改成 9

重新提交任务后,效果非常明显,doris load 的时间极大缩短到了,单个 be 写入到120MB/s

通过这个测试我们得到这几个控制参数

    • push_write_mbytes_per_sec:BE 磁盘写入限速

    • push_worker_count_high_priority:  同时执行的 push 任务个数

    • push_worker_count_normal_priority: 同时执行的 push 任务个数



问题6:spark任务频繁oom


spark在跑任务时频繁oom


一开始怀疑 executor 给的内存不够,然后我们从16G 调大到64G,没有效果

后面发现是不仅仅需要 executor 设置,driver 也同样需要设置

"spark.driver.memory"="64g",

"spark.driver.cores"="4"


问题7:doris拉取阶段报错 type not match varchar(*) ,targetType=BITMAP



原因:doris向量下,隐式转换bitmap会有诡异的问题,所以把隐式转换默认都禁止了,sparkload在doris拉取数据阶段存在隐士转换问题,导致失败

解法:进行显示转换,相关pr已合入了社区


问题8:spark 聚合数据倾斜,导致spark ETL慢


spark 聚合时是按照分组聚合的,发现聚合比较慢,原因是数据存在倾斜, 后面重新按照离线计算的一个 key 来分,新增了一个 bucket 列,来解决数据倾斜导致计算慢的问题。

问题9:并发上限200,在数据量大的情况下 ETL 执行速度慢


原因:  由于聚合任务并发之前是写死的,上限200,导致在在数据量大的情况下 ETL 执行速度慢


解法 让spark根据stage1产生的任务数自适应并发数,相关pr已合入社区


   最终效果


上面关键问题解决后跑了一个全量的任务,数据 1.2 TB,数据行数:1100 亿+,再看下整体耗时缩短至 55 分钟,相比之前 broker load 需要 8 小时,整体导入性能提升 9 倍以上,数据导入的时效性问题得到解决,同时降低了集群的负载。


几点总结

  • doris 导入是一个 cpu 密集性操作

  • broker load 在大量数据聚合导入场景下存在性能问题,瓶颈在计算

  • sparkload 可以很好的把导入涉及的计算任务卸载到 spark 中,充分发挥各自优势

  • sparkload 配置较为复杂,门槛高,spark/hadoop 环境不同公司差异大容易踩坑,非大规模数据导入下谨慎使用

  • sparkload 支持 hive 和hdfs 两种数据源,hdfs 作为数据源直接导入问题较多,不建议使用,hive 作为数据源测试比较充分,可以使用

  

  未来规划

sparkload 底层实现时 doris 和 spark 目前是耦合的,对使用者的技术要求比较全面,既要了解 doris 也需要了解 spark,而且 spark/hadoop 环境由于不同公司差异大,版本多样,导致使用门槛较高,doris 社区目前有计划在重构这部分的逻辑。核心思路是:spark跟doris解耦,不需要doris来提交任务,任务直接在spark提交生成,产物是doris segment数据文件,完成后通知doris下载segment。


-  -

   陈林忠

百度 PALO 团队资深研发工程师,有丰富的分布式存储及分布式数据库研发经验,擅长 Doris 执行引擎及存储引擎研发。负责过 doris quick compaction的优化、Remote UDAF 等功能的实现

   侯容
知乎-平台团队-用户理解&数据赋能研发 Leader。

18 年入职知乎,曾担任社区、社交等业务高级研发和业务架构师,21 年加入平台团队担任用户理解&数据赋能组研发 Leader。带领团队从 0 到 1 从底层到业务层搭建实时数据基建和业务,同时整合资源完成用户理解工程及 DMP 的建设。当前负责业务包括用户理解工程、DMP、实时数据基建以及基于用户内容理解和数据的运营平台等。



  Apache Doris 开源社区链接参考

Apache Doris官方网站:
http://doris.apache.org

Apache Doris Github
https://github.com/apache/doris

Apache Doris 开发者邮件组:
dev@doris.apache.org


本文分享自微信公众号 - ApacheDoris(gh_80d448709a68)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部