文档章节

Spark新特性(DataFrame/DataSet、Structured Streaming和Spark Session)

o
 osc_1ee7cxmx
发布于 2018/08/06 15:00
字数 1697
阅读 0
收藏 0

spark 新特性主要增加DataFrame/DataSet、Structured Streaming和Spark Session

1. DataFrame/DataSet主要替换之前的RDD,主要优势在执行效率、集群间通信、执行优化和GC开销比RDD有优势。

2. Structured Streaming大部分场景替换之前的Streaming,比之前的优势集中中简洁的模型、一致的API、卓越的性能和Event Time的支持

3. SparkSession的概念,它为用户提供了一个统一的切入点来使用Spark的各项功能,SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中。

 

1. DataFrame、Dataset

1.1 RDD与DataFrame、Dataset对比

 

RDD

Dataset/DataFrame

执行效率

创建大量临时对象,对GC造成压力,优化需要对Spark运行时机制有一定的了解,门槛较高

利用Spark SQL引擎,自动优化

集群间的通信

集群间的通信,还是IO操作都需要对对象的结构和数据进行序列化和反序列化

当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象

执行优化

 

需要用户自己优化(join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作)

自动优化,能把能将filter下推到 join下方

GC开销

频繁的创建和销毁对象,GC开销高

引入off-heap:能使用JVM堆以外的内存,off-heap就像地盘,schema就像地图,Spark有地图又有自己地盘了,就可以自己说了算了,不再受JVM的限制,也就不再收GC的困扰了

 

 

 

1.2 快速理解Spark Dataset

case class People(id:Long, name:String, age:Int)

sc.makeRDD(seq(People(1, "zhangshan", 23),People(2, "lisi", 35)))

 

 

RDD中的两行数据

People(id =1, name=“zhangshan”,age=23)

People(id =1, name=“lisi”,age=35)

 

DataFrame的数据

id: bigint

name:String

age:bigint

1

zhangshan

23

2

lisi

35

 

DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...),通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

DataSet的数据。

Value:People[id:bingint, name:String,age:bigint]

People(id =1, name=“zhangshan”,age=23)

People(id =1, name=“lisi”,age=35)

 

相对于RDD,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。使用Dataset API的程序,会经过Spark SQL的优化器进行优化,相比DataFrame,Dataset提供了编译时类型检查。

RDD转换DataFrame后不可逆,但RDD转换Dataset是可逆的

1.3 实践

创建RDD

image.png 

通过RDD创建DataFrame,再通过DataFrame转换成RDD,发现RDD的类型变成了Row类型

image.png 

image.png 

通过RDD创建Dataset,再通过Dataset转换为RDD,发现RDD还是原始类型

image.png 

经典的wordCount举例

使用Dataset API做转换操作

image.png 

使用SQL进行单词统计

image.png 

使用SQL进行排名分析

image.png 

 

2. Structured Streaming

Structured Streaming 是一个可拓展,容错的,基于Spark SQL执行引擎的流处理引擎。使用小量的静态数据模拟流处理。伴随流数据的到来,Spark SQL引擎会逐渐连续处理数据并且更新结果到最终的Table中。你可以在Spark SQL上引擎上使用DataSet/DataFrame API处理流数据的聚集,事件窗口,和流与批次的连接操作等。最后Structured Streaming 系统快速,稳定,端到端的恰好一次保证,支持容错的处理。

2.1 Structured Streaming的Spark优势

Ø 简洁的模型。Structured Streaming的模型很简洁,易于理解。用户可以直接把一个流想象成是无限增长的表格。

Ø 一致的API。由于和Spark SQL共用大部分API,对Spark SQL熟悉的用户很容易上手,代码也十分简洁。同时批处理和流处理程序还可以共用代码,不需要开发两套不同的代码,显著提高了开发效率。

Ø 卓越的性能。Structured Streaming在与Spark SQL共用API的同时,也直接使用了Spark SQL的Catalyst优化器和Tungsten,数据处理性能十分出色。此外,Structured Streaming还可以直接从未来Spark SQL的各种性能优化中受益。

Ø Event Time的支持,Stream-Stream Join(2.3.0新增的功能),毫秒级延迟(2.3.0即将加入的Continuous Processing)

 

2.2 Structured Streaming介绍

Structured Streaming则是在Spark 2.0加入的经过重新设计的全新流式引擎。它的模型十分简洁,易于理解。一个流的数据源从逻辑上来说就是一个不断增长的动态表格,随着时间的推移,新数据被持续不断地添加到表格的末尾。用户可以使用Dataset/DataFrame或者SQL来对这个动态数据源进行实时查询。每次查询在逻辑上就是对当前的表格内容执行一次SQL查询。如何执行查询则是由用户通过触发器(Trigger)来设定。用户既可以设定定期执行,也可以让查询尽可能快地执行,从而达到实时的效果。一个流的输出有多种模式,既可以是基于整个输入执行查询后的完整结果,也可以选择只输出与上次查询相比的差异,或者就是简单地追加最新的结果。这个模型对于熟悉SQL的用户来说很容易掌握,对流的查询跟查询一个表格几乎完全一样。

image.png 

对输入的查询将生成 “Result Table” (结果表)。每个 trigger interval (触发间隔)(例如,每 1 秒),新 row (行)将附加到 Input Table ,最终更新 Result Table 。无论何时更新 result table ,我们都希望将 changed result rows (更改的结果行)写入 external sink (外部接收器)。

2.3 Structured Streaming实践

创建一个 streaming DataFrame ,从监听 localhost:9999 的服务器上接收的 text data (文本数据),并且将 DataFrame 转换以计算 word counts 。

Structured Streaming 里,outputMode,现在有complete,append,update 三种,现在的版本只实现了前面两种。

complete,每次计算完成后,你都能拿到全量的计算结果。

append,每次计算完成后,你能拿到增量的计算结果。

image.png
输入:image.png

Complete模式结果

image.png 

 


Append 模式结果

image.png 

3. SparkSession

SparkSession的概念,它为用户提供了一个统一的切入点来使用Spark的各项功能,用户不但可以使用DataFrame和Dataset的各种API,学习Spark2的难度也会大大降低。

SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。

暂无文章

如何在Ruby中生成随机字符串 - How to generate a random string in Ruby

问题: I'm currently generating an 8-character pseudo-random uppercase string for "A" .. "Z": 我目前正在为“ A” ..“ Z”生成一个8个字符的伪随机大写字符串: value = ""; 8.times{......

法国红酒甜
36分钟前
12
0
Python中的mkdir -p功能[重复] - mkdir -p functionality in Python [duplicate]

问题: This question already has an answer here: 这个问题在这里已有答案: How can I safely create a nested directory? 如何安全地创建嵌套目录? 25 answers 25个答案 Is there a way...

技术盛宴
今天
15
0
原价500元的认证证书,限时免费考取!

本文作者:y****n 百度云智学院致力于为百度ABC战略(人工智能、大数据、云计算)提供人才生态体系建设,包括基于百度ABC、IoT的课程体系,整合百度优势技术能力的深度学习技术、Apollo无人车...

百度开发者中心
昨天
17
0
在virtualenv中使用Python 3 - Using Python 3 in virtualenv

问题: Using virtualenv , I run my projects with the default version of Python (2.7). 使用virtualenv ,我使用默认版本的Python(2.7)运行项目。 On one project, I need to use Pyth......

富含淀粉
今天
16
0
Python的__init__和self是做什么的? - What __init__ and self do on Python?

问题: I'm learning the Python programming language and I've came across something I don't fully understand. 我正在学习Python编程语言,遇到了一些我不太了解的东西。 In a method ......

javail
今天
15
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部