linkis与SQL中间件(跨数据源混查)结合实践分享

原创
04/19 17:33
阅读数 2K


【我与WeDataSphere的故事】征文大赛获奖文章分享(一)

作者简介|PROFILE

李宏伟

资深大数据开发专家

长期奋斗在一线的码农,目前在360技术中台数据平台部参加数据中台建设工作,多年大数据领域技术架构与开发经验以及多年CDN领域技术栈开发经验。



01  背景需求


-业务需求:

在公司有很多运营、数据分析的童鞋,虽然自有的BI产品功能丰富,有各式各样的定制化分析报表、各种维度的图表。
但是有时需要临时查询数据,发现数据是分布在不同数据源里的,也有可能数据来自不同业务的不同集群。
比如查询hive表的数据,但是维度映射数据在mysql里。还有要两个mysql数据库(不同服务器)关联查询等情况,
平时这些需求都需要程序猿大神们写程序实现。

现在可以通过Linkis与SQL中间件完美结合满足上述需求。难道不香吗?


-技术迭代:

之前使用的Apache Livy作为SQL执行入口,把查询的请求都提交给Livy,但是体验上不尽人意。
一直想找个替代组件, 后来发现功能强大的linkis。至于Linkis跟Apache Livy的对比,可以查看官方相关文档。



02  SQL中间件介绍


SQL中间件是基于公司目前开源的XSQL和Quicksql两款SQL中间件,两种都支持跨数据源混查,两个都很优秀,

至于大家选择集成哪个可以根据自身情况决定。因为之前使用过XSQL,所以是在linkis增加了xsql查询引擎。

以下分别简单介绍下两款开源组件:


XSQL:

XSQL是一款低门槛、更稳定的分布式查询引擎。它允许你快速、近实时地查询大量数据。对于一些数据源(例如:Elasticsearch、MongoDB、Druid等),他可以大幅地降低学习曲线,并节省人力成本。除Hive外,每种数据源都支持除子查询外的下推执行优化。用户有时希望将位于不同数据源上的数据关联起来进行查询,但是由于各种数据源是异构的且一些数据源不支持SQL或者支持的SQL语法非常有限,因此传统互联网公司的做法是,将不同的数据同步到统一的存储介质中,再进行OLAP的查询。数据同步的过程中可能面临数据迁移、主从同步、网络带宽等诸多困难和挑战,而且需要浪费大量的人力、物力及时间,无法满足大数据产品当前阶段对于近实时甚至准实时的场景。通过XSQL你将可以避免数据迁移和时间浪费,更加专注于业务本身。XSQL可以通过下推、并行计算、迭代计算等底层支撑技术,对各种数据源的查询加速。



Quicksql:

Quicksql是一款跨计算引擎的统一联邦查询中间件,用户可以使用标准SQL语法对各类数据源进行联合分析查询。其目标是构建实时\离线全数据源统一的数据处理范式,屏蔽底层物理存储和计算层,最大化业务处理数据的效率。同时能够提供给开发人员可插拔接口,由开发人员自行对接新数据源。

03  执行流程图




04 实践过程


参考linkis官方文档《如何快速实现新的底层引擎》、《Spark引擎介绍》,然后在uejs/definedEngines下创建xsql模块进行相关开发。


功能点:

1、支持按照不同集群加载相关配置
2、支持自定义结果存储路径
3、支持是否开启默认limit 5000限制保护
4、linkis网关上socket支持token user认证。
5、适配公司内部hadoop版本
6、增加XSQL执行引擎


05 实现过程简述


由于公司大数据是有多集群的,为了节省客户端资源,可以复用客户端提交任务到不同集群,这时就需要能够灵活指定不同集群的配置文件。
目前是将用到的集群配置文件放到linkis-hadoop-conf文件夹下,用于在启动引擎时以及己启动的执行引擎里进入动态加载。


├── client-viewfs.xml├── core-site-cluster1.xml├── hbase-site-cluster1.xml├── hdfs-site.xml├── hive-default.xml├── hive-exec-log4j.properties├── hive-log4j.properties├── hive-site-cluster1.xml├── ivysettings.xml├── mapred-site-cluster1.xml├── spark-defaults-cluster1.conf├── xsql-spark-defaults-cluster1.conf└── yarn-site-cluster1.xml
在linkis-gateway网关模块,改造让socket支持token user认证。比如在创建socket连接时可以通过传入token相关参数来完成用户认证。
ws://gateway.linkis.net:9001/ws/api/entrance/connect?Token-User=xxxx&Token-Code=BML-AUTH

//这个地址也需要增加token参数   "method":"/api/rest_j/v1/entrance/execute?Token-User=xxx&Token-Code=BML-AUTH",   "data":{    "params": {      "variable":{      },      "configuration":{        "special":{
}, "runtime":{
}, "startup":{ } } }, "executeApplicationName":"xsql", "executionCode":"SELECT * FROM abc limit 5;", "runType":"sql" }}
由于业务实际查询时是需要全量数据,不需要进行limit限制。
而且是想根据每次请求中参数动态来设置是否需要Limit,而不是通过全局配置统一禁用还是开启。
业务需要自定义存储结果路径,比如跨集群跨账号存储查询结果。
以上是Linkis\ujes\entrance入口模块里进行参数接受处理。
XSQL执行引擎实现:
  • 目录结构
由于xsql是基于spark实现的。所以xsql执行引擎基本是复用了linkis spark引擎代码。
重点是修改如下:
主要涉及到linkis-ujes-xsql-engine 模块相关改动
  • pom.xml

<!--<spark.version>2.4.3</spark.version> --><!--把2.4.3修改为2.4.3.xsql-0.6.0 --><spark.version>2.4.3.xsql-0.6.0</spark.version>,
2.4.3.xsql-0.6.0这个版本请根据从开源xsql编译时获取,由于适配了公司内部,所以版本号可能略有不同。
SparkEngineExecutorFactory 类
override def createExecutor(options: JMap[String, String]): SparkEngineExecutor = {
val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) => k.startsWith("spark.") }.foreach { case (k, v) => conf.set(k, v) sys.props.getOrElseUpdate(k, v) }
}
def createSparkSession(outputDir: File, conf: SparkConf, addPythonSupport: Boolean = false): SparkSession = {

//val builder = SparkSession.builder.config(conf) //builder.enableHiveSupport().getOrCreate()
//划重点:将enableHiveSupport改成enableXSQLSupport() val builder = SparkSession.builder.config(conf) builder.enableXSQLSupport().getOrCreate()}
SparkEngineExecutor 类
override protected def executeLine(engineExecutorContext: EngineExecutorContext, code: String): ExecuteResponse = Utils.tryFinally {
//同样要增加加载配置代码段 val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) => k.startsWith("spark.") }.foreach { case (k, v) => sc.getConf.set(k, v) sys.props.getOrElseUpdate(k, v) }
}

06 如何使用



提交参数如下:
{    "params":{        "variable":{        },        "configuration":{            "special":{            },            "runtime":{                "clusterName":"cluster1",                "configPath":"/usr/local/dss/linkis/linkis-hadoop-conf",                "userName":"hadoop",                "wds.linkis.yarnqueue":"hadoop",        //可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限                "resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei"                //如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true                //否则不需要传这个参数,linkis则默认会进行limit 5000限制                //"allowNoLimit" : true            },            "startup":{                "clusterName":"cluster1",                "configPath":"/usr/local/dss/linkis/linkis-hadoop-conf",                "userName":"hadoop",                "wds.linkis.yarnqueue":"hadoop",        //可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限                "resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei"                //如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true                //否则不需要传这个参数,linkis则默认会进行limit 5000限制                //"allowNoLimit" : true            }        }    },    "executeApplicationName":"xsql",    "executionCode":"    REMOVE DATASOURCE IF EXISTS mysql_connect_name;    ADD DATASOURCE mysql_connect_name(type='mysql',url='jdbc:mysql://10.22.22.22:3306',user='root',password='123456',pushdown='false',useSSL='false',version='5.7.28');    REMOVE DATASOURCE IF EXISTS hive_cluster1;    ADD DATASOURCE hive_cluster1(type='hive',metastore.url='thrift://10.222.222.222:9083',user='test',password='test',version='1.2.1');    SELECT t1.id,t1.name,t1.title,t2.time,t2.url,t2.partner,t2.m2 FROM (SELECT id,name,title,ip FROM mysql_connect_name.database_name.mysql_tables) t1 JOIN     (SELECT m,time,url,partner,ip FROM hive_cluster1.database_name.hive_tablse WHERE day = 20200903) t2     ON t1.ip=t2.ip order by t2.time;",    "runType":"sql"}

XSQL语法说明:
删除数据源时请使用这种语法 REMOVE DATASOURCE IF EXISTS 数据源名称; 避免直接REMOVE DATASOURCE 数据源名称, 因为上来就执行删除数据源,会因为找不到数据源来报异常。
查询的表名需要增加数据源以及数据库进行限定,要符合三段式表名。比如:hive_cluster1.database_name.table_name
第一段数据源名称,就是添加数据源语法时自定义的名称hive_cluster1,比如ADD DATASOURCE hive_cluster1(... ...)
第二段数据库名称,这个需要是真实的数据库,比如database_name
第三段表名,表要是第二段数据库下真实的表名。
更多XSQL使用语法,可以查看官方相关文档。https://qihoo360.github.io/XSQL/tutorial/syntax/
这样就可以实现mysql与hive数据进行关联查询了。

相关版本



hive 1.2.1

spark 2.4.3

linkis 0.9.3

xsql 0.6.0

java 1.8+

hadoop 2.7.2

相关资源



https://github.com/WeBankFinTech/Linkis
https://github.com/Qihoo360/XSQL
https://github.com/Qihoo360/Quicksql



 扫码关注我们 

                               更多惊喜,请点击阅读原文                                                    




更多 Linkis的相关介绍请访问:

https://github.com/WeBankFinTech/Linkis/blob/master/docs/zh_CN/README.md

项目地址:

https://github.com/WeBankFinTech/Linkis

https://gitee.com/WeBank/Linkis


相关推荐

Linkis数据中间件,打造全面连通融合的金融级大数据平台

计算中间件Linkis-助力Kylin 强化连接层治理的探索 | Apache Kylin Meetup 北京站

Linkis架构解析系列(一)——Linkis RPC架构解析

微众银行大数据平台携手行业伙伴举办计算中间件Linkis线上交流会

Linkis架构解析系列---Job执行流程源码解析

Linkis JDBC是如何适配Tableau的?

【重磅发布】Linkis 0.10.0 版本发布

【里程碑】Linkis 1.0.0-RC1 版本发布


本文分享自微信公众号 - WeDataSphere(gh_273e85fce73b)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
打赏
0
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
0
分享
返回顶部
顶部