spark内核揭秘-06-TaskSceduler启动源码解析初体验
spark内核揭秘-06-TaskSceduler启动源码解析初体验
stark_summer 发表于3年前
spark内核揭秘-06-TaskSceduler启动源码解析初体验
  • 发表于 3年前
  • 阅读 55
  • 收藏 2
  • 点赞 0
  • 评论 0

【腾讯云】如何购买服务器最划算?>>>   

摘要: spark内核揭秘-06-TaskSceduler启动源码解析初体验 spark内核揭秘-06-TaskSceduler启动源码解析初体验 spark内核揭秘-06-TaskSceduler启动源码解析初体验 spark内核揭秘-06-TaskSceduler启动源码解析初体验

TaskScheduler实例对象启动源代码如下所示:


从上面代码可以看出来,taskScheduler的启动是在SparkContext


找到TaskSchedulerImpl实现类中的start方法实现:

1、从上代码看到,先启动CoarseGrainedSchedulerBackend,


从上面CoarseGrainedSchedulerBackend类的代码,可以看出spark启动了DriverActor,名称为CoarseGrainedScheduler,这是一个akka消息通信类,会先运行preStart()方法


从上面代码可以看到,初始化了akka客户端监听,还有最重要的是调用了系统的scheduler调度,参数函数是立即执行调度,间隔1000毫秒,运行ReviveOffers方法


进入makeOffers()方法:


运行launchTask方法:




这段代码是spark序列号任务大小超过akkaFrameSize - AkkaUtils.reservedSizeBytes大小,那就报错为”Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values.
“ ,此刻会将该任务终止,并将任务从任务列表中移除,这样推荐使用broadcast广播方式

否则,将获取执行任务数据,并减少空闲cpu数,发送消息执行 LaunchTask(new SerializableBuffer(serializedTask))方法,即CoarsedGrainedExecutorBackend类的LaunchTask方法:


上面代码 会运行executor 的launchTask方法:


TaskRunner就是一个多线程:




代码太多,我就不截图了,其实实际就是根据机器状况,运行task任务

2、然后我们回到TaskSchedulerImpl实现类中的start方法


如果isLocal=false and spark.speculation=true,不是local模式,那就要dispatcher分发任务了,默认是100毫秒后立即启动,并间隔100毫秒循环运行,


CoarseGrainedSchedulerBackend的reviveOffers:

共有 人打赏支持
粉丝 59
博文 75
码字总数 51050
×
stark_summer
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: