文档章节

RDD编程

ksfzhaohui
 ksfzhaohui
发布于 2015/12/22 20:46
字数 1835
阅读 189
收藏 2
1.定义

Resilient Distributed Dataset (RDD) 弹性分布式数据集

Spark中的RDD,简单来说就是所有对象的一个不可变的分布式集合。每个RDD都被分割为多个分区,这就可以在集群的不同节点上进行计算。RDD可以包含任何Python,Java,Scala对象类型,包括用户自定义类型。

2.基础

在Spark 中,所有的工作都被表达为创建新RDD,对已存在的RDD做变换(1.变换),或者对RDD调用某些操作来计算得到一个结果(2.动作)

每个Spark程序或者 shell 会话都是像这样工作:
1. 从外部数据创建一些作为输入的RDD
2. 使用类似filter()之类的变换来定义出新的RDD    --变换
3. 要求Spark对需要重用的任何中间 RDD进行 persist()  注:每次你执行个动作,Spark的RDD默认会被重新计算
4. 启动类似count()和 first()的动作开始并行计算,然后 Spark 会优化并执行    --动作

注:在任何时候你都可以定义一个新的RDD,但是Spark总是以一种lazy的方式计算它们,也就是它们被第一次用于动作的时候。

说明:总是重新计算一个RDD的能力事实上就是为什么RDD 被称为“弹性”的原因。当拥有RDD数据的机
器发生故障,Spark就利用这个能力重新计算丢失的分区,这对用户来说是透明的。

3.创建RDD

1.普通数组创建出来

SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.parallelize(Arrays.asList("pandas","i like pandas"));
System.out.println(lines.count());

2.加载一个外部数据集

SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
System.out.println(lines.count());
4.RDD操作

RDD支持两种类型的操作:变换(transformation)和动作(action)

变换:对一个RDD进行操作得到一个新的RDD,如map()和filter()
动作:是向应用程序返回值,或向存储系统导出数据的那些操作,如count()和first()

SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
JavaRDD<String> errsRDD = lines.filter(new Function<String, Boolean>() {
	private static final long serialVersionUID = 1L;
		public Boolean call(String x) {
                        System.out.println("RDD变换计算");
			return x.contains("Exception");
		}
	});
System.out.println(lines.count());
System.out.println(errsRDD.count());

注:RDD的变换计算是会延迟的,直到你在一个动作中用到
以上例子中我们可以先注释掉2条println,此时是不会输出"RDD变换计算"的,只有在执行action如count()时才会输出"RDD变换计算"

5.传递函数
在Java中,函数是实现了org.apache.spark.api.java包中的Spark函数接口的对象

函数名 方法 用法
Function<T, R> 
R call(T)  
一个输入一个输出,用于map(),filter()之类的操作
Function2<T1, T2, R>
R call(T1, T2)
两个输入一个输出,用于aggregate(),fold()之类的操作
FlatMapFunction<T, R>
Iterable<R> call(T) 
一个输入零个或多个输出,用于 flagMap()之类的操作

注:在Java8中,你也可以用lambda来简洁的实现函数接口
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
JavaRDD<String> errors = lines.filter(new Contains("Exception"));
System.out.println(errors.count());
class Contains implements Function<String, Boolean> {
	private static final long serialVersionUID = 1L;
	private String query;

	public Contains(String query) {
		this.query = query;
	}

	@Override
	public Boolean call(String v1) throws Exception {
		return v1.contains(query);
	}
}
6.常见的变换和动作
变换
两个最常见的变换是map()和filter(),二者的区别:返回的结果是新的值还是原有的值
map():返回结果就是变换后的每个元素构成的新RDD 
filter():返回的是该RDD中仅能通过该函数的元素构成的新RDD

通过查看接口也可以知道:map(Function<T,R>),filter(Function<T,Boolean>)
如果希望每个输入元素产生多个输出元素,这个操作叫做flatMap(FlatMapFunction<T,U>)

看一个flatMap的例子:
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.parallelize(Arrays.asList("hello world","hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
		private static final long serialVersionUID = 1L;
		public Iterable<String> call(String line) {
			return Arrays.asList(line.split(" "));
		}
});
System.out.println(StringUtils.join(words.collect(), ","));

结果:hello,world,hi    --hello world实现了产生多个输出元素

对应map的实例:


JavaRDD<String[]> words = lines.map(new Function<String, String[]>() {
	@Override
	public String[] call(String v1) throws Exception {
	        return v1.split(" ");
	}
});
RDD支持许多数学集合操作  
对包含{1, 2, 3}和{3, 4, 5}的两个 RDD 进行变换
函数名 描述 结果
union()
生成一个包含两个RDD中所有元素的RDD  
{1, 2, 3, 3, 4, 5}
intersection()
生成两个RDD中都有的元素组成的 RDD  
{3}
subtract() 
从一个RDD中去掉另一个RDD中存在的元素
{1, 2}
cartesian() 
生成两个RDD的笛卡尔积的 
{(1, 3), (1, 4), ..., (3,5)}

动作

以上例子中已经列出了一些动作,看看还有其他哪些常用的

collect()
返回RDD中的所有元素
count() 
返回RDD中元素个数 
countByValue()
RDD中每个元素出现的次数 
take(num) 
返回 RDD 中的 num个元素 
top(num)
返回 RDD 中前 num个元素
takeOrdered(num)(ording) 
返回RDD中基于给定顺序的num个元素
takeSample(withReplacement,num,[seed]) 
随机返回RDD中的num个元素
reduce(func) 
并行合并 RDD 中的元素(比如求和) 
fold(func) 
和reduce()一样,但是提供了一个初值  
aggregate(zeroValue)(seqOp, combOp)
类似 reduce(),但是用于返回不同的类型 
foreach(func)
对RDD中的每个元素应用函数func

7.持久化-persist()

每次对RDD执行动作时,Spark都会重新计算这个RDD和所有依赖的RDD,为避免多次计算同一个RDD,可以缓存该数据;当Spark缓存该RDD时,计算该RDD的节点都会保存它们的分区。如果缓存了该数据的节点出错了,Spark会在需要的时候重新计算丢失的分区

Spark 有多个级别的持久策略可选择

级别 空间占用 cpu 在内存 在硬盘 描述
MEMORY_ONLY


是  
MEMORY_ONLY_SE





MEMORY_AND_DISK

有时
有时
如果数据太多不能放在内存里,则溢出到磁盘
MEMORY_AND_DISK_SER


有时
有时
如果数据太多不能放在内存里,则溢出到磁盘。内存中的数据表现为序列化。
DISK_ONLY






如果你试图缓存太多的数据,当超出了内存,Spark会使用 LRU 缓存策略丢弃旧的分区。对于 memory-only 存储级别,Spark 会在需要访问数据时重新计算;而对于memory-and-disk级别,会将数据写到磁盘。无论哪种方式,你都不用担心是否缓存态度数据会使任务停止。然而,不必要的缓存数据会导致有用的数据
被丢弃而进行过多的计算。

看一个例子:
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1, 2));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
	private static final long serialVersionUID = 1L;
	public Integer call(Integer x) {
		System.err.println("重新计算:" + x);
		return x * x;
	}
});
result.persist(StorageLevel.MEMORY_ONLY());
System.out.println(result.count());
System.out.println(result.count());
当我们注释掉result.persist...
重新计算:1
重新计算:2
2
重新计算:1
重新计算:2
2
发现2个count,RDD变换计算了2次

启用persist:

重新计算:1
重新计算:2
2
2

--以上是Leaning Spark的笔记

© 著作权归作者所有

共有 人打赏支持
ksfzhaohui

ksfzhaohui

粉丝 312
博文 130
码字总数 165059
作品 3
南京
高级程序员
加载中

评论(3)

抗争的诗人
抗争的诗人

引用来自“抗争的诗人”的评论

太抽象。我只对MapReduce有所了解。RDD和MapReduce有没有联系呢?

引用来自“ksfzhaohui”的评论

MapReduce和RDD都是hadoop的两种计算引擎
soga
ksfzhaohui
ksfzhaohui

引用来自“抗争的诗人”的评论

太抽象。我只对MapReduce有所了解。RDD和MapReduce有没有联系呢?
MapReduce和RDD都是hadoop的两种计算引擎
抗争的诗人
抗争的诗人
太抽象。我只对MapReduce有所了解。RDD和MapReduce有没有联系呢?
spark RDD解读---第二谈

第二章:弹性分布式数据集 2.1 RDD的特性和目标:RDD提供良好的编程能力,并保持:自动位置感性性调度、自动容错、可伸缩性。同时具备工作集的良好的描述能力。目前数据恢复分为两部分:检查...

hhh666
2017/10/27
0
0
Spark RDD概念学习系列之RDD的缺点(二)

RDD的缺点? 为了有效地实现容错。 RDD提供了一种高度受限的共享内存,即RDD是只读的,并且,只能通过其他RDD上的批量操作来创建。   RDD只支持粗粒度转换,限制了编程模型。   但RDD仍然...

chenkangyao
2017/10/26
0
0
2.sparkSQL--DataFrames与RDDs的相互转换

Spark SQL支持两种RDDs转换为DataFrames的方式 使用反射获取RDD内的Schema 当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。 通过编程接口指定Schema 通过S...

刘洋intsmaze
2017/03/25
0
0
Spark RDD的分区

通过网络资源,整理出Spark RDD分区的一些总结。 分区是为了更好的利用集群中的众多CPU,提高并行度。 实际分区应该考虑处理问题的类型,如果是IO密集型,考虑等待的时间,每个CPU上对应的分...

开源中国驻成都办事处
2016/03/14
1K
0
Spark(三):弹性分布式数据集(RDD)

一:Scala Scala 是一门现代的多范式编程语言,志在以简练、优雅及类型安全的方式来表达常用编程模式。它平滑地集成了面向对象和函数语言的特性。Scala 运行于 Java 平台(JVM,Java 虚拟机)...

牧羊人Berg
2016/06/02
350
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

go语言学习总结

一、描述 go语言是直接将源码编译成二进制机器码的语言;它支持面向对象、也支持函数式编程;支持并发很容易; 二、基本语法学习路径 https://studygolang.com/subject/2...

盼望明天
15分钟前
1
0
JSP 九大内置对象及其作用域

JSP 中一共预先定义了 9 个这样的对象,分别为:request、response、session、application、out、pagecontext、config、page、exception ,下面就简单介绍下。 1、request 对象 request 对象...

几个栗子
26分钟前
0
0
Java中的坑之方括号

Java中的坑之方括号 这一段时间,在做项目的时候,发现了一个坑,这个坑说大不大,说小不小,不知道的足够喝一壶,知道的就可以轻松解决。 问题描述 在做数据统计的时候,遇见了如下形式的数...

星汉
36分钟前
1
0
[雪峰磁针石博客]python机器学习、web开发等书籍汇总

Building Machine Learning Systems with Python Third Edition - 2018.pdf 下载地址 Get more from your data by creating practical machine learning systems with Python Key Features ......

python测试开发人工智能安全
47分钟前
1
0
文件的压缩与解压(linux)

Linux下*.tar.gz文件解压缩命令 1.压缩命令:   命令格式:tar -zcvf 压缩后文件名.tar.gz 被压缩文件名 可先切换到当前目录下。压缩文件名和被压缩文件名都可加入路径。 2.解压缩命令: ...

qimh
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部