Spark SQL

原创
2019/04/11 14:56
阅读数 87

Hive On Spark和SparkSQL都是一个翻译层

把一个SQL翻译成分布式可执行的Spark程序。而且大家的引擎都是spark。
两种方式使用SparkSQL,Spark SQL is a Spark module for structured data processing.
一种是直接写sql语句,这个需要有元数据库支持,例如Hive等
另一种是通过Dataset/DataFrame编写Spark应用程序
using either SQL or a familiar DataFrame API.
In the Scala API, DataFrame is simply a type alias of Dataset[Row].
While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

Spark SQL的运行机制

  1. 将SQL语句通过词法和语法解析生成未绑定的 逻辑执行计划 (Unresolved LogicalPlan),包含Unresolved Relation、Unresolved Function和Unresolved Attribute,然后在后续步骤中使用不同的Rule应用到该逻辑计划上
  2. Analyzer使用Analysis Rules,配合元数据(如SessionCatalog 或是 Hive Metastore等)完善未绑定的逻辑计划的属性而转换成绑定的逻辑计划。具体流程是县实例化一个Simple Analyzer,然后遍历预定义好的Batch,通过父类Rule Executor的执行方法运行Batch里的Rules,每个Rule会对未绑定的逻辑计划进行处理,有些可以通过一次解析处理,有些需要多次迭代,迭代直到达到FixedPoint次数或前后两次的树结构没变化才停止操作。
  3. Optimizer 使用Optimization Rules,将绑定的逻辑计划进行合并、列裁剪和过滤器下推等优化工作后生成优化的逻辑计划。
  4. Planner使用Planning Strategies,对优化的逻辑计划进行转换(Transform)生成可以执行的 物理计划 。根据过去的性能统计数据,选择最佳的物理执行计划CostModel,最后生成可以执行的物理执行计划树,得到SparkPlan。
  5. 在最终真正执行物理执行计划之前,还要进行preparations规则处理,最后调用SparkPlan的 execute执行计算RDD

Spark SQL --join的运行机制

    Join大致包括三个要素:
  	     哪些表的-什么样的数据,大致多少数据量
  	     Join方式、
  	     Join条件以及过滤条件
		 
    Spark来说有3中Join的实现,每种Join对应着不同的应用场景:
  	Broadcast Hash Join : 适合一张较小的表和一张大表进行join
      Shuffle Hash Join :  适合一张小表和一张大表进行join,或者是两张小表之间的join
      Sort Merge Join : 适合两张较大的表之间进行join
	  
    Spark SQL中Join的一些理解:
  	Join基本实现流程-spark提供了三种join实现:sort merge join、broadcast join以及hash join。
       00.Hash join    01.SparkSQL中Broadcast Join 、 Shuffle Hash Join    Sort Merge Join
	   
      01.小表对大表(broadcast join)
          概述
            维度表和事实表进行Join操作时,为了避免shuffle,
          	我们可以将大小有限的维度表的全部数据分发到每个节点上,供事实表使用
          注意事项
          	. 被广播的表需要小于spark.sql.autoBroadcastJoinThreshold 所配置的值,默认是10M
          	 基表不能被广播,比如left outer join时,只能广播右表

      	步骤:
      02.Shuffle Hash Join
	  
	  因为被广播的表首先被collect到driver段,然后被冗余分发到每个executor上,所以当表比较大时,采用broadcast join会对driver端和executor端造成较大的压力
	  
          概述:
          	利用key相同必然分区相同的这个原理,SparkSQL将较大表的join分而治之,
          	先将表划分成n个分区,再对两个表中相对应分区的数据分别进行Hash Join,
          	这样即在一定程度上减少了driver广播一侧表的压力,也减少了executor端取整张被广播表的内存消耗

          注意事项:
         	
          步骤:
          	01.shuffle阶段:分别将两个表按照join key进行分区,
          	     将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。这个过程称为shuffle
          	02. hash join阶段:每个分区节点上的数据单独执行单机hash join算法。
			
      03.大表对大表(Sort Merge Join)
		将两张表按照join keys进行了重新shuffle,保证join keys值相同的记录会被分在相应的分区。分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接

      	步骤
      	   1. shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;
           2. sort阶段:对单个分区节点的两表数据,分别进行排序;
           3. merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,
      	    碰到相同join key就merge输出,否则取更小一边
  不同连接类型的
  	inner join是一定要找到左右表中满足join条件的记录,我们在写sql语句或者使用DataFrmae时,可以不用关心哪个是左表,哪个是右表,
  	在spark sql查询优化阶段,spark会自动将大表设为左表,即streamIter,将小表设为右表,即buildIter

SPARK SQL 调优

SparkSQL也可以根据内存资源、带宽资源适量将参数spark.sql.autoBroadcastJoinThreshold调大,让更多join实际执行为broadcast hash join。
	Spark的调优
  	优化:主要是执行的速度和使用的空间存储
  			定位的方式:  Job Stages Storge 这些页面中查看
  			了解执行过程-查看性能指标
  	01.系统层面:
  		硬件:增加资源
  		资源参数调优
  				    硬件配置: 执行器节点数。核心数、
  					内存: 内存大小、内存区域比例、缓存
  					序列化:序列化的方式 Kryo
  					并行度
  	02.任务层面:
  			代码
  
  各个主要执行的过程
      01.Map中
        Stage的数据来源主要分为如下两类
      	01.从数据源直接读取
      	02.读取上一个Stage的Shuffle数据   
      02. Shuffle过程,
      	01.将分布在集群中多个节点上的同一个key拉取到同一个节点上,
      		进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作
      	02.慢的原因:可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作
      	03.常见的会触发:
      		reduceByKey、join、distinct、repartition
      	解决方式:
      			提高shuffle操作的并行度-
      			增加shuffle read task的数量,
      			可以让原本分配给一个task的多个key分配给多个task,
      			从而让每个task处理比原来更少的数据
      03.Reduce:
      	key-value键值对进行聚合
      
      其他:
          Join会将Join key 相同的数据分发到一个执行的Instance中

针对hive SQL, Spark SQL, Flink SQL,调优思路

1.计算管理---计算优化--调优
	    调优调什么,以及如何定位?   
	    怎么调?    
	    调试的常见案例
	    
2.怎么调试:
	从两个层面上来优化-一个是系统优化,一个是任务优化
	01.系统层面:
			
	02.任务优化层面
		从数据倾斜的角度---
		Map倾斜
		Reduce倾斜
		Join倾斜
		
	数据倾斜:
		并行计算中数据分布不均匀,数据大量集中到一点上,造成数据热点
		常见的:
		01:map端缓慢是因为数据本身的分布不合理性。
		02:reduce缓慢是因为partition造成的
		03:Jion  不同数据类型关联产生数据倾斜、空值数据倾斜
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部