RDD编程

原创
2015/12/22 20:46
阅读数 1.2K
AI总结
1.定义

Resilient Distributed Dataset (RDD) 弹性分布式数据集

Spark中的RDD,简单来说就是所有对象的一个不可变的分布式集合。每个RDD都被分割为多个分区,这就可以在集群的不同节点上进行计算。RDD可以包含任何Python,Java,Scala对象类型,包括用户自定义类型。

2.基础

在Spark 中,所有的工作都被表达为创建新RDD,对已存在的RDD做变换(1.变换),或者对RDD调用某些操作来计算得到一个结果(2.动作)

每个Spark程序或者 shell 会话都是像这样工作:
1. 从外部数据创建一些作为输入的RDD
2. 使用类似filter()之类的变换来定义出新的RDD    --变换
3. 要求Spark对需要重用的任何中间 RDD进行 persist()  注:每次你执行个动作,Spark的RDD默认会被重新计算
4. 启动类似count()和 first()的动作开始并行计算,然后 Spark 会优化并执行    --动作

注:在任何时候你都可以定义一个新的RDD,但是Spark总是以一种lazy的方式计算它们,也就是它们被第一次用于动作的时候。

说明:总是重新计算一个RDD的能力事实上就是为什么RDD 被称为“弹性”的原因。当拥有RDD数据的机
器发生故障,Spark就利用这个能力重新计算丢失的分区,这对用户来说是透明的。

3.创建RDD

1.普通数组创建出来

SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.parallelize(Arrays.asList("pandas","i like pandas"));
System.out.println(lines.count());

2.加载一个外部数据集

SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
System.out.println(lines.count());
4.RDD操作

RDD支持两种类型的操作:变换(transformation)和动作(action)

变换:对一个RDD进行操作得到一个新的RDD,如map()和filter()
动作:是向应用程序返回值,或向存储系统导出数据的那些操作,如count()和first()

SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
JavaRDD<String> errsRDD = lines.filter(new Function<String, Boolean>() {
	private static final long serialVersionUID = 1L;
		public Boolean call(String x) {
                        System.out.println("RDD变换计算");
			return x.contains("Exception");
		}
	});
System.out.println(lines.count());
System.out.println(errsRDD.count());

注:RDD的变换计算是会延迟的,直到你在一个动作中用到
以上例子中我们可以先注释掉2条println,此时是不会输出"RDD变换计算"的,只有在执行action如count()时才会输出"RDD变换计算"

5.传递函数
在Java中,函数是实现了org.apache.spark.api.java包中的Spark函数接口的对象

函数名 方法 用法
Function<T, R> 
R call(T)  
一个输入一个输出,用于map(),filter()之类的操作
Function2<T1, T2, R>
R call(T1, T2)
两个输入一个输出,用于aggregate(),fold()之类的操作
FlatMapFunction<T, R>
Iterable<R> call(T) 
一个输入零个或多个输出,用于 flagMap()之类的操作

注:在Java8中,你也可以用lambda来简洁的实现函数接口
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile("D:/systemInfo.log");
JavaRDD<String> errors = lines.filter(new Contains("Exception"));
System.out.println(errors.count());
class Contains implements Function<String, Boolean> {
	private static final long serialVersionUID = 1L;
	private String query;

	public Contains(String query) {
		this.query = query;
	}

	@Override
	public Boolean call(String v1) throws Exception {
		return v1.contains(query);
	}
}
6.常见的变换和动作
变换
两个最常见的变换是map()和filter(),二者的区别:返回的结果是新的值还是原有的值
map():返回结果就是变换后的每个元素构成的新RDD 
filter():返回的是该RDD中仅能通过该函数的元素构成的新RDD

通过查看接口也可以知道:map(Function<T,R>),filter(Function<T,Boolean>)
如果希望每个输入元素产生多个输出元素,这个操作叫做flatMap(FlatMapFunction<T,U>)

看一个flatMap的例子:
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.parallelize(Arrays.asList("hello world","hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
		private static final long serialVersionUID = 1L;
		public Iterable<String> call(String line) {
			return Arrays.asList(line.split(" "));
		}
});
System.out.println(StringUtils.join(words.collect(), ","));

结果:hello,world,hi    --hello world实现了产生多个输出元素

对应map的实例:


JavaRDD<String[]> words = lines.map(new Function<String, String[]>() {
	@Override
	public String[] call(String v1) throws Exception {
	        return v1.split(" ");
	}
});
RDD支持许多数学集合操作  
对包含{1, 2, 3}和{3, 4, 5}的两个 RDD 进行变换
函数名 描述 结果
union()
生成一个包含两个RDD中所有元素的RDD  
{1, 2, 3, 3, 4, 5}
intersection()
生成两个RDD中都有的元素组成的 RDD  
{3}
subtract() 
从一个RDD中去掉另一个RDD中存在的元素
{1, 2}
cartesian() 
生成两个RDD的笛卡尔积的 
{(1, 3), (1, 4), ..., (3,5)}

动作

以上例子中已经列出了一些动作,看看还有其他哪些常用的

collect()
返回RDD中的所有元素
count() 
返回RDD中元素个数 
countByValue()
RDD中每个元素出现的次数 
take(num) 
返回 RDD 中的 num个元素 
top(num)
返回 RDD 中前 num个元素
takeOrdered(num)(ording) 
返回RDD中基于给定顺序的num个元素
takeSample(withReplacement,num,[seed]) 
随机返回RDD中的num个元素
reduce(func) 
并行合并 RDD 中的元素(比如求和) 
fold(func) 
和reduce()一样,但是提供了一个初值  
aggregate(zeroValue)(seqOp, combOp)
类似 reduce(),但是用于返回不同的类型 
foreach(func)
对RDD中的每个元素应用函数func

7.持久化-persist()

每次对RDD执行动作时,Spark都会重新计算这个RDD和所有依赖的RDD,为避免多次计算同一个RDD,可以缓存该数据;当Spark缓存该RDD时,计算该RDD的节点都会保存它们的分区。如果缓存了该数据的节点出错了,Spark会在需要的时候重新计算丢失的分区

Spark 有多个级别的持久策略可选择

级别 空间占用 cpu 在内存 在硬盘 描述
MEMORY_ONLY


是  
MEMORY_ONLY_SE





MEMORY_AND_DISK

有时
有时
如果数据太多不能放在内存里,则溢出到磁盘
MEMORY_AND_DISK_SER


有时
有时
如果数据太多不能放在内存里,则溢出到磁盘。内存中的数据表现为序列化。
DISK_ONLY






如果你试图缓存太多的数据,当超出了内存,Spark会使用 LRU 缓存策略丢弃旧的分区。对于 memory-only 存储级别,Spark 会在需要访问数据时重新计算;而对于memory-and-disk级别,会将数据写到磁盘。无论哪种方式,你都不用担心是否缓存态度数据会使任务停止。然而,不必要的缓存数据会导致有用的数据
被丢弃而进行过多的计算。

看一个例子:
SparkConf conf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1, 2));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
	private static final long serialVersionUID = 1L;
	public Integer call(Integer x) {
		System.err.println("重新计算:" + x);
		return x * x;
	}
});
result.persist(StorageLevel.MEMORY_ONLY());
System.out.println(result.count());
System.out.println(result.count());
当我们注释掉result.persist...
重新计算:1
重新计算:2
2
重新计算:1
重新计算:2
2
发现2个count,RDD变换计算了2次

启用persist:

重新计算:1
重新计算:2
2
2

--以上是Leaning Spark的笔记

展开阅读全文
加载中
点击加入讨论🔥(3) 发布并加入讨论🔥
3 评论
2 收藏
0
分享
AI总结
返回顶部
顶部