文档章节

Azkaban的Exec Server分析 28:Execute Server的任务真正执行过程

强子哥哥
 强子哥哥
发布于 2016/04/11 16:33
字数 289
阅读 309
收藏 0

上节,我们看到了任务放在executor的一个线程池里,下面开始分析真正的执行过程!

================================================================================

jdb azkaban.execapp.AzkabanExecutorServer  -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in azkaban.execapp.FlowRunner.run

run

================================================================================

大家知道,关于一个拓扑图来说,有一个起点的说法

可能在拓扑图里有多个起点,看下azkaban的起点计算方法

 public List<String> getStartNodes() {

    if (startNodes == null) {

      startNodes = new ArrayList<String>();

      for (ExecutableNode node : executableNodes.values()) {

        if (node.getInNodes().isEmpty()) {

          startNodes.add(node.getId());

        }

      }

    }

 

    return startNodes;

  }

================================================================================

以及执行过程

for (String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {

ExecutableNode startNode = flow.getExecutableNode(startNodeId);

runReadyJob(startNode);

}

================================================================================

经过一番操作后,最终执行的代码如下所示:

stop in azkaban.execapp.FlowRunner.runExecutableNode





private void runExecutableNode(ExecutableNode node) throws IOException {

// Collect output props from the job's dependencies.

prepareJobProperties(node);

 

node.setStatus(Status.QUEUED);

JobRunner runner = createJobRunner(node);

logger.info("Submitting job '" + node.getNestedId() + "' to run.");

try {

executorService.submit(runner);

activeJobRunners.add(runner);

} catch (RejectedExecutionException e) {

logger.error(e);

}

;

}


本质就是把各个job抛到线程池里运行,然后执行下面的代码!

while (!flowFinished) {

synchronized (mainSyncObj) {

if (flowPaused) {

try {

mainSyncObj.wait(CHECK_WAIT_MS);

} catch (InterruptedException e) {

}

 

continue;

} else {

if (retryFailedJobs) {

retryAllFailures();

} else if (!progressGraph()) {

try {

mainSyncObj.wait(CHECK_WAIT_MS);

} catch (InterruptedException e) {

}

}

}

}

}

这里面就是一些策略性的问题了,懒的看了。

后面重点去看具体的一个JobRunner的运行!

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 859
博文 900
码字总数 615641
作品 8
南京
架构师
Azkaban的Exec Server分析 27:Execute Server接收任务的过程分析

之前,我们说过,web server分发任务,是通过如下的URL来的 uri = "http://x.x.x.x:port/executor?action=execute&execid=12&user" // 构造URI URI uri = ExecutorApiClient.buildUri(host, ......

强子哥哥
2016/04/11
244
0
Azkaban 任务调度系统(安装搭建)

无论是在业务开发还是在大数据开发中,脚本都是必不可少的存在,在初期我们会使用crontab来解决问题,那么当发现规模变大监控需求可视化需求的到来Crontab已经显然满足不了需求,抱着一颗解决大数...

喵了_个咪
07/26
0
0
hadoop工作流引擎azkaban

介绍 Azkaban是twitter出的一个任务调度系统,操作比Oozie要简单很多而且非常直观,提供的功能比较简单。Azkaban以Flow为执行单元进行定时调度,Flow就是预定义好的由一个或多个可存在依赖关...

wayhk
2016/05/13
104
0
Azkaban 任务调度系统(使用和小技巧)

上节主要介绍了安装搭建和简单的运行一个job,但是真正使用到实际场景还是需要了解更多的知识,比如任务怎么互相依赖,可以选择指定的节点来执行吗等等一些相关的操作会在本节中进行介绍 附上:...

喵了_个咪
07/30
0
0
Azkaban-2.5及Plugins的安装配置

Azkaban是由LinkedIn开发的调度工具,可以用于调度Hadoop中的相互依赖的Job。有时候,在Hadoop集群中运行的Job是相互依赖的,某些任务需要顺序的执行,这种场景下使用Azkaban能够很好的解决问...

lulongzhou_llz
04/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

kernel version does not match DSO version

错误信息: kernel version 384.11 does not match DSO version 384.130.0 原因是: cuda driver版本太低,不匹配DSO 简单有效的修复方法,升级nvidia driver, 步骤如下: 1. google seach ...

刘小米
今天
0
0
maven坐标和依赖

一、maven坐标详解 <groupId>com.fgt.club</groupId><artifactId>club-common-service-facade</artifactId><version>3.0.0</version><packaging>jar</packaging> maven的坐标元素说......

老韭菜
今天
1
0
springmvc-servlet.xml配置表功能解释

问:<?xml version="1.0" encoding="UTF-8" ?> 答: xml version="1.0"表示是此xml文件的版本是1.0 encoding="UTF-8"表示此文件的编码方式是UTF-8 问:<!DOCTYPE beans PUBLIC "-//SPRING//......

隐士族隐逸
今天
1
0
基于TP5的微信的公众号获取登录用户信息

之前讲过微信的公众号自动登录的菜单配置,这次记录一下在TP5项目中获取自动登录的用户信息并存到数据库的操作 基本的流程为:微信设置自动登录的菜单—>访问的URL指定的函数里获取用户信息—...

月夜中徘徊
今天
0
0
youTrack

package jetbrains.teamsys.license.runtime; 计算lis package jetbrains.ring.license.reader; 验证lis 安装后先不要生成lis,要把相关文件进行替换 ring-license-checker-1.0.41.jar char......

max佩恩
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部