文档章节

RDD

仟昭
 仟昭
发布于 02/17 16:55
字数 2419
阅读 5
收藏 0

1.概念:

RDD是spark整个体系中最基础核心的概念,RDD(Resilient Distributed DataSet)即弹性分布式数据集

弹性:

RDD支持横向多分区,纵向操作内存不足写入磁盘,hdfs等,实现数据在内存和外存的灵活切换。

RDD可以在存储在内存和磁盘之间,并且自动或者手动切换

RDD具有良好的容错性(即RDD可以通过血统转化为其他RDD)

Task如果失败,会进行特定次数的重试(default 4)

Stage如果失败会就行特定次数的重试

RDD可以存储任意类型的数据

RDD的分区数目可以自行设定

分布式:

RDD可以存储在多台主机的内存或者磁盘之上。每个RDD可以分为多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而在集群中进行分布式并行计算。

数据集:

RDD是数据集合的抽象,从外部看RDD就是封装之后的可容错的数据集

RDD相当于是一个代理,对RDD进行操作其实就是对分区进行操作,就是对每一台机器上的迭代器进行操作,因为迭代器引用着我们要操作的数据。
RDD存储的是逻辑数据结构,不存储真实数据,像关系数据库中的view 视图,只是表结构。

 

2.RDD的五个特征:

A list of partitioner     一系列分区
A function for computing each split     会有一个函数作用在每个切片上
A list of depedencies on other RDDs    即RDD具有血统,RDD和RDD之间存在依赖关系
Optionally, a Partitioner for key-value RDDs (可选)如果是RDD中装的是KV类型的,那么Shuffle时会有一个分区器。默认是HashPartitioner。目前只有HashPartitioner 和RangeRartitioner
Optionally, a list of preferred locations to compute each split on (可选)如果只从HDFS中读取数据,会感知数据则位置,将Executor启动在数据所在的机器上

 

3.生成RDD的方式:

执行Transform操作(变换操作),根据已有的RDD计算得到

读取外部存储系统的数据集,如HDFS,HBase,或任何与Hadoop有关的数据源。

将Driver的Scala集合通过并行化的方式变成RDD(试验、测验)
 

4.RDD的两种操作:

针对RDD的操作,分两种,一种是Transformation(变换),一种是Actions(执行)。

Transformation(变换)操作属于懒操作(算子),不会真正触发RDD的处理计算。

Actions(执行)操作才会真正触发。前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系。两类操作的主要区别是,Transformation转换操作(比如map、filter、join等)接受RDD并返回RDD,而Actions行动操作(比如count、collect等)接受RDD但是返回非RDD(即输出一个值或结果)。

RDD采用了惰性调用,即在RDD的执行过程中,真正的计算发生在RDD的“行动”操作,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。

<img  data-cke-saved-src='1.jpg' src='1.jpg'>


转换操作:对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
下面列出一些常见的转换操作(Transformation API):

filter(func):筛选出满足函数func的元素,并返回一个新的数据集
map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

行动操作:行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。 
下面列出一些常见的行动操作(Action API):

count() 返回数据集中的元素个数
collect() 以数组的形式返回数据集中的所有元素
first() 返回数据集中的第一个元素
take(n) 以数组的形式返回数据集中的前n个元素
reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每个元素传递到函数func中运行
 

5.RDD的依赖关系:

RDD的依赖关系是spark计算优于hadoop的重要原因之一。

RDD中不同的操作会使得不同RDD中的分区会产生不同的依赖。

RDD中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency)

窄依赖:对于窄依赖操作,它们只是将Partition的数据根据转换的规则进行转化,并不涉及其他的处理,可以简单地认为只是将数据从一个形式转换到另一个形式。  如 map  filter  union等 

窄依赖表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区

宽依赖:表现为存在一个父RDD的一个分区对应一个子RDD的多个分区。宽依赖典型的操作包括groupByKey、sortByKey等

窄依赖源码:

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

    //返回子RDD的partitionId依赖的所有的parent RDD的Partition(s)

    def getParents(partitionId: Int): Seq[Int]

    override def rdd: RDD[T] = _rdd

}

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {

    override def getParents(partitionId: Int) = List(partitionId)

}

宽依赖源码:

class ShuffleDependency[K, V, C](

    @transient _rdd: RDD[_ <: Product2[K, V]],

    val partitioner: Partitioner,

    val serializer: Option[Serializer] = None,

    val keyOrdering: Option[Ordering[K]] = None,

    val aggregator: Option[Aggregator[K, V, C]] = None,

    val mapSideCombine: Boolean = false)

extends Dependency[Product2[K, V]] {

 

override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

//获取新的shuffleId

val shuffleId: Int = _rdd.context.newShuffleId()

//向ShuffleManager注册Shuffle的信息

val shuffleHandle: ShuffleHandle =

_rdd.context.env.shuffleManager.registerShuffle(

    shuffleId, _rdd.partitions.size, this)

    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))

}

spark中一旦遇到宽依赖就需要进行shuffle的操作,所谓的shuffle的操作的本质就是将数据汇总后重新分发的过程。

这个过程数据要汇总到一起,数据量可能很大所以不可避免的需要进行数据落磁盘的操作,会降低程序的性能,所以spark并不是完全内存不读写磁盘,只能说它尽力避免这样的过程来提高效率 。

spark中的shuffle,在早期的版本中,会产生多个临时文件,但是这种多临时文件的策略造成大量文件的同时的读写,磁盘的性能被分摊给多个文件,每个文件读写效率都不高,影响spark的执行效率。所以在后续的spark中(1.2.0之后的版本)的shuffle中,只会产生一个文件,并且数据会经过排序再附加索引信息,减少了文件的数量并通过排序索引的方式提升了性能。

 

6.RDD的运行流程

<img  data-cke-saved-src='5.jpg' src='5.jpg'>

1)Driver端 创建RDD对象 SparkContext根据用户提交的程序计算RDD之间的依赖关系,构建DAG

2)Driver端 DAGScheduler将DAG 切分Stage(切分的依据是遇到宽依赖shuffle),将stage中生成的Task以TaskSet的形式给TaskScheduler

3)Driver端 TaskScheduler调度Task(根据资源情况将Task调度到对应的Executor中)

 4)Executor接收Task,然后用实现了Runnable接口的包装类将Task包装起来丢入到线程池中执行。

7、RDD底层实现原理
RDD是一个分布式数据集,顾名思义,其数据应该分部存储于多台机器上。事实上,每个RDD的数据都以Block的形式存储于多台机器上,下图是Spark的RDD存储架构图,其中每个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD不再需要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。

8、RDD cache的原理
RDD的转换过程中,并不是每个RDD都会存储,如果某个RDD会被重复使用,或者计算其代价很高,那么可以通过显示调用RDD提供的cache()方法,把该RDD存储下来。那RDD的cache是如何实现的呢?

RDD中提供的cache()方法只是简单的把该RDD放到cache列表中。当RDD的iterator被调用时,通过CacheManager把RDD计算出来,并存储到BlockManager中,下次获取该RDD的数据时便可直接通过CacheManager从BlockManager读出。
 

© 著作权归作者所有

共有 人打赏支持
上一篇: Spark 架构图
仟昭
粉丝 7
博文 72
码字总数 56545
作品 0
杨浦
私信 提问
Spark算子:RDD行动Action操作(1)–first、count、reduce、collect

first def first(): T first返回RDD中的第一个元素,不排序。 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) rdd1: org.apache.spark.rdd.RDD[(String, String)] ......

chensanti234
2016/12/20
15
0
Spark Rdd coalesce()方法和repartition()方法

在Spark的Rdd中,Rdd是分区的。 有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多,但是每个Rdd的数据量比较小,需要设置一个比较合理的分区。或者需要把Rdd的分区数量调大...

Zero零_度
2016/07/18
19
0
Spark算子:RDD基本转换操作(2)–coalesce、repartition

coalesce def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T] 该函数用于将RDD进行重分区,使用HashPartitioner。 第一个参数为重分区......

chensanti234
2016/12/19
4
0
Spark算子:RDD键值转换操作(4)–cogroup、join

cogroup ##参数为1个RDD def cogroupW]): RDD[(K, (Iterable[V], Iterable[W]))] def cogroupW], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] def cogroupW], partitioner:......

chensanti234
2016/12/20
20
0
spark源码阅读笔记RDD(一)RDD的基本概念

spark源码阅读笔记RDD(一)RDD的基本概念 什么是RDD? 从文献1我们知道,Matei Zaharia博士给RDD下的定义是: Formally, an RDD is aread-only,partitionedcollection ofrecords。关键词有只...

u014236541
2018/05/10
0
0

没有更多内容

加载失败,请刷新页面

加载更多

webstorm 常用设置

###常用快捷键 ctrl+D//复制粘贴当前行ctrl+W//选中单词ctrl+←/→ //以单词作为边界跳光标位置ctrl+alt+L//格式化代码shift+tab/tab//减少/扩大缩进(可以在代码中减少行缩进)ct...

niuhongxia
15分钟前
1
0
Web前端自动化单元测试

Web前端自动化单元测试 单元测试的很多,比如辅助开发,预测开发,提高模块可靠性等。 还可以提高项目开发思路。 前提需求 本文基于nodejs环境,需要jasmine和 Karma。 nodejs:不多说,很突出...

DrChenXX
17分钟前
1
0
漫漫优化路,总会错几步(记一次接口优化)

最近做了一个搜索接口的优化,反复压测了四次,终于达到要求了,记录一下,晚上加个鸡腿🍗 业务逻辑 从OpenSearch中检索出数据,然后各种填充组装数据,最后返回 逻辑看似很简单,当初我也...

java菜分享
21分钟前
0
0
springboot 统一异常处理(包含统一数据校验)

1、统一异常处理的优势 在开发中,我们是否遇到过如下两种奇葩现象: (1)只要没有成功,不管什么原因,前端界面给出提示:服务端错误/异常。哪怕是数据校验不过,也这样提示(嗯,反正先把...

编程SHA
24分钟前
2
0
基于arcface 人脸识别demo使用教程

最近在研究虹软家的arcface 人脸识别 demo,现在就给大家分享一下官方的demo ** 工程如何使用? ** 1.下载代码: git clone https://github.com/asdfqwrasdf/ArcFaceDemo.git 或者直接下载压缩...

是哇兴哥棒棒哒
25分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部