文档章节

RDD编程

ksfzhaohui
 ksfzhaohui
发布于 2015/12/22 20:46
字数 1835
阅读 200
收藏 2
1.定义

Resilient Distributed Dataset (RDD) 弹性分布式数据集

Spark中的RDD,简单来说就是所有对象的一个不可变的分布式集合。每个RDD都被分割为多个分区,这就可以在集群的不同节点上进行计算。RDD可以包含任何Python,Java,Scala对象类型,包括用户自定义类型。

2.基础

在Spark 中,所有的工作都被表达为创建新RDD,对已存在的RDD做变换(1.变换),或者对RDD调用某些操作来计算得到一个结果(2.动作)

每个Spark程序或者 shell 会话都是像这样工作:
1. 从外部数据创建一些作为输入的RDD
2. 使用类似filter()之类的变换来定义出新的RDD    --变换
3. 要求Spark对需要重用的任何中间 RDD进行 persist()  注:每次你执行个动作,Spark的RDD默认会被重新计算
4. 启动类似count()和 first()的动作开始并行计算,然后 Spark 会优化并执行    --动作

注:在任何时候你都可以定义一个新的RDD,但是Spark总是以一种lazy的方式计算它们,也就是它们被第一次用于动作的时候。

说明:总是重新计算一个RDD的能力事实上就是为什么RDD 被称为“弹性”的原因。当拥有RDD数据的机
器发生故障,Spark就利用这个能力重新计算丢失的分区,这对用户来说是透明的。

3.创建RDD

1.普通数组创建出来

SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.parallelize(Arrays.asList("pandas","i like pandas"));
System.out.println(lines.count());

2.加载一个外部数据集

SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
System.out.println(lines.count());
4.RDD操作

RDD支持两种类型的操作:变换(transformation)和动作(action)

变换:对一个RDD进行操作得到一个新的RDD,如map()和filter()
动作:是向应用程序返回值,或向存储系统导出数据的那些操作,如count()和first()

SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
JavaRDD<String> errsRDD = lines.filter(new Function<String, Boolean>() {
	private static final long serialVersionUID = 1L;
		public Boolean call(String x) {
                        System.out.println("RDD变换计算");
			return x.contains("Exception");
		}
	});
System.out.println(lines.count());
System.out.println(errsRDD.count());

注:RDD的变换计算是会延迟的,直到你在一个动作中用到
以上例子中我们可以先注释掉2条println,此时是不会输出"RDD变换计算"的,只有在执行action如count()时才会输出"RDD变换计算"

5.传递函数
在Java中,函数是实现了org.apache.spark.api.java包中的Spark函数接口的对象

函数名 方法 用法
Function<T, R> 
R call(T)  
一个输入一个输出,用于map(),filter()之类的操作
Function2<T1, T2, R>
R call(T1, T2)
两个输入一个输出,用于aggregate(),fold()之类的操作
FlatMapFunction<T, R>
Iterable<R> call(T) 
一个输入零个或多个输出,用于 flagMap()之类的操作

注:在Java8中,你也可以用lambda来简洁的实现函数接口
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
JavaRDD<String> errors = lines.filter(new Contains("Exception"));
System.out.println(errors.count());
class Contains implements Function<String, Boolean> {
	private static final long serialVersionUID = 1L;
	private String query;

	public Contains(String query) {
		this.query = query;
	}

	@Override
	public Boolean call(String v1) throws Exception {
		return v1.contains(query);
	}
}
6.常见的变换和动作
变换
两个最常见的变换是map()和filter(),二者的区别:返回的结果是新的值还是原有的值
map():返回结果就是变换后的每个元素构成的新RDD 
filter():返回的是该RDD中仅能通过该函数的元素构成的新RDD

通过查看接口也可以知道:map(Function<T,R>),filter(Function<T,Boolean>)
如果希望每个输入元素产生多个输出元素,这个操作叫做flatMap(FlatMapFunction<T,U>)

看一个flatMap的例子:
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.parallelize(Arrays.asList("hello world","hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
		private static final long serialVersionUID = 1L;
		public Iterable<String> call(String line) {
			return Arrays.asList(line.split(" "));
		}
});
System.out.println(StringUtils.join(words.collect(), ","));

结果:hello,world,hi    --hello world实现了产生多个输出元素

对应map的实例:


JavaRDD<String[]> words = lines.map(new Function<String, String[]>() {
	@Override
	public String[] call(String v1) throws Exception {
	        return v1.split(" ");
	}
});
RDD支持许多数学集合操作  
对包含{1, 2, 3}和{3, 4, 5}的两个 RDD 进行变换
函数名 描述 结果
union()
生成一个包含两个RDD中所有元素的RDD  
{1, 2, 3, 3, 4, 5}
intersection()
生成两个RDD中都有的元素组成的 RDD  
{3}
subtract() 
从一个RDD中去掉另一个RDD中存在的元素
{1, 2}
cartesian() 
生成两个RDD的笛卡尔积的 
{(1, 3), (1, 4), ..., (3,5)}

动作

以上例子中已经列出了一些动作,看看还有其他哪些常用的

collect()
返回RDD中的所有元素
count() 
返回RDD中元素个数 
countByValue()
RDD中每个元素出现的次数 
take(num) 
返回 RDD 中的 num个元素 
top(num)
返回 RDD 中前 num个元素
takeOrdered(num)(ording) 
返回RDD中基于给定顺序的num个元素
takeSample(withReplacement,num,[seed]) 
随机返回RDD中的num个元素
reduce(func) 
并行合并 RDD 中的元素(比如求和) 
fold(func) 
和reduce()一样,但是提供了一个初值  
aggregate(zeroValue)(seqOp, combOp)
类似 reduce(),但是用于返回不同的类型 
foreach(func)
对RDD中的每个元素应用函数func

7.持久化-persist()

每次对RDD执行动作时,Spark都会重新计算这个RDD和所有依赖的RDD,为避免多次计算同一个RDD,可以缓存该数据;当Spark缓存该RDD时,计算该RDD的节点都会保存它们的分区。如果缓存了该数据的节点出错了,Spark会在需要的时候重新计算丢失的分区

Spark 有多个级别的持久策略可选择

级别 空间占用 cpu 在内存 在硬盘 描述
MEMORY_ONLY


是  
MEMORY_ONLY_SE





MEMORY_AND_DISK

有时
有时
如果数据太多不能放在内存里,则溢出到磁盘
MEMORY_AND_DISK_SER


有时
有时
如果数据太多不能放在内存里,则溢出到磁盘。内存中的数据表现为序列化。
DISK_ONLY






如果你试图缓存太多的数据,当超出了内存,Spark会使用 LRU 缓存策略丢弃旧的分区。对于 memory-only 存储级别,Spark 会在需要访问数据时重新计算;而对于memory-and-disk级别,会将数据写到磁盘。无论哪种方式,你都不用担心是否缓存态度数据会使任务停止。然而,不必要的缓存数据会导致有用的数据
被丢弃而进行过多的计算。

看一个例子:
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1, 2));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
	private static final long serialVersionUID = 1L;
	public Integer call(Integer x) {
		System.err.println("重新计算:" + x);
		return x * x;
	}
});
result.persist(StorageLevel.MEMORY_ONLY());
System.out.println(result.count());
System.out.println(result.count());
当我们注释掉result.persist...
重新计算:1
重新计算:2
2
重新计算:1
重新计算:2
2
发现2个count,RDD变换计算了2次

启用persist:

重新计算:1
重新计算:2
2
2

--以上是Leaning Spark的笔记

© 著作权归作者所有

共有 人打赏支持
上一篇: Eclipse翻译插件
ksfzhaohui

ksfzhaohui

粉丝 371
博文 140
码字总数 191478
作品 3
南京
高级程序员
私信 提问
加载中

评论(3)

lnwazg
lnwazg

引用来自“抗争的诗人”的评论

太抽象。我只对MapReduce有所了解。RDD和MapReduce有没有联系呢?

引用来自“ksfzhaohui”的评论

MapReduce和RDD都是hadoop的两种计算引擎
soga
ksfzhaohui
ksfzhaohui

引用来自“抗争的诗人”的评论

太抽象。我只对MapReduce有所了解。RDD和MapReduce有没有联系呢?
MapReduce和RDD都是hadoop的两种计算引擎
lnwazg
lnwazg
太抽象。我只对MapReduce有所了解。RDD和MapReduce有没有联系呢?
spark RDD解读---第二谈

第二章:弹性分布式数据集 2.1 RDD的特性和目标:RDD提供良好的编程能力,并保持:自动位置感性性调度、自动容错、可伸缩性。同时具备工作集的良好的描述能力。目前数据恢复分为两部分:检查...

hhh666
2017/10/27
0
0
Spark RDD概念学习系列之RDD的缺点(二)

RDD的缺点? 为了有效地实现容错。 RDD提供了一种高度受限的共享内存,即RDD是只读的,并且,只能通过其他RDD上的批量操作来创建。   RDD只支持粗粒度转换,限制了编程模型。   但RDD仍然...

chenkangyao
2017/10/26
0
0
Spark RDD的分区

通过网络资源,整理出Spark RDD分区的一些总结。 分区是为了更好的利用集群中的众多CPU,提高并行度。 实际分区应该考虑处理问题的类型,如果是IO密集型,考虑等待的时间,每个CPU上对应的分...

开源中国驻成都办事处
2016/03/14
1K
0
【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第3节①

一、以RDD为基石的Spark编程模型 在Spark中一切都是基于RDD的: 什么是RDD呢?官方给出的解释是: 也就是说每个RDD都至少有以下三个函数实现: Spark自带了非常多的RDD: RDD主要分为两种: 其...

Spark亚太研究院
2014/12/29
0
0
零基础入门大数据挖掘之spark的rdd

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/on2way/article/details/84348172 本节简单介绍一下spark下的基本数据结构RDD,方便理解后续的更多操作。 那么...

我i智能
2018/11/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

大数据教程(11.9)hive操作基础知识

上一篇博客分享了hive的简介和初体验,本节博主将继续分享一些hive的操作的基础知识。 DDL操作 (1)创建表 #建表语法CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name [(col_name ...

em_aaron
今天
2
0
OSChina 周四乱弹 —— 我家猫真会后空翻

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @我没有抓狂 :#今天听这个# 我艇牛逼,百听不厌,太好听辣 分享 Led Zeppelin 的歌曲《Stairway To Heaven》 《Stairway To Heaven》- Led Z...

小小编辑
今天
2
0
node调用dll

先安装python2.7 安装node-gyp cnpm install node-gyp -g 新建一个Electron-vue项目(案例用Electron-vue) vue init simulatedgreg/electron-vue my-project 安装electron-rebuild cnpm ins......

Chason-洪
今天
3
0
scala学习(一)

学习Spark之前需要学习Scala。 参考学习的书籍:快学Scala

柠檬果过
今天
3
0
通俗易懂解释网络工程中的技术,如STP,HSRP等

导读 在面试时,比如被问到HSRP的主备切换时间时多久,STP几个状态的停留时间,自己知道有这些东西,但在工作中不会经常用到,就老是记不住,觉得可能还是自己基础不够牢固,知识掌握不够全面...

问题终结者
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部