文档章节

Spark源码分析心得

守望者之父
 守望者之父
发布于 2017/03/12 11:09
字数 1579
阅读 85
收藏 0

要想学习理解一款流行分布式系统的源码不是一件容易的事情,一定要多次迭代,看无数遍并且领悟其设计思想。第一次看不要纠结于细节,每次迭代过程中增加一点点细节的理解,最终达到豁然开朗的地步。

学习优秀的源代码是提高自身技能的最好途径,比做无数个低水平的项目效果要显著的多,好了,闲话少说,让我们试图来理解Spark的世界吧。

1、大框架

首先要掌握几个基本概念,Spark是分布式计算框架,核心思想是通过将计算任务尽量分配到源数据一致的机器上执行,降低网络延时;同时引入Dag图依赖关系生成一系列计算任务,当然缓存等机制是不可避免要用到的,为了提高性能嘛。

大部分分布式计算的核心思想是类似的,通过成熟的分布式框架在集群间通信和同步消息,提供横向扩展能力,满足大数据计算的需求。在Spark中具体的分布式消息传递是通过Akka模块来支持的。

核心类:

SparkContext,DagSchedule,TaskScheduleImp(TaskSchedule的实现),Stage,Task,TaskDescription,TaskInfo,RDD,BlockManager等

(1)SparkContext

创建spark任务主要是通过它来完成的,Spark程序上下文,里面包含了DagSchedule,TaskSchedule,BlockManager等等。

(2)DagSchedule

矢量图计算,解析整个Spark任务生成Stage调用树,每个Stage的划分主要是看该Stage是否包含Shuffle过程来决定,Stage在调用的时候生成TaskSet,并通过TaskSchedule分配到具体的Executor上,每个TaskSet包含1到多个Task,Task具体分成ResultTask和ShuffleResultTask两种,两种的区别从命名上就可以区分出来,前者直接计算得到结果,后者是和Shuffle过程相关的。

(3)TaskScheduleImp

通过具体的TaskScheduleEndpoint在集群间通信,发布任务等。

与集群通信是通过TaskScheduleEndpoint来执行的。最常用的是CoarseGrainedSchedulerBackend

(4)Executor

执行单元,一般每台集群机器会分配一个Executor,每个Executor管理本地的多个TaskSet。

Executor通过ExecutorBackend来和Master的ScheduleEndpoint通信,相应的最常用的Endpoint是CoarseGrainedExecutorBackend。

(5)RDD

数据集,Spark定义了很多的数据集(RDD),比如HadoopRDD,JdbcRDD等。

RDD中的具体的数据有时是通过BlockManager来管理的,RDD中能寻址到数据所在的机器。

(6)Task

具体的任务,Master定义好Task后发布到集群中对应机器的Exector去执行,执行结果通过DirectResult和IndirectResult返回,后者通过包含了结果数据所在的Shuffle地址或者块地址等寻址信息。

(7)BlockManager

管理整个集群的Block,默认Block大小是128M,内存和磁盘数据的对应关系等也是通过相关的类来管理的。

以上是初步的比较笼统的一个框架结构,主要用于加强理解,要想更好的理解Spark必须要通过不断的读源码,后续时间笔者会依次和大家分享更具体的源码心得。

(8)MapOutTracker

跟踪整个集群的MapStatus,不同集群之间通过MapOutTrackerMaster等通信来同步信息。

 

2、DagSchedule任务调度

class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging

(1)Dag接受到的各种命令都通过Dag内部事件的方式被执行(而不是直接执行),便于Dag内部做一些排序调度的判断准备工作。

命令的种类:心跳、Exector的注册注销、Task的执行结束失败

(2)两种主要的stage:ResultStage,ShuffleMapStage

getShuffleMapStage:生成或者获得Shuffle类型的stage,同时要生成Shuffle的locs位置信息在里面

getParentStage:根据Rdd各依赖Rdd的Shuffle属性,获取所有Stage列表,这些Stage执行完成之后才能执行本Stage。

getMissingParentStages:获取所有缺失的父Stage,这里只判断Shuffle Stage的情况,ResultStage不考虑,如果缺失则生成该ShuffleToMapStage。

其他一系列维护Stage、Job关系的属性和方法。

(3)主要的Submit方式

SubmitJob:提交作业

生成该作业的finalStage,然后再生成ActiveJob,组装各状态HashMap,提交该finalStage

SubmitStage:提交某个Stage,与SubmitJob的区别是不生成新的ActiveJob。

submitMissingTasks:真正的提交Stage,当所有父Stage都准备就绪时执行,要重点看。stage的所有partitions生成多个Task。最后将这些tasks合并成TaskSet并提交到TaskSchedule(taskScheduler.submitTasks( new TaskSet(...))).

(4)TaskCompleted事件处理

广播事件到listenerBus;

找到Task所在的Stage:

a、如果是ResultTask,则更新该Stage属于的Job的状态,并判断Stage是否所有Task都执行完成,如果是则触发StageCompleted事件。好像也可能是JobCompleted事件,还要再看一次。

b、ShuffleMapTask,则找到对应的ShuffleStage,更新对应的Task所在分区的MapStatus(或者位置信息等),更新mapOutputTracker状态属性,最后启动所有满足提交状态的等待Stage(waitingStages)。

(5)handleExecutorAdded

当有额外的Executor加进来的时候,只是执行SubmitWaitingStage命令,这时候可能会有等待Stage满足了执行条件。

 

3、Task

ResultTask:直接在RDD上执行func

ShuffleMapTask:执行ShuffleDependcy的shuffleHandler方法,主要是一些聚合函数的计算,然后将对应的RDD分区数据执行这些聚合计算之后的结果写入到shuffleManager管理的writer中去,可能是写入到内存也可能写入到disk(猜测),shuffleManager一般会用到BlockManager来管理数据的存储。

Shuffle结果一般存储到集群的临时目录中,具体规则可参见DiskBlockManager和FileShuffleBlockResolver等类实现。

具体内容参见代码。

shuffleManager有这么几种:

(1)HashShuffleManager

(2)SortShuffleManager

当然也可以自定义

 

 

 

© 著作权归作者所有

守望者之父
粉丝 13
博文 126
码字总数 123871
作品 0
南京
私信 提问
SPARK 源码分析技术分享(带bilibili视频)

SPARK 源码分析技术分享 (带bilibili视频) 【本站点正在持续更新中…2018-12-05…】 SPARK 1.6.0-cdh5.15.0 Hadoop 2.6.0-cdh5.15.0 spark-scala-maven 微信(技术交流) : thinktothings SPA...

thinktothings
2018/12/02
0
0
Spark成为大数据高手进阶步骤

什么是Spark Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapRedu...

MoksMo
2015/11/05
2.1K
1
Spark源码分析调试环境搭建

目前常用的Spark版本有三种Cloudera、HDP和Apache,源码的获取方式可以在各自官网下载。本文选择Apache版本。 搭建环境所需要的工具如下: CentOS 7 maven 3.5.0 Java 1.8.0 Scala 2.12.2 I...

火力全開
2017/10/26
40
0
OSC 第 69 期高手问答 — Apache Spark 源码剖析

OSCHINA 本期高手问答 ( 4月20日- 4月26日) 我们请来了@eagleonline(许鹏)为大家解答关于 Apache Spark 方面的问题。 许鹏,长期致力于电信领域和互联网的软件研发,在数据处理方面积累了大...

叶秀兰
2015/04/20
2.9K
17
Spark2.1.0之剖析spark-shell

通过在spark-shell中执行word count的过程,让读者了解到可以使用spark-shell提交Spark作业。现在读者应该很想知道spark-shell究竟做了什么呢? 脚本分析 在Spark安装目录的bin文件夹下可以找...

beliefer
2018/04/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

总结

一、设计模式 简单工厂:一个简单而且比较杂的工厂,可以创建任何对象给你 复杂工厂:先创建一种基础类型的工厂接口,然后各自集成实现这个接口,但是每个工厂都是这个基础类的扩展分类,spr...

BobwithB
37分钟前
3
0
java内存模型

前言 Java作为一种面向对象的,跨平台语言,其对象、内存等一直是比较难的知识点。而且很多概念的名称看起来又那么相似,很多人会傻傻分不清楚。比如本文我们要讨论的JVM内存结构、Java内存模...

ls_cherish
41分钟前
3
0
友元函数强制转换

友元函数强制转换 p522

天王盖地虎626
昨天
5
0
js中实现页面跳转(返回前一页、后一页)

本文转载于:专业的前端网站➸js中实现页面跳转(返回前一页、后一页) 一:JS 重载页面,本地刷新,返回上一页 复制代码代码如下: <a href="javascript:history.go(-1)">返回上一页</a> <a h...

前端老手
昨天
5
0
JAVA 利用时间戳来判断TOKEN是否过期

import java.time.Instant;import java.time.LocalDateTime;import java.time.ZoneId;import java.time.ZoneOffset;import java.time.format.DateTimeFormatter;/** * @descri......

huangkejie
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部