文档章节

Spark学习笔记-RDD简单算子

Endless2010
 Endless2010
发布于 2017/07/19 20:40
字数 1218
阅读 11
收藏 0
点赞 0
评论 0

collect

返回RDD的所有元素

scala> var input=sc.parallelize(Array(-1,0,1,2,2))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:27
scala> var result=input.collect
result: Array[Int] = Array(-1, 0, 1, 2, 2)

count,coutByValue

count返回RDD的元素数量,countByValue返回每个值的出现次数

scala> var input=sc.parallelize(Array(-1,0,1,2,2))
scala> var result=input.count
result: Long = 5
scala> var result=input.countByValue
result: scala.collection.Map[Int,Long] = Map(0 -> 1, 1 -> 1, 2 -> 2, -1 -> 1)

take,top,takeOrdered

take返回RDD的前N个元素 takeOrdered默认返回升序排序的前N个元素,可以指定排序算法 Top返回降序排序的前N个元素

var input=sc.parallelize(Array(1,2,3,4,9,8,7,5,6))

scala> var result=input.take(6)
result: Array[Int] = Array(1, 2, 3, 4, 9, 8)
scala> var result=input.take(20)
result: Array[Int] = Array(1, 2, 3, 4, 9, 8, 7, 5, 6)

scala> var result=input.takeOrdered(6)
result: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> var result=input.takeOrdered(6)(Ordering[Int].reverse)
result: Array[Int] = Array(9, 8, 7, 6, 5, 4)

scala> var result=input.top(6)
result: Array[Int] = Array(9, 8, 7, 6, 5, 4
)

Filter

传入返回值为boolean的函数,返回改函数结果为true的RDD

scala> var input=sc.parallelize(Array(-1,0,1,2))
scala> var result=input.filter(_>0).collect()
result: Array[Int] = Array(1, 2)

map,flatmap

map对每个元素执行函数,转换为新的RDD,flatMap和map类似,但会把map的返回结果做flat处理,就是把多个Seq的结果拼接成一个Seq输出

scala> var input=sc.parallelize(Array(-1,0,1,2))
scala> var result=input.map(_+1).collect
result: Array[Int] = Array(0, 1, 2, 3)

scala>var result=input.map(x=>x.to(3)).collect
result: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(-1, 0, 1, 2, 3), Range(0, 1, 2, 3), Range(1, 2, 3), Range(2, 3))

scala>var result=input.flatMap(x=>x.to(3)).collect
result: Array[Int] = Array(-1, 0, 1, 2, 3, 0, 1, 2, 3, 1, 2, 3, 2, 3)

distinct

RDD去重

scala>var input=sc.parallelize(Array(-1,0,1,2,2))
scala>var result=input.distinct.collect
result: Array[Int] = Array(0, 1, 2, -1)

Reduce

通过函数聚集RDD中的所有元素

scala> var input=sc.parallelize(Array(-1,0,1,2))
scala> var result=input.reduce((x,y)=>{println(x,y);x+y})
(-1,1)  //处理-1,1,结果为0,RDD剩余元素为{0,2}
(0,2)   //上面的结果为0,在处理0,2,结果为2,RDD剩余元素为{0}
(2,0)   //上面结果为2,再处理(2,0),结果为2,RDD剩余元素为{}
result: Int = 2

sample,takeSample

sample就是从RDD中抽样,第一个参数withReplacement是指是否有放回的抽样,true为放回,为false为不放回,放回就是抽样结果可能重复,第二个参数是fraction,0到1之间的小数,表明抽样的百分比 takeSample类似,但返回类型是Array,第一个参数是withReplacement,第二个参数是样本个数

var rdd=sc.parallelize(1 to 20)

scala> rdd.sample(true,0.5).collect
res33: Array[Int] = Array(6, 8, 13, 15, 17, 17, 17, 18, 20)

scala> rdd.sample(false,0.5).collect
res35: Array[Int] = Array(1, 3, 10, 11, 12, 13, 14, 17, 18)

scala> rdd.sample(true,1).collect
res44: Array[Int] = Array(2, 2, 3, 5, 6, 6, 8, 9, 9, 10, 10, 10, 14, 15, 16, 17, 17, 18, 19, 19, 20, 20)

scala> rdd.sample(false,1).collect
res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

scala> rdd.takeSample(true,3)
res1: Array[Int] = Array(1, 15, 19)

scala> rdd.takeSample(false,3)
res2: Array[Int] = Array(7, 16, 6)

collectAsMap,countByKey,lookup

collectAsMap把PairRDD转为Map,如果存在相同的key,后面的会覆盖前面的。 countByKey统计每个key出现的次数 Lookup返回给定key的所有value

scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four")))

scala> var result=input.collectAsMap
result: scala.collection.Map[Int,String] = Map(2 -> two, 4 -> four, 1 -> one, 3 -> three)

scala> var result=input.countByKey
result: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 1, 3 -> 1, 4 -> 1)

scala> var result=input.lookup(1)
result: Seq[String] = WrappedArray(1, one)

scala> var result=input.lookup(2)
result: Seq[String] = WrappedArray(two)

groupBy,keyBy

groupBy根据传入的函数产生的key,形成元素为K-V形式的RDD,然后对key相同的元素分组 keyBy对每个value,为它加上key

scala> var rdd=sc.parallelize(List("A1","A2","B1","B2","C"))
scala> var result=rdd.groupBy(_.substring(0,1)).collect
result: Array[(String, Iterable[String])] = Array((A,CompactBuffer(A1, A2)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C)))

scala> var rdd=sc.parallelize(List("hello","world","spark","is","fun"))
scala> var result=rdd.keyBy(_.length).collect
result: Array[(Int, String)] = Array((5,hello), (5,world), (5,spark), (2,is), (3,fun))

keys,values

scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four")))
scala> var result=input.keys.collect
result: Array[Int] = Array(1, 1, 2, 3, 4)
scala> var result=input.values.collect
result: Array[String] = Array(1, one, two, three, four)

mapvalues
mapvalues对K-V形式的RDD的每个Value进行操作
scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four")))
scala> var result=input.mapValues(_*2).collect
result: Array[(Int, String)] = Array((1,11), (1,oneone), (2,twotwo), (3,threethree), (4,fourfour))

union,intersection,subtract,cartesian

union合并2个集合,不去重 subtract将第一个集合中的同时存在于第二个集合的元素去掉 intersection返回2个集合的交集 cartesian返回2个集合的笛卡儿积

scala> var rdd1=sc.parallelize(Array(-1,1,1,2,3))
scala> var rdd2=sc.parallelize(Array(0,1,2,3,4))

scala> var result=rdd1.union(rdd2).collect
result: Array[Int] = Array(-1, 1, 1, 2, 3, 0, 1, 2, 3, 4)

scala> var result=rdd1.intersection(rdd2).collect
result: Array[Int] = Array(1, 2, 3)

scala> var result=rdd1.subtract(rdd2).collect
result: Array[Int] = Array(-1)

scala> var result=rdd1.cartesian(rdd2).collect
result: Array[(Int, Int)] = Array((-1,0), (-1,1), (-1,2), (-1,3), (-1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (2,0), (2,1), (2,2), (2,3), (2,4), (3,0), (3,1), (3,2), (3,3), (3,4))

© 著作权归作者所有

共有 人打赏支持
Endless2010
粉丝 1
博文 25
码字总数 23027
作品 0
南京
程序员
Spark性能优化篇二: 开发调优

Spark性能优化篇二: 开发调优 1、前言 在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、...

u010262291 ⋅ 05/30 ⋅ 0

Spark作业基本运行原理解析

欢迎关注天善智能,我们是专注于商业智能BI,人工智能AI,大数据分析与挖掘领域的垂直社区,学习,问答、求职一站式搞定! 对商业智能BI、大数据分析挖掘、机器学习,python,R等数据领域感兴...

天善智能 ⋅ 06/14 ⋅ 0

Spark作业基本运行原理解析!

1、基本原理 Spark作业的运行基本原理如下图所示: 我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。提交作业的节点称为Master节点,Driver进程就是开始...

石晓文 ⋅ 06/07 ⋅ 0

Spark笔记:RDD基本操作(上)

本文主要是讲解spark里RDD的基础操作。RDD是spark特有的数据模型,谈到RDD就会提到什么弹性分布式数据集,什么有向无环图,本文暂时不去展开这些高深概念,在阅读本文时候,大家可以就把RDD...

jingyang07 ⋅ 05/07 ⋅ 0

『 Spark 』3. spark 编程模式

写在前面 本系列是综合了自己在学习spark过程中的理解记录 + 对参考文章中的一些理解 + 个人实践spark过程中的一些心得而来。写这样一个系列仅仅是为了梳理个人学习spark的笔记记录,所以一...

董黎明 ⋅ 06/11 ⋅ 0

Apache Flink和Apache Spark有什么异同?它们的发展前景分别怎样?

============================= object WordCount { def main(args: Array[String]) {val env = new SparkContext("local","wordCount")val data = List("hi","how are you","hi")val dataSe......

justlpf ⋅ 05/12 ⋅ 0

Spark性能优化篇三:数据倾斜调优

Spark性能优化篇三:数据倾斜调优 前言 1.数据倾斜调优 调优概述 数据倾斜发生时的现象 数据倾斜发生的原理 如何定位导致数据倾斜的代码 某个task执行特别慢的情况 某个task莫名其妙内存溢出...

u010262291 ⋅ 05/30 ⋅ 0

Spark笔记:RDD基本操作(下)

上一篇里我提到可以把RDD当作一个数组,这样我们在学习spark的API时候很多问题就能很好理解了。上篇文章里的API也都是基于RDD是数组的数据模型而进行操作的。   Spark是一个计算框架,是对...

jingyang07 ⋅ 05/07 ⋅ 0

Spark基本工作原理与RDD及wordcount程序实例和原理深度剖析

RDD以及其特点 1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。 2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每...

qq1137623160 ⋅ 05/10 ⋅ 0

Spark及Spark Streaming核心原理及实践

  【IT168 技术】Spark 已经成为广告、报表以及推荐系统等大数据计算场景中首选系统,因效率高,易用以及通用性越来越得到大家的青睐,我自己最近半年在接触spark以及spark streaming之后,...

中国大数据 ⋅ 05/31 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

JavaScript零基础入门——(十)JavaScript的DOM基础

JavaScript零基础入门——(十)JavaScript的DOM基础 欢迎大家回到我们的JavaScript零基础入门,上一节课,我们了解了JavaScript中的函数,这一节课,我们来了解一下JavaScript的DOM。 第一节...

JandenMa ⋅ 35分钟前 ⋅ 0

Spring mvc DispatchServlet 实现原理

在Spring中, ContextLoaderListener只是辅助类,在web 容器启动的时候查找并创建WebApplicationContext对象,通过该对象进行加载spring的配置文件。而真正的逻辑实现其实是在DispatcherSer...

轨迹_ ⋅ 46分钟前 ⋅ 0

Weex起步

本教程假设你已经在你的本地环境安装了node 其实weex起步教程在 https://github.com/lilugirl/incubator-weex 项目说明文件中都已经有了,但为了有些同学看到英文秒变文盲,所以这里我重新写...

lilugirl ⋅ 54分钟前 ⋅ 0

Jenkins实践1 之安装

1 下载 http://mirrors.jenkins.io/war/latest/jenkins.war 2 启动 java -jar jenkins.war 前提:安装jdk并配置环境变量 启动结果节选: ************************************************......

晨猫 ⋅ 今天 ⋅ 0

组合数学 1-2000 中,能被6或10整除的数的个数

1--2000 中,能被6或10整除的数的个数 利用集合的性质 能被6整除的个数 2000/6 = 333 能被10整除的个数 2000/10 = 200 能被6和10整除的个数 2000/30 = 66 能被6或10整除的个数 333+200-66 =...

阿豪boy ⋅ 今天 ⋅ 0

一篇文章学懂Shell脚本

Shell脚本,就是利用Shell的命令解释的功能,对一个纯文本的文件进行解析,然后执行这些功能,也可以说Shell脚本就是一系列命令的集合。 Shell可以直接使用在win/Unix/Linux上面,并且可以调用...

Jake_xun ⋅ 今天 ⋅ 0

大数据工程师需要精通算法吗,要达到一个什么程度呢?

机器学习是人工智能的一个重要分支,而机器学习下最重要的就是算法,本文讲述归纳了入门级的几个机器学习算法,加大数据学习群:716581014一起加入AI技术大本营。 1、监督学习算法 这个算法由...

董黎明 ⋅ 今天 ⋅ 0

Kylin 对维度表的的要求

1.要具有数据一致性,主键值必须是唯一的;Kylin 会进行检查,如果有两行的主键值相同则会报错。 2.维度表越小越好,因为 Kylin 会将维度表加载到内存中供查询;过大的表不适合作为维度表,默...

无精疯 ⋅ 今天 ⋅ 0

58到家数据库30条军规解读

军规适用场景:并发量大、数据量大的互联网业务 军规:介绍内容 解读:讲解原因,解读比军规更重要 一、基础规范 (1)必须使用InnoDB存储引擎 解读:支持事务、行级锁、并发性能更好、CPU及...

kim_o ⋅ 今天 ⋅ 0

代码注释中顺序更改 文件读写换行

`package ssh; import com.xxx.common.log.LogFactory; import com.xxx.common.log.LoggerUtil; import org.apache.commons.lang3.StringUtils; import java.io.*; public class DirErgodic ......

林伟琨 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部