文档章节

Play Spark RDD

venps
 venps
发布于 2016/01/18 11:20
字数 1578
阅读 3361
收藏 59
点赞 5
评论 1


RDD

有关RDD的基础概念请阅读spark官方文档,或网上搜索其他内容。本文完全是实战经验的总结。

惰性求值

RDD的转换操作都是惰性求值的。
惰性求值意味着我们对RDD调用转化操做(例如map操作)并不会立即执行,相反spark会在内部记录下所要求执行的操作的相关信息。
把数据读取到RDD的操作同样也是惰性的,因此我们调用sc.textFile()时数据没有立即读取进来,而是有必要时才会读取。和转化操作一样读取数据操作也有可能被多次执行。这在写代码时要特别注意。

关于惰性求值,对新手来说可能有与直觉相违背之处。有接触过函数式语言类如haskell的应该不会陌生。
在最初接触spark时,我们也会有这样的疑问。
也参与过这样的讨论:

 val sc = new SparkContext("local[2]", "test")
 val f:Int ⇒ Int = (x:Int) ⇒ x + 1
 val g:Int ⇒ Int = (x:Int) ⇒ x + 1
 val rdd = sc.parallelize(Seq(1,2,3,4),1)
 //1
 val res1 = rdd.map(x ⇒ g(f(x))).collect
 //2
 val res2 = rdd.map(g).map(f).collect

第1和第2两种操作均能得到我们想要的结果,但那种操作更好呢?
直观上我们会觉得第1种操作更好,因为第一种操作可以仅仅需要一次迭代就能得到我们想要的结果。第二种操作需要两次迭代操作才能完成。
是我们想象的这样吗?让我们对函数f和g的调用加上打印。按照上面的假设。1和2的输出分别是这样的:

1:  f   g   f   g   f   g   f   g       
2:  g   g   g   g   f   f   f   f

代码:

val sc = new SparkContext("local[2]", "test")
val f:Int ⇒ Int = (x:Int) ⇒ {
    print("f\t")
    x + 1
    }
val g:Int ⇒ Int = (x:Int) ⇒ {
  print("g\t")
  x + 1
}
val rdd = sc.parallelize(Seq(1,2,3,4), 1
//1
val res1 = rdd.map(x ⇒ g(f(x))).collect()
//2
val res2 = rdd.map(f).map(g).collect()

将上面的代码copy试着运行一下吧,我们在控制台得到的结果是这样的。

f   g   f   g   f   g   f   g
f   g   f   g   f   g   f   g

是不是大大出乎我们的意料?这说明什么?说明spark是懒性求值的! 我们在调用map(f)时并不会真正去计算, map(f)只是告诉spark数据是怎么计算出来的。map(f).map(g)其实就是在告诉spark数据先通过f在通过g计算出来的。然后在collect()时,spark在一次迭代中先后对数据调用f、g。

继续回到我们最初的问题,既然两种调用方式,在性能上毫无差异,那种调用方式更好呢?我们更推荐第二种调用方式,除了api更加清晰之外。在调用链很长的情况下,我们可以利用spark的检查点机制,在中间添加检查点,这样数据恢复的代价更小。而第一种方式调用链一旦出错,数据只能从头计算。

那么spark到底施加了何种魔法,如此神奇?让我们来拨开spark的层层面纱。最好的方式当然是看源码了。以map为例:

RDD的map方法

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

和MapPartitionsRDD的compute方法

override def compute(split: Partition, context:
                TaskContext): Iterator[U] =
                f(context, split.index,
                firstParent[T].iterator(split, 
                context))

关键是这个 iter.map(cleanF)),我们调用一个map方法其实是在iter对象上调用一个map方法。iter对象是scala.collection.Iterator的一个实例。
在看一下Iterator的map方法

def map[B](f: A => B): Iterator[B]=  
    new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
}

联想到我们刚才说的我们在RDD上调用一个map方法只是告诉spark数据是怎么计算出来的,并不会真正计算。是不是恍然大悟了。

向spark传递函数

我们可以把定义好的内联函数、方法的引用或静态方法传递给spark。就像scala的其它函数式API一样。我们还要考虑一些细节,比如传递的函数及其引用的变量是可序列话的(实现了java的Serializable接口)。除此之外传递一个对象的方法或字段时,会包含对整个对象的引用。我们可以把该字段放到一个局部变量中,来避免传递包含该字段的整个对象。

scala中的函数传递

class SearchFunctions(val query:String){
    def isMatch(s:String) = s.contains(query)

    def getMatchFuncRef(rdd:RDD[String])
        :RDD[String]= {
        //isMatch 代表this.isMatch因此我们要传递整个this
        rdd.map(isMatch)
    }

    def getMatchFieldRef(rdd:RDD[String])={
    //query表示this.query因此我们要传递整个this
    rdd.map(x=>x.split(query))
    }

    def getMatchsNoRef(rdd:RDD[String])={
    //安全只要把我们需要的字段放到局部变量中
    val q = this.query
    rdd.map(x=>x.split(query))
    }
}

如果在scala中出现了NotSerializableException,通常问题就在我们传递了一个不可序列化类中的函数或字段。传递局部可序列变量或顶级对象中的函数始终是安全的。

持久化

如前所述,spark的RDD是惰性求值的,有时我们希望能过多次使用同一个RDD。如果只是简单的对RDD调用行动操作,spark每次都会重算RDD和它的依赖。这在迭代算法中消耗巨大。 可以使用RDD.persist()让spark把RDD缓存下来。

避免GroupByKey

让我们来看看两种workCount的方式,一种使用reduceByKey,另一种使用groupByKey。

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()

val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()

虽然两种方式都能产生正确的结果,但reduceByKey在大数据集时工作的更好。这时因为spark会在shuffling数据之前,为每一个分区添加一个combine操作。这将大大减少shuffling前的数据。

看下图来理解 reduceBykey的过程

而groupBykey会shuff所有的数据,这大大加重了网络传输的数据量。另外如果一个key对应很多value,这样也可能引起out of memory。

如图,groupby的过程

© 著作权归作者所有

共有 人打赏支持
venps
粉丝 7
博文 2
码字总数 3885
作品 0
朝阳
高级程序员
加载中

评论(1)

souo
souo
79
Spark 的Core深入(二)

Spark 的 Core 深入(二) 标签(空格分隔): Spark的部分 一、日志清洗的优化: 1.1 日志清洗有脏数据问题 rdd.partitions.length rdd.cacherdd.count 一个分区默认一个task 分区去处理默认...

flyfish225
05/08
0
0
Spark基本工作原理与RDD及wordcount程序实例和原理深度剖析

RDD以及其特点 1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。 2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每...

qq1137623160
05/10
0
0
Spark笔记整理(二):RDD与spark核心概念名词

[TOC] Spark RDD 非常基本的说明,下面一张图就能够有基本的理解: Spark RDD基本说明 1、Spark的核心概念是RDD (resilient distributed dataset,弹性分布式数据集),指的是一个只读的,可分...

xpleaf
04/25
0
0
Spark 入门(Python、Scala 版)

本文中,我们将首先讨论如何在本地机器上利用Spark进行简单分析。然后,将在入门级水平探索Spark,了解Spark是什么以及它如何工作(希望可以激发更多探索)。最后两节将开始通过命令行与Spa...

大数据之路
2015/05/07
0
0
Spark2.1.0之基础知识

在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文将对Spark的基础知识进行介绍。但在此之前,读者先跟随本人来一次简单的时光穿梭,最后还将对Java与Scala在语言上进...

beliefer
05/24
0
0
关于Spark的基本概念和特性简介

1、Spark是什么? ○ 高可伸缩性 ○ 高容错 ○ 基于内存计算 2、Spark的生态体系(BDAS,中文:伯利克分析栈) ○ MapReduce属于Hadoop生态体系之一,Spark则属于BDAS生态体系之一 ○ Hadoop...

openthings
2015/06/25
0
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
0
1
Spark2.0-RDD分区原理分析

Spark分区原理分析 介绍 分区是指如何把RDD分布在spark集群的各个节点的操作。以及一个RDD能够分多少个分区。 一个分区是大型分布式数据集的逻辑块。 那么思考一下:分区数如何映射到spark的...

xiaomin0322
06/06
0
0
【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第3节①

一、以RDD为基石的Spark编程模型 在Spark中一切都是基于RDD的: 什么是RDD呢?官方给出的解释是: 也就是说每个RDD都至少有以下三个函数实现: Spark自带了非常多的RDD: RDD主要分为两种: 其...

Spark亚太研究院
2014/12/29
0
0
Apache Flink和Apache Spark有什么异同?它们的发展前景分别怎样?

============================= object WordCount { def main(args: Array[String]) {val env = new SparkContext("local","wordCount")val data = List("hi","how are you","hi")val dataSe......

justlpf
05/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

SpringBoot | 第十章:Swagger2的集成和使用

前言 前一章节介绍了mybatisPlus的集成和简单使用,本章节开始接着上一章节的用户表,进行Swagger2的集成。现在都奉行前后端分离开发和微服务大行其道,分微服务及前后端分离后,前后端开发的...

oKong
今天
2
0
Python 最小二乘法 拟合 二次曲线

Python 二次拟合 随机生成数据,并且加上噪声干扰 构造需要拟合的函数形式,使用最小二乘法进行拟合 输出拟合后的参数 将拟合后的函数与原始数据绘图后进行对比 import numpy as npimport...

阿豪boy
今天
1
0
云拿 无人便利店

附近(上海市-航南路)开了家无人便利店.特意进去体验了一下.下面把自己看到的跟大家分享下. 经得现场工作人员同意后拍了几张照片.从外面看是这样.店门口的指导里强调:不要一次扫码多个人进入....

周翔
昨天
1
0
Java设计模式学习之工厂模式

在Java(或者叫做面向对象语言)的世界中,工厂模式被广泛应用于项目中,也许你并没有听说过,不过也许你已经在使用了。 简单来说,工厂模式的出现源于增加程序序的可扩展性,降低耦合度。之...

路小磊
昨天
158
1
npm profile 新功能介绍

转载地址 npm profile 新功能介绍 npm新版本新推来一个功能,npm profile,这个可以更改自己简介信息的命令,以后可以不用去登录网站来修改自己的简介了 具体的这个功能的支持大概是在6这个版...

durban
昨天
1
0
Serial2Ethernet Bi-redirection

Serial Tool Serial Tool is a utility for developing serial communications, custom protocols or device testing. You can set up bytes to send accordingly to your protocol and save......

zungyiu
昨天
1
0
python里求解物理学上的双弹簧质能系统

物理的模型如下: 在这个系统里有两个物体,它们的质量分别是m1和m2,被两个弹簧连接在一起,伸缩系统为k1和k2,左端固定。假定没有外力时,两个弹簧的长度为L1和L2。 由于两物体有重力,那么...

wangxuwei
昨天
0
0
apolloxlua 介绍

##项目介绍 apolloxlua 目前支持javascript到lua的翻译。可以在openresty和luajit里使用。这个工具分为两种模式, 一种是web模式,可以通过网页使用。另外一种是tool模式, 通常作为大规模翻...

钟元OSS
昨天
2
0
Mybatis入门

简介: 定义:Mybatis是一个支持普通SQL查询、存储过程和高级映射的持久层框架。 途径:MyBatis通过XML文件或者注解的形式配置映射,实现数据库查询。 特性:动态SQL语句。 文件结构:Mybat...

霍淇滨
昨天
2
0
开发技术瓶颈期,如何突破

前言 读书、学习的那些事情,以前我也陆续叨叨了不少,但总觉得 “学习方法” 就是一个永远在路上的话题。个人的能力、经验积累与习惯方法不尽相同,而且一篇文章甚至一本书都很难将学习方法...

_小迷糊
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部