文档章节

Spark(Scheduler)

赵-猛
 赵-猛
发布于 2017/06/17 11:32
字数 1554
阅读 5
收藏 0

资源调度

standalone
yarn
mesos


  不同应用之间的资源调度
    在standalone模式,yarn模式,和粗粒度的mesos模式下,都是使用static partition的方式,设定每个应用占用的cpu和mem数
    在mesos模式下还有一种模式(默认细粒度的模式)是dynamic sharing CPU cores,但是mem还是固定独立分配的

  同一个应用下的资源调度
    在同一个SparkContext里,jobs是可以并行跑的,spark的调度器是线程安全的
    默认的调度器处理jobs是FIFO的。每个job有若干个stages(map和reduce的阶段),如果前面的stage把mem和cpu占满了,那后续来的job里的stage可能就卡住不能跑了
    fair scheduler调度模式,对jobs的tasks采用轮询的方式,短的任务在长任务跑的情况下也可以得到资源并行进行,适合多用户使用的情况
      sc.set("spark.scheduler.mode", "FAIR")    //默认FIFO
      sc.setLocalProperty("spark.scheduler.pool", "test") 
      sc.setLocalProperty("spark.scheduler.allocation.file", "//SPARK_HOME/conf/fairscheduler.xml") 

 

任务调度

Actor模型+线程模型

 

两大组件

  TaskSchedulerListener
    TaskSchedulerListener部分的主要功能是监听用户提交的job,将job分解为不同的类型的stage以及相应的task,并向TaskScheduler提交task。
      DAGScheduler: 主要任务是基于Stage构建DAG,决定每个task的最佳位置

  TaskScheduler
    TaskScheduler接收用户提交的task并执行。
      ClusterScheduler
        YarnClusterScheduler
      LocalScheduler
      MesosScheduler
      


ClusterScheduler功能
    创建和管理schedulable tree, 向cluster申请workers(每个core可以分配一个task),当得到可用的workers后, 用于从schedulable tree中schedule合适的被执行的tasks 
    对于schedulable tree中的TaskSets(排过序的tasksets), 并对于每个Taskset, 遍历TaskLocality, 判断TaskLocality, 越local越优先, 找不到(launchedTask为false)
    才会到下个locality级别, 在多次遍历offer list, 因为一次taskSet.resourceOffer只会占用一个core, 而不是一次用光所有的core, 这样有助于一个taskset中的task比较均匀
    的分布在workers上, 只有在该taskset和该locality下, 去执行task, 对所有worker都找不到合适的task时, 才跳到下个locality级别

 

消息通信(Actor模型)

    driver to executor
    executor to driver

 

Job生命周期

application运行时,由一系列job组成

  每一个job在接受DAGScheduler的调度,依据job中rdd的依赖关系先后产生不同的stage,这些stage组成DAG(result stage, shuffle map stage)
      每一个RDD是对于数据集在某一状态下的表现形式,这个状态有可能是从前一状态转换而来的,即一个RDD有可能与之前的RDD(s)有依赖关系
      窄依赖,child RDD只依赖于parent RDD(s)固定数量的partition
      宽依赖,child RDD的每一个partition都依赖于parent RDD(s)所有partition
      对于窄依赖,Spark会尽量多地将RDD转换放在同一个stage中
      对于宽依赖,宽依赖通常意味着shuffle操作,Spark会将此stage定义为ShuffleMapStage,以便于向MapOutputTracker注册shuffle操作,即shuffle操作定义为stage的边界
      
  每一个stage内部,根据stage产生出相应的task(ResultTask, ShuffleMapTask), 这些task会根据RDD中partition的数量和分布,产生出一组相应的task, 组成TaskSet由TaskScheduler调度

  每一个task,接受TaskScheduler的调度.TaskScheduler可以认为是一个线程池(本地线程池或者分布式线程池),一个task一个线程,在线程中执行rdd.iterator(),执行结果包装为TaskResult
      LocalScheduler(local threadpool,Actor模型)
      ClusterScheduler(distributed/multiple threadpools,Actor模型),driver(actor) + executors(local threadpools)
      MesosScheduler

  相关的task result组装为stage的result, 一系列stage执行完后获得最后的result,即job的result

 

Job/Stage/Task

执行一个 rdd 的 action 会生成一个 job

DAGScheduler
一个Job会被拆分为多组Task,每组Task被称为一个Stage
即一个Job会被切分成 1 个或 1 个以上的 stage,然后各个 stage 会按照执行顺序依次执行

TaskScheduler
一个stage包含多个tasks,task的个数即该stage的finalRDD的partition数
一个stage中的task完全相同,ShuffleMapStage包含的都是ShuffleMapTask;ResultStage包含的都是ResultTask

划分stage的依据:数据是否需要进行重组
spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中
在Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据
  shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage
  result之前的操作只有一个stage
一个Job中包含一个ResultStage及多个ShuffleMapStage


YARN(资源管理)
  client + master + workers
 
Spark:driver+executors
  Driver(即SparkContext: DAGScheduler + TaskScheduler)

    client mode: client本地创建
    cluster mode: client远程调度worker用linux命令启动spark driver进程

  RDD的Action会调用SparkContext.runJob
  SparkContext.runJob调用DAGScheduler.runJob, DAGScheduler中,根据rdd的Dependency生成stage,stage分为ShuffleMapStage和ResultStage
两种类型,根据stage类型生成对应的task,分别是ShuffleMapTask(transformation操作的task)、ResultTask(action操作的task),最后调用TaskScheduler提交任务
  即spark driver序列化task,并传输给spark executor
  
  spark executor接收task并反序列化,然后使用Executor.launchTask执行
  Executor使用线程池ThreadPoolExecutor来执行任务,并按结果大小将结果(或结果信息)序列化发回给driver
  至此RDD的一个Action结束
  


  Executor(ThreadPool)

    master远程调度worker用linux命令启动spark executor进程, executor进程的主要类Executor
    维护了一个线程池newCachedThreadPool来执行driver传输过来的的任务(需要返回序列化)

      类Executor
      private val threadPool = {
        val threadFactory = new ThreadFactoryBuilder()
          .setDaemon(true)
          .setNameFormat("Executor task launch worker-%d")
          .setThreadFactory(new ThreadFactory {
            override def newThread(r: Runnable): Thread =
              new UninterruptibleThread(r, "unused")
          })
          .build()
        Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
      }

    newCachedThreadPool线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程. (SynchronousQueue)
    

 

© 著作权归作者所有

上一篇: Spark(Storage)
赵-猛
粉丝 6
博文 820
码字总数 500010
作品 0
深圳
技术主管
私信 提问
Apache Spark 0.9.1 发布,集群计算环境

Apache Spark 0.9.1 发布,这是一个维护版本,主要是 bug 修复、性能提升以及 YARN 的稳定性提升。 下载地址:http://d3kbcqa49mib13.cloudfront.net/spark-0.9.1.tgz Spark 是一种与 Hadoop...

oschina
2014/04/19
2.6K
4
Spark中的一些基本概念,及运行流程

1 Spark中的一些基本概念 话不多说,先来一个官网地址:链接 1. Application:用户基于spark的代码,由一个Driver和多个Excutor组成。 2. Application jar:将用户基于spark的代码打包成的jar...

yu0_zhang0
2018/04/23
0
0
Apache Spark 3.0 将内置支持 GPU 调度

如今大数据和机器学习已经有了很大的结合,在机器学习里面,因为计算迭代的时间可能会很长,开发人员一般会选择使用 GPU、FPGA 或 TPU 来加速计算。在 Apache Hadoop 3.1 版本里面已经开始内...

Spark
03/10
0
0
在idea中访问spark集群报错

通过spark-submit或者idea中提交jar包报错 问题原因 服务器上版本号和本地版本号不对应,导致进行序列化的UID不一致: 解决方法 如果是通过submit提交的程序报错,则是本地编译spark程序的sca...

freeli
2018/09/29
33
0
Spark(五) -- Spark Streaming介绍与基本执行过程

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45770881 Spark Streaming作为Spark上的四大子框架之一,肩负着实时流计算的重...

jchubby
2015/05/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0
详解箭头函数和普通函数的区别以及箭头函数的注意事项、不适用场景

箭头函数是ES6的API,相信很多人都知道,因为其语法上相对于普通函数更简洁,深受大家的喜爱。就是这种我们日常开发中一直在使用的API,大部分同学却对它的了解程度还是不够深... 普通函数和...

OBKoro1
昨天
7
0
轻量级 HTTP(s) 代理 TinyProxy

CentOS 下安装 TinyProxy yum install -y tinyproxy 启动、停止、重启 # 启动service tinyproxy start# 停止service tinyproxy stop# 重启service tinyproxy restart 相关配置 默认...

Anoyi
昨天
2
0
Linux创建yum仓库

第一步、搞定自己的光盘 #创建文件夹 mkdir -p /media/cdrom #挂载光盘 mount /dev/cdrom /media/cdrom #编辑配置文件使其永久生效 vim /etc/fstab 第二步,编辑yun源 vim /ect yum.repos.d...

究极小怪兽zzz
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部