Spark(Scheduler)

原创
2017/06/17 11:32
阅读数 32

资源调度

standalone
yarn
mesos


  不同应用之间的资源调度
    在standalone模式,yarn模式,和粗粒度的mesos模式下,都是使用static partition的方式,设定每个应用占用的cpu和mem数
    在mesos模式下还有一种模式(默认细粒度的模式)是dynamic sharing CPU cores,但是mem还是固定独立分配的

  同一个应用下的资源调度
    在同一个SparkContext里,jobs是可以并行跑的,spark的调度器是线程安全的
    默认的调度器处理jobs是FIFO的。每个job有若干个stages(map和reduce的阶段),如果前面的stage把mem和cpu占满了,那后续来的job里的stage可能就卡住不能跑了
    fair scheduler调度模式,对jobs的tasks采用轮询的方式,短的任务在长任务跑的情况下也可以得到资源并行进行,适合多用户使用的情况
      sc.set("spark.scheduler.mode", "FAIR")    //默认FIFO
      sc.setLocalProperty("spark.scheduler.pool", "test") 
      sc.setLocalProperty("spark.scheduler.allocation.file", "//SPARK_HOME/conf/fairscheduler.xml") 

 

任务调度

Actor模型+线程模型

 

两大组件

  TaskSchedulerListener
    TaskSchedulerListener部分的主要功能是监听用户提交的job,将job分解为不同的类型的stage以及相应的task,并向TaskScheduler提交task。
      DAGScheduler: 主要任务是基于Stage构建DAG,决定每个task的最佳位置

  TaskScheduler
    TaskScheduler接收用户提交的task并执行。
      ClusterScheduler
        YarnClusterScheduler
      LocalScheduler
      MesosScheduler
      


ClusterScheduler功能
    创建和管理schedulable tree, 向cluster申请workers(每个core可以分配一个task),当得到可用的workers后, 用于从schedulable tree中schedule合适的被执行的tasks 
    对于schedulable tree中的TaskSets(排过序的tasksets), 并对于每个Taskset, 遍历TaskLocality, 判断TaskLocality, 越local越优先, 找不到(launchedTask为false)
    才会到下个locality级别, 在多次遍历offer list, 因为一次taskSet.resourceOffer只会占用一个core, 而不是一次用光所有的core, 这样有助于一个taskset中的task比较均匀
    的分布在workers上, 只有在该taskset和该locality下, 去执行task, 对所有worker都找不到合适的task时, 才跳到下个locality级别

 

消息通信(Actor模型)

    driver to executor
    executor to driver

 

Job生命周期

application运行时,由一系列job组成

  每一个job在接受DAGScheduler的调度,依据job中rdd的依赖关系先后产生不同的stage,这些stage组成DAG(result stage, shuffle map stage)
      每一个RDD是对于数据集在某一状态下的表现形式,这个状态有可能是从前一状态转换而来的,即一个RDD有可能与之前的RDD(s)有依赖关系
      窄依赖,child RDD只依赖于parent RDD(s)固定数量的partition
      宽依赖,child RDD的每一个partition都依赖于parent RDD(s)所有partition
      对于窄依赖,Spark会尽量多地将RDD转换放在同一个stage中
      对于宽依赖,宽依赖通常意味着shuffle操作,Spark会将此stage定义为ShuffleMapStage,以便于向MapOutputTracker注册shuffle操作,即shuffle操作定义为stage的边界
      
  每一个stage内部,根据stage产生出相应的task(ResultTask, ShuffleMapTask), 这些task会根据RDD中partition的数量和分布,产生出一组相应的task, 组成TaskSet由TaskScheduler调度

  每一个task,接受TaskScheduler的调度.TaskScheduler可以认为是一个线程池(本地线程池或者分布式线程池),一个task一个线程,在线程中执行rdd.iterator(),执行结果包装为TaskResult
      LocalScheduler(local threadpool,Actor模型)
      ClusterScheduler(distributed/multiple threadpools,Actor模型),driver(actor) + executors(local threadpools)
      MesosScheduler

  相关的task result组装为stage的result, 一系列stage执行完后获得最后的result,即job的result

 

Job/Stage/Task

执行一个 rdd 的 action 会生成一个 job

DAGScheduler
一个Job会被拆分为多组Task,每组Task被称为一个Stage
即一个Job会被切分成 1 个或 1 个以上的 stage,然后各个 stage 会按照执行顺序依次执行

TaskScheduler
一个stage包含多个tasks,task的个数即该stage的finalRDD的partition数
一个stage中的task完全相同,ShuffleMapStage包含的都是ShuffleMapTask;ResultStage包含的都是ResultTask

划分stage的依据:数据是否需要进行重组
spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中
在Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据
  shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage
  result之前的操作只有一个stage
一个Job中包含一个ResultStage及多个ShuffleMapStage


YARN(资源管理)
  client + master + workers
 
Spark:driver+executors
  Driver(即SparkContext: DAGScheduler + TaskScheduler)

    client mode: client本地创建
    cluster mode: client远程调度worker用linux命令启动spark driver进程

  RDD的Action会调用SparkContext.runJob
  SparkContext.runJob调用DAGScheduler.runJob, DAGScheduler中,根据rdd的Dependency生成stage,stage分为ShuffleMapStage和ResultStage
两种类型,根据stage类型生成对应的task,分别是ShuffleMapTask(transformation操作的task)、ResultTask(action操作的task),最后调用TaskScheduler提交任务
  即spark driver序列化task,并传输给spark executor
  
  spark executor接收task并反序列化,然后使用Executor.launchTask执行
  Executor使用线程池ThreadPoolExecutor来执行任务,并按结果大小将结果(或结果信息)序列化发回给driver
  至此RDD的一个Action结束
  


  Executor(ThreadPool)

    master远程调度worker用linux命令启动spark executor进程, executor进程的主要类Executor
    维护了一个线程池newCachedThreadPool来执行driver传输过来的的任务(需要返回序列化)

      类Executor
      private val threadPool = {
        val threadFactory = new ThreadFactoryBuilder()
          .setDaemon(true)
          .setNameFormat("Executor task launch worker-%d")
          .setThreadFactory(new ThreadFactory {
            override def newThread(r: Runnable): Thread =
              new UninterruptibleThread(r, "unused")
          })
          .build()
        Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
      }

    newCachedThreadPool线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程. (SynchronousQueue)
    

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部