文档章节

再谈DAG任务分解和Shuffle RDD

守望者之父
 守望者之父
发布于 09/16 18:00
字数 1346
阅读 8
收藏 0

1、DagScheduler分析

DagScheduler功能主要是负责RDD的各个stage的分解和任务提交。Stage分解是从触发任务调度过程的finalStage开始倒推寻找父stage,如果父stage没有提交任务则循环提交缺失的父stage。每个stage有一个父RDD的概念,根据分区数的多少创建多个任务(Task)。

Task的调度实际是通过TaskSchedulerImp来完成的,TaskSchedulerImp里根据环境部署的不同又会使用不同的Backend,比如Yarn集群、独立集群等其Backend是不一样的,这里先有个概念,先不深究Backend。

这里先看看DagScheduler的核心逻辑把。里面首先要研究的一个方法:

def submitMissingTasks(stage: Stage, jobId: Int)

该方法就是提交stage执行,为什么叫这个名称呢?说明这里的stage是需先需要提交执行的,没有其他依赖的stage还未执行了。

submitMissingTasks方法会根据RDD的依赖关系创建两种task,ResultTask和ShuffleMapTask。

一步步来,只看关键代码,因为整体代码太多了不利于理解关键逻辑。

1.1 生成序列化的taskBinary

taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

taskBinaryBytes待会是要封装成对像分发到远端Executor上执行的,所以必须是可序列化的。

两者最主要区别就是:ShuffleMapStage的入参是依赖的shuffleDep;而ResultStage的入参是函数的定义func。

1.2 生成task

现在有了taskBinaryBytes,下一步就是生成Task了。

val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

两种Task类型:ShuffleMapTask和ResultTask。这里要主要的是对Task而言,有多少分区(partition)就会生成多少个Task,Task是到分区维度的,而不是到RDD维度的,这个概念一定要明确。

1.3 提交Task

最后一步就是提交任务执行。这里就要用到taskScheduler了,当然了,这里的taskScheduler目前就是指TaskSchedulerImp。

taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))

DagScheduler里还有一个方法这里可以提一下,就是:

submitWaitingChildStages(stage)

这个方法是提交等待当前stage执行的等待stage,这样DAG的整个调度过程就完整了。

 

2、Task执行

两种Task类型:ShuffleMapTask和ResultTask。

2.1 ResultTask

我们先看ResultTask的执行,它相对比较简单,核心方式是runTask,核心代码:

override def runTask(context: TaskContext): U = {   
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    func(context, rdd.iterator(partition, context))
  }

反序列化出来RDD和func,然后执行rdd的iterator方法获取数据集,并在此数据集上执行func函数,要注意实际上这是一次迭代过程而不是多次迭代过程。

2.2 ShuffleMapTask

ShuffleMapTask任务的执行相对复杂些。

核心方法还是runTask,核心代码:

override def runTask(context: TaskContext): MapStatus = {    
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    val rdd = rddAndDep._1
    val dep = rddAndDep._2
    dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
  }

首先反序列化出RDD和依赖项ShuffleDependency。然后用ShuffleWriterProcessor写数据到RDD。

这里的dep其实没太大意义,主要就是来判断是否要进行合并使用的,不影响理解整个shuffle流程,所以我们可以先不要管dep:

dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)

这里的rdd实际就是ShuffleMapTask所要生成的数据集。这句代码到底是什么意思呢? ShuffleWriterProcessor实际上是将数据集写到了BlockManager上去的,先看看ShuffleWriterProcessor的含义。

2.3 ShuffleWriterProcessor

ShuffleWriterProcessor的关键方法的定义先看一下。

def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _],
      partitionId: Int, context: TaskContext,partition: Partition): MapStatus = {
    var writer: ShuffleWriter[Any, Any] = null   
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](
        dep.shuffleHandle,
        partitionId,
        context,
        createMetricsReporter(context))
    writer.write(
        rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get    
  }

ShuffleManager实际上就是BlockManager,管理块空间的。

Write是Shuffle写入器,写到BlockManager去;rdd.iterator(partition, context)就是当前Shuffle类型的RDD定义的数据集,dep是rdd计算数据集时依赖的RDD(这里的dep没多大意思先不管)。

这段代码的作用就是将shuffle rdd数据集输出到BlockManager上,在读取RDD的数据时,如果该RDD是shuffle类型,则需要到BlockManager上去读取,这里就是这个作用。

2.4 Shuffle RDD的相关概念

Shuffle类的RDD是指这类RDD的compute方法是依赖于其他RDD的,这里的其他RDD可以是多个。执行shuffle的RDD的计算过程的时候,是将一到多个依赖RDD的迭代器的输出作为数据源迭代器,在此之上执行自己的操作。所以shuffle RDD的compute方法里一定会用到依赖RDD的iterator方法。

可以看看CoGroupedRDD的源码,就能很快的理解shuffle的含义。

附录:github源码地址:

https://github.com/apache/spark/tree/master/core/src/main/scala/org/apache/spark

 

© 著作权归作者所有

守望者之父
粉丝 13
博文 126
码字总数 123871
作品 0
南京
私信 提问
Spark (二) 架构详解

Spark主要模块包括调度与任务分配、I/O模块、通信控制模块、容错模块 以及Shuffle模块。Spark按照应用、作业、Stage和Task几个层次分别进行调度,采用了经 典的FIFO和FAIR等调度算法。在Spa...

bigsloth
2016/10/06
609
0
spark核心构件之Dependency(依赖)

之前的文章说一个spark任务其实就是一系列rdd构成的有向无环图(dag),今天我们来看看,spark是如何表示rdd之间的依赖关系建立这个dag的。 一、rdd如何构成dag 上篇文章讲到了Partition和P...

曾二爷耶
03/23
0
0
Spark(六) -- Spark计算模型

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45788083 整个Spark框架都是基于RDD算子来进行计算的。 What is RDD? Resili...

jchubby
2015/05/17
0
0
30分钟理解Spark的基本原理

文章发布于公号【数智物语】 (ID:decision_engine),关注公号不错过每一篇干货。 作者 | 梁云1991 转载自Python与算法之美(ID:PythonAiRoad) 01Spark优势特点 作为大数据计算框架 MapR...

数智物语说
07/17
0
0
spark内核揭秘-04-spark任务调度系统个人理解

spark的任务调度系统如下所示: 从上图中可以看出来由RDD Objects产生DAG,然后进入了DAGScheduler阶段,DAGScheduler是面向state的高层次的调度器,DAGScheduler把DAG拆分成很多的tasks,每...

stark_summer
2015/01/18
2.4K
2

没有更多内容

加载失败,请刷新页面

加载更多

浅谈Visitor访问者模式

一、前言 什么叫访问,如果大家学过数据结构,对于这点就很清晰了,遍历就是访问的一般形式,单独读取一个元素进行相应的处理也叫作访问,读取到想要查看的内容+对其进行处理就叫作访问,那么...

青衣霓裳
36分钟前
6
0
JS内嵌多个页面,页面之间如何更快捷的查找相关联的页面

假设parent为P页面, P页面有两个子页面,分别为B页面和C页面; B页面和C页面分别内嵌一个iframe,分别为:D页面和E页面 现在通过B页面的内嵌页面D的方法refreshEpage(eUrl)来加载内嵌页面E的内容...

文文1
37分钟前
7
0
Hibernate 5 升级后 getProperties 错误

升级到 Hibernate 5 后,提示有错误: org.hibernate.engine.spi.SessionFactoryImplementor.getProperties()Ljava/util/Map; 完整的错误栈为: java.lang.NoSuchMethodError: org.hibernate......

honeymoose
38分钟前
6
0
mysql-connector-java升级到8.0后保存时间到数据库出现了时差

在一个新项目中用到了新版的mysql jdbc 驱动 <dependency>     <groupId>mysql</groupId>     <artifactId>mysql-connector-java</artifactId>     <version>8.0.18</version> ......

ValSong
42分钟前
7
0
Spring中BeanFactory与FactoryBean的区别

在Spring中有BeanFactory和FactoryBean这2个接口,从名字来看很相似,比较容易搞混。 一、BeanFactory BeanFactory是一个接口,它是Spring中工厂的顶层规范,是SpringIoc容器的核心接口,它定...

大王叫下
44分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部