文档章节

spark.streaming.concurrentJobs参数解密

v1daddy
 v1daddy
发布于 2016/04/23 15:30
字数 695
阅读 41
收藏 0

最近,在spark streaming 调优时,发现个增加job并行度的参数spark.streaming.concurrentJobs,spark 默认值为1,当增加为2时(在spark-default中配置),如遇到处理速度慢 streaming application UI 中会有两个Active Jobs(默认值时为1),也就是在同一时刻可以执行两个批次的streaming job,下文分析这个参数是如何影响streaming 的执行的。 ##参数引入 在spark streaming 的JobScheduler line 47,读取了该参数:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

使用concurrentJobs参数初始化jobExecutor线程池,也就是这个参数直接影响了job executor线程池中的线程数目。

job executor

job executor 线程池用来execute JobHandler线程;在jobSchedule中有个job容器jobSets:

private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

用来保存不同的时间点生成的JobSet,而JobSet中包含多个Job; JobSet submit逻辑:

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

不难看出jobExecutor的容量决定了池子中同时可以被处理的JobHandler线程数,JobHandler是job的执行线程,因此决定了可以被同时被提交的Job数目

使用方法

可以通过集中方法为streaming job配置此参数。

  • spark-default中修改 全局性修改,所有的streaming job都会受到影响。
  • 提交streaming job是 --conf 参数添加(推荐) 在提交job时,可以使用--conf 参数为该job添加个性化的配置。例如: bin/spark-submit --master yarn --conf spark.streaming.concurrentJobs=5 设置该streaming job的job executor 线程池大小为5,在资源充足的情况下可以同时执行5个batch job。
  • 代码设置 在代码中通过sparkConf设置: sparkConf.set("spark.streaming.concurrentJobs", "5"); 或者 System.setProperty("spark.streaming.concurrentJobs", "5");

scheduler mode的使用建议

在配置多个concurrentJob时,多个批次job被同时提交到集群中,也就需要更多的计算资源;当没有更多的计算资源(Executor)被分配个该streaming job时,可将schedul 调整为FAIR(公平调度)来达到被提交的多个job可公平的共享计算资源。 当调整为公平调度时,job可以共享计算资源,而job的提交仍然是有时间顺序的(虽然时间间隔很小),容易造成task在executor间分配的倾斜,拉长job的整体执行时间。 当使用fifo调度方式,先到的job优先获得计算资源,当executor数目不足时,job会等待executor被释放,task数目反而不易倾斜。 在实际使用时,如果executor数目足够,建议使用FIFO模式,如在concurrentJob为默认配置时,executor分配数目为m,则当concurrentJobs配置为n时,executor建议分配为 n*m。

© 著作权归作者所有

v1daddy
粉丝 1
博文 2
码字总数 695
作品 0
武汉
高级程序员
私信 提问
Spark Streaming源码解读之JobScheduler内幕实现和深度思考

一JobScheduler内幕实现 我们从StreamingContext start开始: 启动的时候有三种状态 INITIALIZED、ACTIVE、STOPPED,这里在主线程中启动了scheduler(scheduler.start()) 我们可以看到JobSche...

荀道子
2016/05/15
65
0
spark典型优化方式

1:reduce task数目不合适 解决方式: 设置合理的并行度有利于充分利用集群资源,提升spark应用的性能。Spark官方的推荐,给集群中的每个cpu core设置2~3个task。数量太大造成很多小任务,增...

JPblog
2016/08/08
97
0
spark内核揭秘-14-Spark性能优化的10大问题及其解决方案

问题1:reduce task数目不合适 解决方案: 需要根据实际情况调整默认配置,调整方式是修改参数spark.default.parallelism。通常的,reduce数目设置为core数目的2-3倍。数量太大,造成很多小任...

stark_summer
2015/01/26
141
0
Spark Streaming源码解析之Job动态生成

---title: sparkStreaming源码解析之Job动态生成subtitle: sparkStream的Job动态生成思维脑图description: sparkStream的Job动态生成思维脑图keywords: [spark,streaming,源码,JOB]author: ...

freeli
2018/12/07
43
0
Spark性能优化的10大问题及其解决方案

Spark性能优化的10大问题及其解决方案 问题1:reduce task数目不合适 解决方式: 需根据实际情况调节默认配置,调整方式是修改参数spark.default.parallelism。通常,reduce数目设置为core数...

片刻
2015/12/15
723
0

没有更多内容

加载失败,请刷新页面

加载更多

Handler简解

Handler 这里简化一下代码 以便理解 Handler不一定要在主线程建 但如Handler handler = new Handler(); 会使用当前的Looper的, 由于要更新UI 所以最好在主线程 new Handler() { mLooper = Lo...

shzwork
27分钟前
3
0
h5获取摄像头拍照功能

完整代码展示: <!DOCTYPE html> <head> <title>HTML5 GetUserMedia Demo</title> <meta charset="utf-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0, maximum......

诗书易经
30分钟前
3
0
正向代理和反向代理

文章来源 运维公会:正向代理和反向代理 1、正向代理 (1)服务对象不同 正向代理服务器的服务对象是客户端,可以将客户端和代理服务器看作一个整体。 (2)配置方法不同 需要在客户端配置代...

运维团
46分钟前
4
0
5个避免意外论文重复率高的方法

即使你不是故意抄袭,但你可能在无意中抄袭了别人的论文, 这个叫做意外抄袭,它可能正发生在你身上,如果你不熟悉学术 道德规范,这里将告诉你5个基本的方法来避免意外抄袭。 Tip1 熟悉其他...

论文辅导员
47分钟前
4
0
Maven通过profiles标签读取不同的配置

<profiles> <profile> <id>dev</id> <properties> <profiles.active>dev</profiles.active> </properties> ......

时刻在奔跑
53分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部