文档章节

10.Spark之RDD及编程接口

鞋底留个洞
 鞋底留个洞
发布于 2016/01/28 18:15
字数 3578
阅读 131
收藏 1
点赞 1
评论 0

1.起点Hello World

    val sc = new SparkContext("spark://...", "Hello World", "SPARK_HOME路径", "APP_JAR路径")

    val file = sc.textFile("hdfs:///root/Log")

    val filterRDD = file.filter(_.contains("Hello World"))

    filterRDD.cache()

    filterRDD.count()

  • 第 1 行:在Spark中做任何操作,首先要创建一个Spark的上下文。

  • 第 2 行:通过sc变量,利用textFile接口从HDFS文件系统读入Log文件,返回一个变量file。

  • 第 3 行:对file变量进行过滤操作。判断每一行字符串是否包含“Hello World”字符串,生成新的变量filterRDD。

  • 第 4 行:对filterRDD进行cache操作,以便后续操作重用filterRDD这个变量。

  • 第 5 行:对filterRDD进行count计数操作,最后返回包含“Hello World”字符串的文本行数。

    短短五行程序,却包含了Spark中很多重要的概念。下面逐一介绍Spark编程中的重要概念。

  1. 弹性分布式数据集RDD(Resilient Distributed DataSets):程序中的file和filterRDD变量都是RDD。

  2. 创建操作(creation operation):RDD的初始创建都是由SparkContext来负责的,将内存中的集合或者外部文件系统作为输入源。

  3. 转换操作(transformation operation):将一个RDD通过一定的操作变换成另一个RDD,比如file通过filter操作变换成filterRDD,所以filter就是一个转换操作。

  4. 控制操作(control operation):对RDD进行持久化。可以让RDD保存在磁盘或者内存中,以便后续重复使用。比如cache接口默认将filterRDD缓存在内存中。

  5. 行动操作 (action operation):由于Spark是惰性计算(lazy computing)的,所以对于任何RDD进行操作,都会出发Spark作业的运行,从而产生最终的结果。例如对filterRDD进行的count操作就是一个行动操作。Spark中的行动操作基本分为两类,一类操作结果变成Scala集合或者标量,另一类就将RDD保存到外部文件或者数据库系统中。

    对于一个Spark数据处理程序而言,一般情况下RDD与操作之间的关系如下图所示,经过创建操作、转换操作、控制操作、行动操作来完成一个作业。当然在一个Spark应用程序中,可以有多个行动操作,也就是有多个作业存在。

    

2.RDD的五个接口

    RDD是弹性分布式数据集,即一个RDD代表一个被分区的只读数据集。一个RDD的生成只有两种来源,在Hello World中已有所提现:

  1. 来自内存集合和外部存储系统

  2. 通过转换操作来自于其他RDD

    RDD没必要随时被实例化。由于RDD的接口只支持粗粒度的操作(即一个操作会被应用到RDD的所有数据上),所以只要通过记录下这些作用在RDD上的转换操作,来构建RDD的继承关系(lineage),就可以有效的进行容错处理,而不需要将实际的RDD数据进行拷贝。这对于RDD来说是一项非常强大的功能。也即是在一个Spark应用程序中,我们所用到的每个RDD,在丢失或者操作失败后都是可以重建的。

    应用程序开发者还可以对RDD进行另外两个方面的控制操作:持久化和分区。

  1. 开发者可以指明需要重用哪些RDD,选择一种存储策略(例如in-memory storage)将它们保存起来,以备使用。

  2. 开发者还可以让RDD根据记录中的键值在集群的机器之间重新分区。这对于RDD的位置优化是非常有作用的。例如让将要进行join操作的两个数据集以同样的方式进行哈希分区。

    如何表示这样一个分区、高效容错、支持持久化的分布式数据集呢?一般情况下抽象的RDD需要包含以下五个接口:

partition 分区,一个RDD会有一个或者多个分区
preferredLocations(p) 对于分区p而言,返回数据本地化计算的节点
dependencies() RDD的依赖关系
compute(p, context) 对于分区p而言,进行迭代计算
partitioner() RDD的分区函数

2.1RDD分区(partitions)

    对于一个RDD而言,分区的多少涉及对这个RDD进行并行计算的粒度,每一个RDD分区的计算操作都在一个单独的任务中被执行。

    对于RDD分区而言,用户可以自行指定多少分区,如果未指定就会使用默认值。可以利用RDD的成员变量partitions所返回的partition数组的大小来查询一个RDD被划分的分区数。例如,我们利用spark-shell交互式命令终端测试一下:

    指定分区数的情况:

    scala> val rdd = sc.parallelize(1 to 100, 2)

    scala> rdd.partitions.size

    

    未指定分区数的情况:(系统默认的分区数是这个程序所分配到的资源的CPU核的个数)

    scala> val rdd = sc.parallelize(1 to 100)

    scala> rdd.partitions.size

    

2.2RDD优先位置(preferredLocations)

    RDD优先位置属性与Spark中的调度有关,返回的是此RDD的每个partition所存储的位置,按照“移动数据不如移动计算”的理念,在Spark进行任务调度的时候,尽可能的将任务分配到数据块所存储的位置。

    以从Hadoop中读取数据生成RDD为例,preferredLocations返回每一个数据块所在的机器名或者IP地址,如果每一块数据是多份存储的,那么就会返回多个机器地址。

    scala> var rdd = sc.textFile("hdfs://master:9000/input/wordcount.txt")

    scala> val hadoopRDD = rdd.dependencies(0).rdd

    scala> hadoopRDD.partitions.size

    scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))

    

2.3RDD依赖关系(dependencies)

    由于RDD是粗粒度的操作数据集,每一个转换操作都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在Spark中存在两种类型的依赖:窄依赖(Narrow Dependencies)、宽依赖(Wide Dependencies)。

  • 窄依赖:每一个父RDD的分区最多只被子RDD的一个分区所使用。

  • 宽依赖:多个子RDD的分区依赖于同一个父RDD的分区。

    在Spark中明确区分这两种依赖关系有两个方面的原因:

  1. 窄依赖可以在集群的一个节点上如流水线般执行,可以计算所有父RDD的分区;

    宽依赖需要取得父RDD的所有分区上的数据进行计算,将会执行类似于MapReduce一样的Shuffle操作。

  2. 窄依赖在节点计算失败后的恢复会更加有效,只需重新计算对应的父RDD的分区,而且可以在其它节点并行计算;

    在宽依赖的继承关系中,一个节点的失败将会导致其父RDD的多个分区重新计算,这个代价是非常高的。

    

2.4RDD分区计算(compute)

    对于Spark中每个RDD的计算都是以partition(分区)为单位的,而且RDD中的compute函数都是在对迭代器进行复合,不需要保存每次计算的结果。

    在下面程序中,rdd变量是一个被分成两个分区的1~10集合,在rdd上连续进行转换操作map和filter,由于compute函数只返回相应分区数据的迭代器,所以只有最终实例化时才能显示出两个分区的最终计算结果。


2.5RDD分区函数(partitioner)

    在Spark中目前实现了两种类型的分区函数:HashPartitioner(哈希分区)和RangePartitioner(区域分区)。需要注意的是partitioner这个属性只存在于(K, V)类型的RDD中,对于非(K, V)类型的partitioner的值就是None。partitioner函数既决定了RDD本身的分区数量,也可以作为父RDD Shuffle输出(MapOutPut)中每个分区进行数据切割的依据。

    下面以HashPartitioner为例说明partitioner的功能。

?

3.RDD的四种操作

3.1创建操作

  1. 集合创建操作:RDD可以由内部集合来生成,Spark提供了两类函数来实现:parallelize和makeRDD。

  2. 存储创建操作:Spark整个生态系统与Hadoop完全兼容,Hadoop支持的文件类型或者数据库类型,它同样支持。

3.2转换操作

  1. 基本转换操作:

    map[U: classTag](f: T => U): RDD(U):将RDD中类型为T的元素一对一地映射为类型为U的元素。

    distinct(): RDD[T]:返回RDD中所有不一样的元素。

    flatMap[U: classTag](f: T => TraversableOnce[U]): RDD[U]:将RDD中的每一个元素进行一对多转换。


    repartition(numPartitions: Int): RDD[T]:相当于coalesce函数中shuffle=true时的简易实现。

    coalesce(numPartitions: Int,shuffle: Boolean=false): RDD[T]:对RDD的分区进行重新分区。

    randomSplit(weights: Array[Double],seed: Long=System.nanoTime): Array[RDD[T]]:根据weights权重将一个RDD切分成多个RDD。

    glom():RDD[Array[T]]:将RDD每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。

    union(other: RDD[T]): RDD[T]:将两个RDD集合中的数据进行合并,不会去重。

    intersection(other: RDD[T]): RDD[T]:返回两个RDD集合中的数据的交集,不含重复元素。

    intersection(other: RDD[T], partitioner: Partitioner): RDD[T]:同上

    subtract(other: RDD[T]): RDD[T]:返回在主RDD集合中出现但不在other中出现的元素。

    subtract(other: RDD[T], partitioner: Partitioner): RDD[T]:同上

    mapPatitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartiton: Boolean=false): RDD[U]

    mapPatitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartiton: Boolean=false): RDD[U]

    ?

    zip[U: ClassTag](other: RDD[U]): RDD[(T,U)]:将两个RDD组成Key/Value形式的RDD,但它们的分区数量和元素数量必须相同,否则相同系统会抛出异常

    zipPartitons(参数分多种情况,不一一列举了):将多个RDD按照分区组成新的RDD,分区数相同,元素数没有要求。

    zipWithIndex(): RDD[(T,Long)]:将RDD中的元素和这个元素的ID组成键/值对。

    zipWithUniqueId(): RDD[(T,Long)]:将RDD中的元素和一个唯一的ID组成键/值对。

  2. 键值RDD转换操作

    partitionBy(partitioner: Partitioner): RDD[(K,V)]:根据partitioner函数生成新的ShuffledRDD,原RDD重新分区

    mapValues[U](f: V=>U): RDD[(K,U)]:针对[K,V]中的V进行map操作。

    flatMapValues[U](f: V=>TraversableOnce[U]): RDD[(K,U)]针对[K,V]中的V进行flatMap操作。

    combineByKey(3个方法参数不同)

    foldByKey(3个方法参数不同)

    reduceByKey(3个方法参数不同)

    groupByKey(3个方法参数不同)

    cogroup(3个方法参数不同)

    join(3个方法参数不同)

    leftOuterJoin(3个方法参数不同)

    rightOuterJoin(3个方法参数不同)

    subtractByKey(3个方法参数不同)

3.3控制操作

  1. cache(): RDD[T]

    persist(): RDD[T]

    persist(level: StorageLevel): RDD[T]

    在Spark中对RDD进行持久化操作是一项非常重要的功能,可以将RDD持久化在不同层次的存储介质中,以便后续的操作能够重复使用,这对iterative(迭代)和interactive(交互)的应用来说会极大的提高性能。

  2. checkpoint

    checkpoint接口是将RDD持久化在HDFS中,与persist(如果也持久化在磁盘上)的一个区别是checkpoint将会切断此RDD之前的依赖关系,而persist接口依然保留着RDD的依赖关系。checkpoint的主要作用有如下两点:

    (1)如果一个Spark程序会长时间驻留运行(如Spark Streaming一般会7*24小时运行),过长的依赖将会占用很多系统资源,定期将RDD进行checkpoint操作,能够有效地节省系统资源。

    (2)维护过长的依赖关系还会出现一个问题,如果Spark在运行过程中出现节点失败的情况,那么RDD进行容错重算的成本会非常高。

3.4行动操作

    行动操作是和转换操作相对应的一种对RDD的操作类型,在Spark程序中每调用一次行动操作,都会触发一次Spark的调度     并返回相应的结果。从API来看,行动操作可以分为两种类型:

  1. 行动操作将标量或者集合返回给Spark的客户端程序,比如返回RDD中数据集的数量或者一部分符合条件的数据。

    first:返回RDD中的第一个元素。

    count:返回RDD中元素的个数。

    reduce(f: (T,T)=>T):对RDD中的元素进行二元计算,返回计算结果。

    collect()/toArray():以集合形式返回RDD的元素。

    take(num: Int):将RDD作为集合,返回集合中[0, num-1]下标的元素。

    top(num: Int):按照默认的或者指定的排序规则,返回前num个元素。

    takeOrdered(num: Int):以与top相反的排序规则,返回前num个元素。

    aggregate[U](zeroValue: U)(seqOp: (U,T)=>U, combOp(U,U)=>U):aggregate行动操作中主要需要提供两个函数。一个是seqOp函数,其将RDD(元素类型为T,可以和U为同一类型)中的每一个分区的数据聚合成类型为U的值。另一个函数combOp将各个分区聚合起来的值合并在一起得到最终类型为U的返回值。

    fold(zeroValue: T)(op: (T,T)=>T)aggregate的便利接口,op操作既是seqOp操作,也是combOp操作。

    lookup(key: K): Seq[V]:lookup是针对(K,V)类型RDD的行动操作,对于给定的键值,返回与此键值对应的所有值。


  2. 行动操作将RDD直接保存到外部文件系统或者数据库中,比如HDFS文件系统中。

    saveAsTextFile(path: String)

    saveAsTextFile(path: String, codec: Class[_<:CompressionCodec])

    saveAsObjectFile(path: String)

    saveAsHadoopFile[F<:OutputFormat[K,V]](path: String)

    saveAsHadoopFile[F<:OutputFormat[K,V]](path: String, codec: Class[_<:CompressionCodec])

    saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: class[_], outputFormatClass: Class[]......)

    saveAsHadoopDataset(conf: JobConf)

    这是旧版本API中提供的七个将RDD存储到外部介质的函数,前六个都是saveAsHadoopDataset的简易实现版本,仅支持将RDD存储到HDFS中,而saveAsHadoopDataset的参数类型是JobConf,所以它还可以将RDD保存到其它数据库中,例如Hbase、MongoDB、Cassandra等。

    Spark针对新版本Hadoop API提供了三个行动操作函数。

    saveAsNewAPIHadoopFile[F<:NewOutputFormat[K,V]](path: String)(implicit fm: ClassTag[F])

    saveAsNewAPIHadoopFile(path: String, keyClass: Class[_],......)

    saveAsNewAPIHadoopDataset(conf: Configuration)

    前两个API支持将RDD保存到HDFS中,第三个则支持所有MapReduce兼容的输入输出类型。

© 著作权归作者所有

共有 人打赏支持
鞋底留个洞
粉丝 2
博文 28
码字总数 28568
作品 0
朝阳
高级程序员
RDD转换成DataFrames

官方提供了2种方法 1.利用反射来推断包含特定类型对象的RDD的schema。这种方法会简化代码并且在你已经知道schema的时候非常适用。 先创建一个bean类 case class Person(name: String, age: I...

cjun1990
2016/04/08
3.3K
0
Spark(六):SparkSQLAndDataFrames对结构化数据集与非结构化数据的处理

一:简单了解SparkSQL。 Spark SQL 是结构化的数据处理一个Spark模块。与基本的Spark RDD API不同,Spark SQL 所提供的接口为Spark 提供有关数据和正在执行的计算的结构的详细信息。Spark S...

牧羊人Berg
2016/06/05
673
0
spark RDD解读---第一谈

总结:目前数据流预算对两种应用的处理并不高效,一种是:迭代式运算,这在图运算和机器学习中应用非常常见。另一种是:交互式数据挖掘工具。这两种情况下,数据存储在内存中运算的速度回大大...

hhh666
2017/10/25
0
0
Spark常用的算子以及Scala函数总结

Spark与Scala 首先,介绍一下scala语言: Scala 是一种把面向对象和函数式编程理念加入到静态类型语言中的混血儿。 为什么学scala? spark提供了R、Python等语言的接口,为什么还要重新学一门...

流川枫AI
2017/12/24
0
0
Spark技术内幕:Storage 模块整体架构

Storage模块负责了Spark计算过程中所有的存储,包括基于Disk的和基于Memory的。用户在实际编程中,面对的是RDD,可以将RDD的数据通过调用org.apache.spark.rdd.RDD#cache将数据持久化;持久化...

anzhsoft2008
2015/01/18
0
0
Spark SQL 中 dataFrame 学习总结

dataFrame多了数据的结构信息。就是schema。 RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。 DataFrame 提供了详细的结构信息,可以让sparkSQL清楚的知道数据集中包含哪...

ChinaUnicom110
2017/09/13
0
0
Spark中的编程模型

Spark中的基本概念 在Spark中,有下面的基本概念。 Application:基于Spark的用户程序,包含了一个driver program和集群中多个executor Driver Program:运行Application的main()函数并创建S...

闪电
2016/07/28
10
0
Apache Spark:大数据处理统一引擎

原文出处:Matei Zaharia 译文出处:Daisy 工业和研究中数据的大幅增长为计算机科学带来了巨大的机会与挑战。由于数据大小超过了单台机器的能力,用户需要新的系统将计算扩展到多个节点。因此...

Matei Zaharia
2016/12/05
0
0
spark RDD解读---第二谈

第二章:弹性分布式数据集 2.1 RDD的特性和目标:RDD提供良好的编程能力,并保持:自动位置感性性调度、自动容错、可伸缩性。同时具备工作集的良好的描述能力。目前数据恢复分为两部分:检查...

hhh666
2017/10/27
0
0
Apache Spark从入门到精通

源码阅读是一件非常容易的事,也是一件非常难的事。容易的是代码就在那里,一打开就可以看到。难的是要通过代码明白作者当初为什么要这样设计,设计之初要解决的主要问题是什么。 在对Spark...

瑺柈茬妳滴裑笾
2015/04/13
772
2

没有更多内容

加载失败,请刷新页面

加载更多

下一页

1.14 救援模式

确保开机启动时连接镜像文件,如果是真机服务器,就需要:U盘或光盘镜像启动进入BIOS 不同主板进入bios按键不同,一般是F12或Esc 光标:移动到Boot(开机启动项) 减号移动:光标选中行,按-...

小丑鱼00
5分钟前
0
0
ES11-全文检索

高级别全文检索通常用于在全文本字段(如电子邮件正文)上运行全文检索。 他们了解如何分析被查询的字段,并在执行之前将每个字段的分析器(或search_analyzer)应用于查询字符串。 1.term查...

贾峰uk
8分钟前
0
0
java 复制对象有哪些方式

java 复制对象有哪些方式 Apache的 Common beanutils库 org.apache.commons.beanutils.BeanUtils.copyProperties(dest,origin); Springframework 的BeanUtil 依赖: <dependency> ......

黄威
24分钟前
1
0
jstack的简单使用

公司测试反应, 一个java应用的机器,cpu始终是30%, 即使不做交易, 于是想到了之前看到的jstack文章, 实践步骤记录一下: 1, 找出java应用的进程号 ps -ef|grep 应用名|grep -v grep 2, 找出pid...

零二一七
31分钟前
1
0
崛起于Springboot2.X之项目war打包部署(18)

将springboot项目打包步骤: 1、启动类 extends SpringBootServletInitializer 2、启动类添加覆盖方法 @Overrideprotected SpringApplicationBuilder configure(SpringApplicationBuilder......

木九天
40分钟前
2
0
导入CSV文件就行数据整理分析

#-*-coding:utf-8-*-import csv,os,re,mathlocalPath=input("请输入所有群文件的根目录:") #所有QQ群文件的物理根目录路径def info(): info_dic=[] dirList=os.listdi...

Kefy
46分钟前
3
0
CoreText进阶(六)-内容大小计算和自动布局

CoreText进阶(六)-内容大小计算和自动布局 其它文章: CoreText 入门(一)-文本绘制 CoreText入门(二)-绘制图片 CoreText进阶(三)-事件处理 CoreText进阶(四)-文字行数限制和显示更...

aron1992
47分钟前
1
0
一个Unity高人的博客,涉猎范围很广,深度也很深。

https://blog.csdn.net/ecidevilin/article/list/

爽歪歪ES
49分钟前
0
0
Spring Cloud Config-Git后端

EnvironmentRepository的默认实现使用Git后端,这对于管理升级和物理环境以及审核更改非常方便。要更改存储库的位置,可以在Config Server中设置“spring.cloud.config.server.git.uri”配置...

itcloud
51分钟前
1
0
centos7 卸载mysql

[root@zyf ~]# rpm -qa|grep -i mysqlmysql-community-libs-5.6.34-2.el7.x86_64mysql-community-server-5.6.34-2.el7.x86_64mysql-community-release-el7-5.noarchmysql-community-......

Yao--靠自己
59分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部