01
导语
02
背景
03
方案选型
-
Hive on Tez
-
无感切换:SQL语法仍然是Hive SQL,通过配置将Hive的执行引擎由MapReduce替换为Tez即可,上层应用无需改造
-
性能较差:该方案对大规模数据集的并行处理能力较差,在发生数据倾斜时表现明显 -
社区不活跃:该方案在业界落地相对较少,社区交流讨论不多 -
运维成本高:Tez引擎执行出现异常时,可以参考的资料较少
-
Hive on Spark
-
无感切换:SQL语法仍然是Hive SQL,通过配置将Hive的执行引擎由MapReduce替换为Spark即可,上层应用无需改造
-
版本兼容性差:仅支持Spark 2.3以下版本,没法利用Spark 3.x以上版本的新特性,不符合未来升级需求 -
性能不理想:Hive on Spark 仍然使用Hive Calcite解析SQL为MapReduce原语,只是它会用Spark引擎而非MapReduce引擎执行这些原语,性能并非十分理想 -
社区不活跃:该方案在业界落地较少,社区不活跃 -
资源申请不灵活:Hive on Spark 的方案在提交Spark 任务时,资源只能固定设置,难以适用于多租户、多队列场景
-
Spark SQL
-
选型小结
04
技术改造
-
Spark兼容性改造
-
支持UDF多线程: 先前Hive上的UDF,如遇SimpleDateFormat类型日期处理时不会抛出异常,然而使用Spark执行会报错,原因是Spark引擎采用了多线程方式执行此类函数。通过修改UDF的代码,把SimpleDateFormat设置成ThreadLocal可以解决该问题。 -
Grouping ID支持: Spark不支持Hive的grouping_id,使用自带的grouping_id()来代替,但是这会引发兼容性问题,我们通过改造Spark,实现了在解析SQL的时候把grouping_id自动转换成grouping_id() -
参数兼容性: Hive特定的参数需要映射到Spark中相应的参数
-
复杂函数不起别名: 在Hive当中,如果没有给某个通过计算得到的列起别名的话,Hive默认会起一个以_c开头的列名,但是Spark却不会,当调用到某些可能会返回逗号的函数的时候(比如get_json_object),会报列个数不匹配的问题。该问题的work around建议是给所有的列都起别名,拒绝使用_c0的这样的别名。 -
不支持永久函数: Spark不支持永久函数的原因是代码里没有去HDFS上把jar包下载下来。另外临时函数是不需要指定库名的,但是永久函数是需要的,为了推广永久函数特增加了一个功能:在当前库找不到对应函数的时候,会去查找default库下的永久函数。 -
不支持reset参数: 线上任务有使用reset命令的场景,我们通过改造Spark,使Spark SQL支持reset命令。
-
Spark新特性启用与配置优化
-
开启动态资源分配策略(DRA): 任务根据当前程序的需求自动申请或释放 Executor 实现动态资源调整,解决了资源分配不合理的问题。自动回收空闲资源极大地降低了集群资源浪费,另外通过限制最大 Executor 数量来避免大查询占用过多资源导致队列阻塞。 -
开启自适应查询优化(AQE):记录任务执行阶段的相关统计指标,根据统计的指标优化后续执行阶段的执行计划,如:动态合并小的 Shuffle 分区、动态选择合适的 Join 策略、动态优化倾斜的分区等,提高了数据处理效率。 -
自动合并小文件: 在写入前插入 Rebalance 算子,再结合 Spark 的 AQE 优化,自动的合并小分区、拆分大分区,进而很好地解决了大量小文件问题。
-
Spark 架构改进
-
基于标签配置: 对于不同的计算场景或平台预定义一些标签绑定一些特定的配置,在任务执行时只需要带上对应的标签,就会自动在配置中心补充预设的配置。例如:即席查询任务,配置共享引擎和大查询限制等配置;ETL 任务,配置独立引擎和小文件合并配置等。 -
并发限制:在一些异常情况下,某个客户端可能发送大量的请求导致 Kyuubi 服务工作线程被占满。我们在 Kyuubi 中实现了 User 和 IP 级别的连接并发限制,避免某个用户或客户端发送大量请求导致服务被打满,此功能也已经贡献给社区。 -
事件采集:Kyuubi 在 SQL 执行的各阶段暴露了各种事件,通过这些事件可以很方便的进行 SQL 审计和异常分析,为小文件优化、SQL 优化等提供很好的数据支撑。
05
自动化迁移工具
-
通过Pilot收集Hive任务的信息,获取SQL语句、队列、工作流名称等信息 -
SQL解析:使用SparkParser分析Hive任务的SQL语句,找到输入输出所对应的数据库、数据表等信息 -
构建输出映射表:为双跑任务创建输出数据的映射表,与线上数据表区分开来,避免影响线上数据 -
引擎替换:将双跑任务的执行引擎替换为Spark SQL -
模拟运行:使用Hive、Spark引擎执行对应的SQL任务,并将任务运行结果输出到上述映射表里,用于对数校验 -
一致性校验:通过比较两张表的行数、循环冗余码(基于CRC32算法)进行数据一致性校验。 其中,CRC32算法是一种简单快速的数据校验算法。Spark中提供了内置函数CRC32,该函数的值是Long类型,最大值不超过10^19。在我们的应用场景下,首先将表中每行的各列数据concat_ws起来计算其CRC32,并将该CRC32转换为Decimal(19, 0);接着对表各行计算所得的CRC32值求和得到可反映整表内容的checksum CRC32值,用于一致性比较。该条校验SQL具体为: 映射表中部分字段为Map、List等集合类型,会存在两张实际数据一致的表,由于集合类型字段内部数据排序的不同,导致CRC32统计结果发生偏移并影响到一致性校验结果。针对这类情况,我们开发了专门的UDF对集合内部排序后进行一致性校验。 映射表中有部分字段为Float、Double等浮点类型,在数据一致性校验环节,由于统计精度的问题,两张表的CRC32统计结果可能存在差异,导致一致性校验环节发生误判。为此,我们优化了校验算法,在计算CRC32统计值时,对浮点字段保留小数点后4位。 自动降级:Hive任务切换到当SparkSQL运行失败后,通过Pilot自动降级到Hive并重新提交运行,保证无论如何任务都能顺利执行。
我们提供了平台化的手段来执行上述流程:用户根据项目名称,找到所属工作流。
在模拟运行阶段,支持监控运行状态
06
迁移效果
-
广告:离线任务整体性能提升约 38%,计算资源节省30%,计算效率提升20%,加快广告数据产出,促进增收 -
BI:总耗时降低 79%,资源节省 43%,保障 P0 任务的产出时效,核心报表提前半小时至1小时产出,同时,提升补数效率,快速解决数据故障、数据回溯等日常问题 -
用户增长:数据生产提早2小时,帮助用户增长核心报表在10点前产出,提升UG运营效率 -
会员:订单数据生产提早8小时产出,数据分析提速10倍以上,帮助会员提升运营分析效率 -
爱奇艺号:平均执行时间缩短 40%,日执行时间减少约 100 小时
07
未来计划
-
升级迁移工具
-
引擎优化
-
存储变大问题:由于小文件优化中引入了 Repartition 使得数据被打散,导致部分任务写入的数据压缩率降低,后续对社区提供的 Z-order 优化进行调研自动优化数据分布。 -
DPP 导致 SQL 解析过慢问题:在迁移中发现 DPP 优化可能导致部分多表 Join 的 SQL 解析非常慢,目前是通过限制 DPP 优化的 Join 个数来避免这个问题,Spark 3.2 以及后续的版本中对 Spark SQL 解析进行加速,并且也有一些相关的 Patch,计划对这些 Patch 进行分析并应用到当前版本。 -
任务关键指标完善:我们目前已经在平台侧采集了一些 Spark SQL 的执行指标,如:输入输出文件大小和文件数、Spark SQL 各阶段运行时间等,可以直观的看到有问题的任务以及一些优化的效果。后续还需要对这些指标进行完善,例如:Shuffle 数据量、数据倾斜、数据膨胀等指标,探索更多的优化手段,提升 Spark SQL 计算效率。
-
模拟测试引擎

本文分享自微信公众号 - 爱奇艺技术产品团队(iQIYI-TP)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。