文档章节

Spark源码分析(1) 从WordCount示例看Spark延迟计算原理

josephguan
 josephguan
发布于 2014/08/22 12:54
字数 1528
阅读 308
收藏 0

WordCount示例:

val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

在执行counts.saveAsTextFile("hdfs://...")前,spark其实没有进行真正的运算,只是在构造计算的过程——有点类似Scala中的lazy运算。当执行counts.saveAsTextFile才真正开始了整个计算。

源代码分析:

从Spark源码看,这种延迟计算的实现原理是如此的简单、明了!简单来说,原理如下:
每个运算(如flatMap、map)其实返回的都是一个RDD对象,可以认为最后形成了一个RDD对象的队列;直到最后需要计算时(例如调用了saveAsTextFile)才开始逐一调用各个RDD对象的compute方法,完成实际的运算。

具体分析:

1. spark.textFile("hdfs://...")返回了什么?
  文件:/core/src/main/scala/org/apache/spark/SparkContext.scala

def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minSplits).map(pair => pair._2.toString)
  }

  def hadoopFile[K, V](...): RDD[(K, V)] = {
    new HadoopRDD(...)
  }

  可见,执行spark.textFile("hdfs://...")其实并没有真正的打开文件或做什么实际的操作,而只是返回了一个HadoopRDD的对象。

2. map、flatMap做了什么?
  文件:core/src/main/scala/org/apache/spark/rdd/RDD.scala

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
    new FlatMappedRDD(this, sc.clean(f))

  就是这么简单,map返回一个MappedRDD,flatMap返回一个FlatMappedRDD对象。他们都是RDD对象的子类。

3. reduceByKey做了什么?
  reduceByKey不是RDD的一个成员函数,它定义在PairRDDFunctions类中。RDD会被隐式转换为PairRDDFunctions。

  注:隐式转换的方法定义在文件:/core/src/main/scala/org/apache/spark/SparkContext.scala中。
  implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
    new PairRDDFunctions(rdd)

  reduceByKey的定义在文件:/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

  

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(defaultPartitioner(self), func)
  }
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializerClass: String = null): RDD[(K, C)] = {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (getKeyClass().isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else if (mapSideCombine) {
      val combined = self.mapPartitionsWithContext((context, iter) => {
        aggregator.combineValuesByKey(iter, context)
      }, preservesPartitioning = true)
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
        .setSerializer(serializerClass)
      partitioned.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      // Don't apply map-side combiner.
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
      values.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    }
  }

  这个过程有点复杂,但简单的从函数值上我们可以看到,reduceByKey最终返回的结果是RDD[(K, V)]。其实,细致分析下去,最终返回的是个MapPartitionsRDD对象:上面的代码最终都是生成一个ShuffledRDD对象;然后调用该对象的mapPartitionsWithContext方法,这个方法返回MapPartitionsRDD对象。如下:

def mapPartitionsWithContext[U: ClassTag](
      f: (TaskContext, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = {
    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
  }

4. 是时候开始真正的计算了!——compute()
 通过上面的几个步骤,我们可以看到,其实我们还什么都没有做,只是构建了一些列的RDD对象!每个RDD对象都有一个Parent,如下:

/** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context , List(new OneToOneDependency(oneParent)))


  通过这个Parent,实际上我们把一个个RDD对象串联了起来!这一串RDD,就是我们搭建起来的计算过程!具体如下:
 HadoopRDD->FlatMappedRDD->MappedRDD->MapPartitionsRDD


  在执行saveAsTextFile最终会调用runJob,在runJob的过程中会调用RDD的compute(具体过程另行分析)。它是RDD的一个abstract函数,由各个子类具体实现。我们以MappedRDD为例,看一下compute都做了什么事:

override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).map(f)

  它首先调用了parent RDD的iterator(split, context),然后就执行了map运算。可见,在这里我们才真正执行了map运算——这就实现了所谓的延迟计算!

  等等,前面还有个firstParent[T].iterator(split, context),猜猜它是干什么的? ——很简单,我们要想让前面搭建起来的运算过程顺序执行,必须得让“parent”RDD先compute完成,然后才执行用我们当前这个RDD要做的运算啊。所以他就是迭代的调用之前所有parent RDD的compute方法。如下:

/**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

总结:

这类延迟计算过程在Spark中称为“Transformation操作”。Transformation操作主要有:

map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集
filter(func) : 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD
flatMap(func):和map差不多,但是flatMap生成的是多个结果
mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition
mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index
sample(withReplacement,faction,seed):抽样
union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合
distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element
groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist
reduceByKey(func,[numTasks]):就是用一个给定的reduce func再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数
sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型
join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数
cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数
cartesian(otherDataset):笛卡尔积就是m*n




© 著作权归作者所有

josephguan
粉丝 1
博文 12
码字总数 11579
作品 0
深圳
私信 提问
Spark基本工作原理与RDD及wordcount程序实例和原理深度剖析

RDD以及其特点 1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。 2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每...

qq1137623160
2018/05/10
0
0
Kubernetes助力Spark大数据分析

Kubernetes 作为一个广受欢迎的开源容器协调系统,是Google于2014年酝酿的项目。从Google趋势上看到,Kubernetes自2014年以来热度一路飙升,短短几年时间就已超越了大数据分析领域的长老Had...

店家小二
2018/12/17
0
0
【Spark】Spark On Yarn 环境搭建及 WordCount 程序原理深度剖析

版权声明:本文为博主原创文章,转载请注明出处。 https://blog.csdn.net/gongxifacai_believe/article/details/86652088 1、Spark On Yarn 环境搭建 参考文献:http://spark.apache.org/doc...

魏晓蕾
01/31
0
0
SPARK 源码分析技术分享(带bilibili视频)

SPARK 源码分析技术分享 (带bilibili视频) 【本站点正在持续更新中…2018-12-05…】 SPARK 1.6.0-cdh5.15.0 Hadoop 2.6.0-cdh5.15.0 spark-scala-maven 微信(技术交流) : thinktothings SPA...

thinktothings
2018/12/02
0
0
Spark成为大数据高手进阶步骤

什么是Spark Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapRedu...

MoksMo
2015/11/05
2.1K
1

没有更多内容

加载失败,请刷新页面

加载更多

代理模式之JDK动态代理 — “JDK Dynamic Proxy“

动态代理的原理是什么? 所谓的动态代理,他是一个代理机制,代理机制可以看作是对调用目标的一个包装,这样我们对目标代码的调用不是直接发生的,而是通过代理完成,通过代理可以有效的让调...

code-ortaerc
今天
5
0
学习记录(day05-标签操作、属性绑定、语句控制、数据绑定、事件绑定、案例用户登录)

[TOC] 1.1.1标签操作v-text&v-html v-text:会把data中绑定的数据值原样输出。 v-html:会把data中值输出,且会自动解析html代码 <!--可以将指定的内容显示到标签体中--><标签 v-text=""></......

庭前云落
今天
8
0
VMware vSphere的两种RDM磁盘

在VMware vSphere vCenter中创建虚拟机时,可以添加一种叫RDM的磁盘。 RDM - Raw Device Mapping,原始设备映射,那么,RDM磁盘是不是就可以称作为“原始设备映射磁盘”呢?这也是一种可以热...

大别阿郎
今天
12
0
【AngularJS学习笔记】02 小杂烩及学习总结

本文转载于:专业的前端网站☞【AngularJS学习笔记】02 小杂烩及学习总结 表格示例 <div ng-app="myApp" ng-controller="customersCtrl"> <table> <tr ng-repeat="x in names | orderBy ......

前端老手
昨天
16
0
Linux 内核的五大创新

在科技行业,创新这个词几乎和革命一样到处泛滥,所以很难将那些夸张的东西与真正令人振奋的东西区分开来。Linux内核被称为创新,但它又被称为现代计算中最大的奇迹,一个微观世界中的庞然大...

阮鹏
昨天
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部