文档章节

Spark学习笔记-RDD简单算子

Endless2010
 Endless2010
发布于 2017/07/19 20:40
字数 1218
阅读 15
收藏 0

collect

返回RDD的所有元素

scala> var input=sc.parallelize(Array(-1,0,1,2,2))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:27
scala> var result=input.collect
result: Array[Int] = Array(-1, 0, 1, 2, 2)

count,coutByValue

count返回RDD的元素数量,countByValue返回每个值的出现次数

scala> var input=sc.parallelize(Array(-1,0,1,2,2))
scala> var result=input.count
result: Long = 5
scala> var result=input.countByValue
result: scala.collection.Map[Int,Long] = Map(0 -> 1, 1 -> 1, 2 -> 2, -1 -> 1)

take,top,takeOrdered

take返回RDD的前N个元素 takeOrdered默认返回升序排序的前N个元素,可以指定排序算法 Top返回降序排序的前N个元素

var input=sc.parallelize(Array(1,2,3,4,9,8,7,5,6))

scala> var result=input.take(6)
result: Array[Int] = Array(1, 2, 3, 4, 9, 8)
scala> var result=input.take(20)
result: Array[Int] = Array(1, 2, 3, 4, 9, 8, 7, 5, 6)

scala> var result=input.takeOrdered(6)
result: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> var result=input.takeOrdered(6)(Ordering[Int].reverse)
result: Array[Int] = Array(9, 8, 7, 6, 5, 4)

scala> var result=input.top(6)
result: Array[Int] = Array(9, 8, 7, 6, 5, 4
)

Filter

传入返回值为boolean的函数,返回改函数结果为true的RDD

scala> var input=sc.parallelize(Array(-1,0,1,2))
scala> var result=input.filter(_>0).collect()
result: Array[Int] = Array(1, 2)

map,flatmap

map对每个元素执行函数,转换为新的RDD,flatMap和map类似,但会把map的返回结果做flat处理,就是把多个Seq的结果拼接成一个Seq输出

scala> var input=sc.parallelize(Array(-1,0,1,2))
scala> var result=input.map(_+1).collect
result: Array[Int] = Array(0, 1, 2, 3)

scala>var result=input.map(x=>x.to(3)).collect
result: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(-1, 0, 1, 2, 3), Range(0, 1, 2, 3), Range(1, 2, 3), Range(2, 3))

scala>var result=input.flatMap(x=>x.to(3)).collect
result: Array[Int] = Array(-1, 0, 1, 2, 3, 0, 1, 2, 3, 1, 2, 3, 2, 3)

distinct

RDD去重

scala>var input=sc.parallelize(Array(-1,0,1,2,2))
scala>var result=input.distinct.collect
result: Array[Int] = Array(0, 1, 2, -1)

Reduce

通过函数聚集RDD中的所有元素

scala> var input=sc.parallelize(Array(-1,0,1,2))
scala> var result=input.reduce((x,y)=>{println(x,y);x+y})
(-1,1)  //处理-1,1,结果为0,RDD剩余元素为{0,2}
(0,2)   //上面的结果为0,在处理0,2,结果为2,RDD剩余元素为{0}
(2,0)   //上面结果为2,再处理(2,0),结果为2,RDD剩余元素为{}
result: Int = 2

sample,takeSample

sample就是从RDD中抽样,第一个参数withReplacement是指是否有放回的抽样,true为放回,为false为不放回,放回就是抽样结果可能重复,第二个参数是fraction,0到1之间的小数,表明抽样的百分比 takeSample类似,但返回类型是Array,第一个参数是withReplacement,第二个参数是样本个数

var rdd=sc.parallelize(1 to 20)

scala> rdd.sample(true,0.5).collect
res33: Array[Int] = Array(6, 8, 13, 15, 17, 17, 17, 18, 20)

scala> rdd.sample(false,0.5).collect
res35: Array[Int] = Array(1, 3, 10, 11, 12, 13, 14, 17, 18)

scala> rdd.sample(true,1).collect
res44: Array[Int] = Array(2, 2, 3, 5, 6, 6, 8, 9, 9, 10, 10, 10, 14, 15, 16, 17, 17, 18, 19, 19, 20, 20)

scala> rdd.sample(false,1).collect
res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

scala> rdd.takeSample(true,3)
res1: Array[Int] = Array(1, 15, 19)

scala> rdd.takeSample(false,3)
res2: Array[Int] = Array(7, 16, 6)

collectAsMap,countByKey,lookup

collectAsMap把PairRDD转为Map,如果存在相同的key,后面的会覆盖前面的。 countByKey统计每个key出现的次数 Lookup返回给定key的所有value

scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four")))

scala> var result=input.collectAsMap
result: scala.collection.Map[Int,String] = Map(2 -> two, 4 -> four, 1 -> one, 3 -> three)

scala> var result=input.countByKey
result: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 1, 3 -> 1, 4 -> 1)

scala> var result=input.lookup(1)
result: Seq[String] = WrappedArray(1, one)

scala> var result=input.lookup(2)
result: Seq[String] = WrappedArray(two)

groupBy,keyBy

groupBy根据传入的函数产生的key,形成元素为K-V形式的RDD,然后对key相同的元素分组 keyBy对每个value,为它加上key

scala> var rdd=sc.parallelize(List("A1","A2","B1","B2","C"))
scala> var result=rdd.groupBy(_.substring(0,1)).collect
result: Array[(String, Iterable[String])] = Array((A,CompactBuffer(A1, A2)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C)))

scala> var rdd=sc.parallelize(List("hello","world","spark","is","fun"))
scala> var result=rdd.keyBy(_.length).collect
result: Array[(Int, String)] = Array((5,hello), (5,world), (5,spark), (2,is), (3,fun))

keys,values

scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four")))
scala> var result=input.keys.collect
result: Array[Int] = Array(1, 1, 2, 3, 4)
scala> var result=input.values.collect
result: Array[String] = Array(1, one, two, three, four)

mapvalues
mapvalues对K-V形式的RDD的每个Value进行操作
scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four")))
scala> var result=input.mapValues(_*2).collect
result: Array[(Int, String)] = Array((1,11), (1,oneone), (2,twotwo), (3,threethree), (4,fourfour))

union,intersection,subtract,cartesian

union合并2个集合,不去重 subtract将第一个集合中的同时存在于第二个集合的元素去掉 intersection返回2个集合的交集 cartesian返回2个集合的笛卡儿积

scala> var rdd1=sc.parallelize(Array(-1,1,1,2,3))
scala> var rdd2=sc.parallelize(Array(0,1,2,3,4))

scala> var result=rdd1.union(rdd2).collect
result: Array[Int] = Array(-1, 1, 1, 2, 3, 0, 1, 2, 3, 4)

scala> var result=rdd1.intersection(rdd2).collect
result: Array[Int] = Array(1, 2, 3)

scala> var result=rdd1.subtract(rdd2).collect
result: Array[Int] = Array(-1)

scala> var result=rdd1.cartesian(rdd2).collect
result: Array[(Int, Int)] = Array((-1,0), (-1,1), (-1,2), (-1,3), (-1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (2,0), (2,1), (2,2), (2,3), (2,4), (3,0), (3,1), (3,2), (3,3), (3,4))

© 著作权归作者所有

共有 人打赏支持
Endless2010
粉丝 1
博文 36
码字总数 23027
作品 0
南京
程序员
私信 提问
Spark--大数据的“电光石火”

Spark已正式申请加入Apache孵化器,从灵机一闪的实验室“电火花”成长为大数据技术平台中异军突起的新锐。本文主要讲述Spark的设计思想。Spark如其名,展现了大数据不常见的“电光石火”。具...

一枚Sir
2014/08/15
0
1
Spark 【数据挖掘平台介绍】 - Spark 1.1.0

一:Spark Spark已正式申请加入Apache孵化器,从灵机一闪的实验室“电火花”成长为大数据技术平台中异军突起的新锐。本文主要讲述Spark的设计思想。Spark如其名,展现了大数据不常见的“电光...

止静
2014/09/16
0
0
Spark Core介绍以及架构

应用场景 Apache Spark 是加州大学伯克利分校的 AMP Labs 开发的开源分布式轻量级通用计算框架。由于 Spark 基于内存设计,使得它拥有比 Hadoop 更高的性能(极端情况下可以达到 100x),并且...

wsc449
01/17
0
0
高性能Spark作业基础:你必须知道的调优原则及建议

在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操...

李雪蕤
08/22
0
0
Spark性能优化篇二: 开发调优

Spark性能优化篇二: 开发调优 1、前言 在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、...

u010262291
05/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

[LintCode] Binary Tree Level Order Traversal(二叉树的层次遍历)

描述 给出一棵二叉树,返回其节点值的层次遍历(逐层从左往右访问) 样例 给一棵二叉树 {3,9,20,#,#,15,7} : 3 / \9 20 / \ 15 7 返回他的分层遍历结果: [ [3], [...

honeymose
14分钟前
1
0
Spring Cloud Config 规范

Spring Cloud Config 规范 首先Spring Cloud 是基于 Spring 来扩展的,Spring 本身就提供当创建一个Bean时可从Environment 中将一些属性值通过@Value的形式注入到业务代码中的能力。那Sprin...

阿里云官方博客
23分钟前
1
0
renderer

renderer Function 单元格绘制处理函数,同drawcell事件。

architect刘源源
34分钟前
5
0
jdk1.8 lambda表达式过滤重复的对象

//操作类public class BizAgentAudit {    private String supplierOrgId;    private int age;    private String name;        public BizAgentAudit() { ...

INSISTQIAO
35分钟前
6
0
DataUtil的日期处理

SimpleDateFormat 是线程不安全的类,一般不要定义为 static 变量,如果定义为static,必须加锁,或者使用 DateUtils 工具类。 解决方法 1、将SimpleDateFormat定义成局部变量。 缺点:每调用...

DoLo-lty
39分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部