文档章节

Mastering-Spark-SQL学习笔记01 SparkSQL

o
 osc_1ee7cxmx
发布于 2018/08/07 16:36
字数 1608
阅读 9
收藏 0

钉钉、微博极速扩容黑科技,点击观看阿里云弹性计算年度发布会!>>>

SparkSQL可以让开发人员使用关系化查询对大规模结构化数据进行处理。

像Apache Spark一样,Spark SQL特别适合大规模的分布式内存计算。SparkSQL将关系型处理与Spark的函数式编程API进行整合。

SparkSQL和SparkCore的计算模型的主要区别是注入、查询和持久化(半)结构化数据的关系化框架,使用可以由SQL(带有许多HiveQL的功能)和高级的类SQL的函数式定义的DataSet API进行关系型查询(即结构化查询)。

这里的(半)结构化数据是指可以由列名、列类型以及列是否为空组成的Schema所描述的数据集。

当使用SQL或Query DSL时,查询都会成为一个有强制Encoder的DataSet。

 

DataSet是带有transformation和action算子进行结构化查询执行管道编程的接口。结构化查询在内部是一棵(逻辑的和物理的)关系型算子和表达式组成的Catalyst tree。

当一个action算子(如直接的show、count,间接的save、saveAsTable)在DataSet上被执行时,(在DataSet背后的)结构化查询会通过以下执行阶段:

1)Logical Analysis 逻辑分析

2)Caching Replacement 缓存替换

3)Logical Query Optimization,逻辑查询优化使用 rule-based和cost-based优化器

4)Physical Planning 物理计划

5)Physical Optimization 物理优化(如 Whole-Stage Java Code Generation 或 Adaptive Query Execution)

6)Constructing the RDD of Internal Binary Rows(根据Spark Core的RDD API表示结构化查询)

 

从Spark 2.0开始,Spark SQL实际上就是Spark底层内存分布式平台的主要和功能丰富的接口。将Spark Core的RDDs隐藏在更高层次的抽象之后,即使没有您的同意,也可以使用逻辑和物理查询优化策略。

换句话说,Spark SQL的Dataset API描述了一个分布式计算,它最终将被转换为RDDs的DAG(有向无环图)来执行。在幕后,结构化查询被自动编译成相应的RDD操作。

Spark SQL支持批处理和流模式下的结构化查询(后者是Spark SQL的独立模块,称为Spark Structured Streaming)

 

Spark SQL编程模型:// Define the schema using a case class

case class Person(name: String, age: Int)
// you could read people from a CSV file
// It's been a while since you saw RDDs, hasn't it?
// Excuse me for bringing you the old past.
import org.apache.spark.rdd.RDD
val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("Jacek",10)))
// Convert RDD[Person] to Dataset[Person] and run a query
// Automatic schema inferrence from existing RDDs
scala> val people = peopleRDD.toDS
people: org.apache.spark.sql.Dataset[Person] = [name: string, ag
e: int]
// Query for teenagers using Scala Query DSL
scala> val teenagers = people.where('age >= 10').where('age <= 19').select('name').as[String]
teenagers: org.apache.spark.sql.Dataset[String] = [name: string]
scala> teenagers.show
+-----+
| name|
+-----+
|Jacek|
+-----+// You could however want to use good ol' SQL, couldn't you?
// 1. Register people Dataset as a temporary view in Catalog 要先将DataSet注册为View,才可以使用SQL
people.createOrReplaceTempView("people") // 2. Run SQL query val teenagers = sql("SELECT * FROM people WHERE age >= 10 AND age <= 19") scala> teenagers.show +-----+---+ | name|age| +-----+---+ |Jacek| 10| +-----+---+

Spark SQL提供使用了基于规则的查询优化器Wole-Stage Codegen(全流程代码生成,在运行时动态生成比手写更好的代码)、使用内部二进制行格式的Tungsten执行引擎

自Spark SQL 2.2起,结构化查询可以使用Hint框架进一步优化。

Spark SQL引入了一个名为Dataset(以前是DataFrame)的表格数据抽象。DataSet数据抽象的目的是使在Spark基础设施上处理大量结构化表格数据更加简单和快速。

// 处理JSON文件并将它的子集存为CSV
spark.read
.format("json")
.load("input-json")
.select("name", "score")
.where($"score" > 15)
.write
.format("csv")
.save("output-csv")
// 结构化流式处理代码
import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

spark.readStream
  .format("json")
  .schema(schema)
  .load("input-json")
  .select("name", "score")
  .where('score > 15)
  .writeStream
  .format("console")
  .start
// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +-----+-----+
// | name|score|
// +-----+-----+
// |Jacek| 20.5|
// +-----+-----+

在Spark 2.0中,Spark SQL的主要数据抽象是Dataset。它表示具有已知模式的记录的结构化数据。这个结构化数据表示数据集支持使用压缩列格式的紧凑二进制表示,这种格式存储在JVM堆之外的托管对象中。

它应该通过减少内存的使用和GC来加快计算速度

Spark SQL支持谓词下推来优化DataSet查询的性能,也可以在运行时生成经过优化的代码。

 

Spark SQL附带了不同的API:

1. Dataset API(前身是DataFrame API)带有强类型的类LINQ的查询特定领域语言

2. 结构化流API(即流Dataset)用于持续增量地执行结构化查询

3. 非程序员可能会通过与Hive的直接集成来使用SQL作为查询语言。

4. JDBC/ODBC爱好者可以使用JDBC接口(通过Thrift JDBC/ODBC)并将它们的工具连接到Spark的分布式查询引擎。

 

Spark SQL使用特定的DataFrameReader和DataFrameWriter对象提供了统一的接口以访问分布式存储,如Cassandra、HDFS(Hive、Parquet、JSON)。

你可以使用类SQL在HDFS、S3上的大量数据,能访问的数据可以来自不同的数据源(文件或表)。

 

Spark SQL定义了以下类型的函数:
1. 标准函数或用户定义函数(UDF),从单个行获取值作为输入,为每个输入行生成单个返回值。

2. 对一组行进行操作并计算每个组的单个返回值的基本聚合函数

3. 窗口聚合函数,对一组行进行操作,并为一组中的每一行计算单个返回值。

 

有两个支持的目录实现—内存(默认)和hive—您可以使用spark.sql.catalogImplementation来设置。

 

您可以解析来自外部数据源的数据,并让模式推断者推断模式。

// Example 1
val df = Seq(1 -> 2).toDF("i", "j")
scala> df.show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+

val query = df.groupBy("i").agg(max("j").as("aggOrdering")).orderBy(sum("j")).as[(Int, Int)]
scala> query.show()
+---+-----------+                                                               
|  i|aggOrdering|
+---+-----------+
|  1|          2|
+---+-----------+
query.collect contains (1, 2) // true

// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
| 1|
| 0|
+-------------------+

 

Spark SQL支持两种“模式”来编写结构化查询:Dataset API和SQL。RuntimeReplaceable表达式只能使用SQL模式,如nvl、nvl2、ifnull、nullif等SQL函数。

 

Spark SQL应用开发的需要以下步骤:

1. 建立开发环境(IDEA,Scala 和 sbt) 

2. 指定库依赖关系

3. 创建 SparkSession

4. 从外部数据源加载 Dataset

5. 转变Dataset

6. 保存Dataset以持久化存储

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
sparkSQL与kudu学习

架构图 版本 spark 特性 spark 生态 YARN 测试是否成功 hive 注意 1. 连接的 数据库 配置 外网可以访问 https://blog.csdn.net/qq17810899/article/details/90642207 2. 需要在hive 目录下面...

之渊
07/08
15
0
Spark(十二)SparkSQL简单使用

一、SparkSQL的进化之路 1.0以前: Shark 1.1.x开始:SparkSQL(只是测试性的) SQL 1.3.x: SparkSQL(正式版本)+Dataframe 1.5.x: SparkSQL 钨丝计划 1.6.x: SparkSQL+DataFrame+DataSet(测试...

osc_fht8qhb4
2018/07/15
8
0
Spark2.x学习笔记:Spark SQL的SQL

Spark SQL所支持的SQL语法 select [distinct] [column names]|[wildcard]from tableName[join clause tableName on join condition][where condition][group by column name][having conditi......

osc_4l0h8in9
2018/07/02
3
0
【Spark-SQL学习之一】 SparkSQL

环境   虚拟机:VMware 10   Linux版本:CentOS-6.5-x86_64   客户端:Xshell4   FTP:Xftp4   jdk1.8   scala-2.10.4(依赖jdk1.8)   spark-1.6 一、Shark Shark是基于Spark计...

osc_u4dvv5qa
2019/04/11
12
0
总结:Hive,Hive on Spark和SparkSQL区别

Hive on Mapreduce Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结的这篇Hive性能优...

hblt-j
2019/01/15
800
0

没有更多内容

加载失败,请刷新页面

加载更多

SO_REUSEADDR和SO_REUSEPORT有何不同? - How do SO_REUSEADDR and SO_REUSEPORT differ?

问题: The man pages and programmer documentations for the socket options SO_REUSEADDR and SO_REUSEPORT are different for different operating systems and often highly confusing.......

法国红酒甜
55分钟前
28
0
asp.net core之SignalR

SignalR 是什么? ASP.NET Core SignalR 是一个开源的实时框架,它简化了向应用中添加实时 Web 功能的过程。 实时 Web 功能是服务器端能够即时的将数据推送到客户端,而无需让服务器等待客户端...

一介草民Coder
今天
24
0
如何通过日期属性对数组进行排序 - How to sort an array by a date property

问题: Say I have an array of a few objects: 说我有一些对象的数组: var array = [{id: 1, date: Mar 12 2012 10:00:00 AM}, {id: 2, date: Mar 8 2012 08:00:00 AM}]; How can I sort......

javail
今天
22
0
技术教程| 百度鹰眼历史轨迹查询:轨迹抽稀功能

本文作者:用****9 本篇教程中,我们将详细地说明鹰眼历史轨迹查询(gettrack接口)中,如何通过vacuate_grade选项对轨迹进行抽稀,以及不同的抽稀力度对轨迹产生的影响。 上一篇教程中,我们...

百度开发者中心
前天
24
0
Quartz的Misfire处理规则 错过任务执行时间的处理机制

调度(scheduleJob)或恢复调度(resumeTrigger,resumeJob)后不同的misfire对应的处理规则 CronTrigger withMisfireHandlingInstructionDoNothing ——不触发立即执行 ——等待下次Cron触发频率...

独钓渔
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部