Azkaban的Exec Server分析 28:Execute Server的任务真正执行过程
Azkaban的Exec Server分析 28:Execute Server的任务真正执行过程
强子哥哥 发表于2年前
Azkaban的Exec Server分析 28:Execute Server的任务真正执行过程
  • 发表于 2年前
  • 阅读 288
  • 收藏 0
  • 点赞 1
  • 评论 0

新睿云服务器60天免费使用,快来体验!>>>   

上节,我们看到了任务放在executor的一个线程池里,下面开始分析真正的执行过程!

================================================================================

jdb azkaban.execapp.AzkabanExecutorServer  -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in azkaban.execapp.FlowRunner.run

run

================================================================================

大家知道,关于一个拓扑图来说,有一个起点的说法

可能在拓扑图里有多个起点,看下azkaban的起点计算方法

 public List<String> getStartNodes() {

    if (startNodes == null) {

      startNodes = new ArrayList<String>();

      for (ExecutableNode node : executableNodes.values()) {

        if (node.getInNodes().isEmpty()) {

          startNodes.add(node.getId());

        }

      }

    }

 

    return startNodes;

  }

================================================================================

以及执行过程

for (String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {

ExecutableNode startNode = flow.getExecutableNode(startNodeId);

runReadyJob(startNode);

}

================================================================================

经过一番操作后,最终执行的代码如下所示:

stop in azkaban.execapp.FlowRunner.runExecutableNode





private void runExecutableNode(ExecutableNode node) throws IOException {

// Collect output props from the job's dependencies.

prepareJobProperties(node);

 

node.setStatus(Status.QUEUED);

JobRunner runner = createJobRunner(node);

logger.info("Submitting job '" + node.getNestedId() + "' to run.");

try {

executorService.submit(runner);

activeJobRunners.add(runner);

} catch (RejectedExecutionException e) {

logger.error(e);

}

;

}


本质就是把各个job抛到线程池里运行,然后执行下面的代码!

while (!flowFinished) {

synchronized (mainSyncObj) {

if (flowPaused) {

try {

mainSyncObj.wait(CHECK_WAIT_MS);

} catch (InterruptedException e) {

}

 

continue;

} else {

if (retryFailedJobs) {

retryAllFailures();

} else if (!progressGraph()) {

try {

mainSyncObj.wait(CHECK_WAIT_MS);

} catch (InterruptedException e) {

}

}

}

}

}

这里面就是一些策略性的问题了,懒的看了。

后面重点去看具体的一个JobRunner的运行!

标签: azkaban
  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
强子哥哥
粉丝 840
博文 555
码字总数 635433
作品 8
×
强子哥哥
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: