文档章节

Azkaban的Exec Server分析 29:JobRunner的真正执行过程(前奏)

强子哥哥
 强子哥哥
发布于 2016/04/11 19:16
字数 545
阅读 285
收藏 0
点赞 1
评论 0

重点来了,现在看看Job Runner的运行过程!

==========================================================================================

jdb azkaban.execapp.AzkabanExecutorServer  -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in  azkaban.execapp.JobRunner.run

run

==========================================================================================

之前做了一些校验工作,然后

fireEvent(Event.create(this, Type.JOB_STARTED, null, false));

让我们来看有哪些Listeners

JobRunner-Hello-17[1] print listeners

 listeners = "[

azkaban.execapp.FlowRunner$JobRunnerEventListener@2a32e7d1, 

azkaban.execapp.event.JobCallbackManager@2b274bce, 

azkaban.execapp.jmx.JmxJobMBeanManager@53976f5c

]"

那么,这3个Listener分别做了什么事情呢?

==========================================================================================

第一个,不好意思,这里是start,所以下面的代码都没有执行!

private class JobRunnerEventListener implements EventListener {

public JobRunnerEventListener() {

}

 

@Override

public synchronized void handleEvent(Event event) {

//看到这里了

JobRunner runner = (JobRunner) event.getRunner();

//看到这里了

if (event.getType() == Type.JOB_STATUS_CHANGED) {

updateFlow();

} else if (event.getType() == Type.JOB_FINISHED) {

ExecutableNode node = runner.getNode();

long seconds = (node.getEndTime() - node.getStartTime()) / 1000;

synchronized (mainSyncObj) {

logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in "

+ seconds + " seconds");

 

// Cancellation is handled in the main thread, but if the

// flow is

// paused, the main thread is paused too.

// This unpauses the flow for cancellation.

if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {

flowPaused = false;

}

 

finishedNodes.add(node);

node.getParentFlow().setUpdateTime(System.currentTimeMillis());

interrupt();

fireEventListeners(event);

}

}

//看到这里了

}

==========================================================================================

第二个呢?

@Override

public void handleEvent(Event event) {

// 看到这里了

if (!isInitialized) {

return;

}

 

if (event.getRunner() instanceof JobRunner) {

// 看到这里了

try {

if (event.getType() == Event.Type.JOB_STARTED) {

// 这里开始执行

processJobCallOnStart(event);

} else if (event.getType() == Event.Type.JOB_FINISHED) {

processJobCallOnFinish(event);

}

} catch (Throwable e) {

// Use job runner logger so user can see the issue in their job

// log

JobRunner jobRunner = (JobRunner) event.getRunner();

jobRunner.getLogger().error("Encountered error while hanlding job callback event", e);

}

} else {

logger.warn("((( Got an unsupported runner: " + event.getRunner().getClass().getName() + " )))");

}

 

}

进去debug,发现还是什么都没有执行。。。怪我咯?

==========================================================================================

JmxJobMBeanManager

这个Listener具体做了啥呢?

if (event.getType() == Event.Type.JOB_STARTED) {

// 只是增加了一个计数器的值。。。

runningJobCount.incrementAndGet();


紧接着

loader.uploadExecutableNode(node, props);

这个到底干嘛的???

final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs "

+ "(exec_id, project_id, version, flow_id, job_id, start_time, "

+ "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";

执行了这个语句先!

==========================================================================================

stop in azkaban.execapp.JobRunner.insertJobMetadata

==========================================================================================

 下面会构造一个JVM参数

/**

 * Add useful JVM arguments so it is easier to map a running Java process to

 * a flow, execution id and job

 */

private void insertJVMAargs() {

// 看到这里了

String flowName = node.getParentFlow().getFlowId();//

String jobId = node.getId();

// "-Dazkaban.flowid=World -Dazkaban.execid=17 -Dazkaban.jobid=Hello"

 

String jobJVMArgs = String.format("-Dazkaban.flowid=%s -Dazkaban.execid=%s -Dazkaban.jobid=%s", flowName,

executionId, jobId);

// "-Dazkaban.flowid=World -Dazkaban.execid=17 -Dazkaban.jobid=Hello"

 

String previousJVMArgs = props.get(JavaProcessJob.JVM_PARAMS);

jobJVMArgs += (previousJVMArgs == null) ? "" : " " + previousJVMArgs;

//存起来

logger.info("job JVM args: " + jobJVMArgs);

props.put(JavaProcessJob.JVM_PARAMS, jobJVMArgs);

}

==========================================================================================

然后发现,JobRunner实例,需要构造一个具体的job对象

这是在

job = jobtypeManager.buildJobExecutor(this.jobId, props, logger); 实现的

那么到底怎么构造这个对象呢?

鉴于这部分比较重要,我还是另开一个小节吧。


© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 859
博文 926
码字总数 640946
作品 8
南京
架构师
Azkaban的Exec Server分析 28:Execute Server的任务真正执行过程

上节,我们看到了任务放在executor的一个线程池里,下面开始分析真正的执行过程! ================================================================================ jdb azkaban.execapp...

强子哥哥
2016/04/11
287
0
Azkaban的Exec Server分析 30:JobRunner如何构造一个Job大揭秘

重点来了,JobRunner对象是有了,下面就真的要生成一个Job了,如何生成?本节揭秘! =========================================================================================== String...

强子哥哥
2016/04/11
341
0
hadoop工作流引擎azkaban

介绍 Azkaban是twitter出的一个任务调度系统,操作比Oozie要简单很多而且非常直观,提供的功能比较简单。Azkaban以Flow为执行单元进行定时调度,Flow就是预定义好的由一个或多个可存在依赖关...

wayhk
2016/05/13
104
0
Azkaban的Exec Server分析 27:Execute Server接收任务的过程分析

之前,我们说过,web server分发任务,是通过如下的URL来的 uri = "http://x.x.x.x:port/executor?action=execute&execid=12&user" // 构造URI URI uri = ExecutorApiClient.buildUri(host, ......

强子哥哥
2016/04/11
244
0
Azkaban-2.5及Plugins的安装配置

Azkaban是由LinkedIn开发的调度工具,可以用于调度Hadoop中的相互依赖的Job。有时候,在Hadoop集群中运行的Job是相互依赖的,某些任务需要顺序的执行,这种场景下使用Azkaban能够很好的解决问...

lulongzhou_llz
04/20
0
0
Azkaban的Exec Server分析 31:FlowRunner如何推动Graph的前进

现在,我们知道一个job的执行过程,那么整个flow是一个图,图是如何推进的,如何推进下一个job的执行呢?这就是本节需要解决的问题! ====================================================...

强子哥哥
2016/04/12
151
0
Azkaban的Web Server源码探究系列10 : /对应的servlet解析

如果请求的URI是"/",对应的servlet如何响应? =================================================================== ServletHolder indexRedirect = new ServletHolder(new IndexRedirectS......

强子哥哥
2016/04/08
221
0
Azkaban的线程系列 37:QueueProcessor线程的任务处理&executor存活监控

有个线程,AzkabanWebServer-QueueProcessor-Thread. 下面分析下这个线程到底干嘛的!!! ==================================================================================== stop in......

强子哥哥
2016/04/13
286
0
Azkaban的定时调度任务分析34:定时执行下文

下面说线程azkaban.trigger.TriggerManager.TriggerScannerThread是如何运作的 1)初始化 stop in azkaban.trigger.TriggerManager$TriggerScannerThread.xxx run 初始化的上下文是: java.l......

强子哥哥
2016/04/12
189
0
工作流调度器Azkaban的安装配置

Azkaban安装部署 准备工作 Azkaban Web服务器 azkaban-web-server-2.5.0.tar.gz Azkaban执行服务器 azkaban-executor-server-2.5.0.tar.gz MySQL 目前azkaban只支持 mysql,需安装mysql服务器......

ChinaUnicom110
2017/09/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

java 重写排序规则,用于代码层级排序

1.dataList 是个List<Map<String,Object>> 类型的数据,所以比较的时候是冲map中获取数据,并且数据不能为空。 2.dataList 类型是由自己定义的,new Comparator<Map<String,Object>> 也是对应......

轻量级赤影
3分钟前
0
0
分布式大型互联网企业架构!

摘要: 开发工具 1.Eclipse IDE:采用Maven项目管理,模块化。 2.代码生成:通过界面方式简单配置,自动生成相应代码,目前包括三种生成方式(增删改查):单表、一对多、树结构。生成后的代码...

明理萝
3分钟前
0
1
对MFC程序的一点逆向分析:定位按钮响应函数的办法

因为消息响应函数保存在AFX_MSGMAP_ENTRY数组中, 观察nMessage、nCode、nID、pfn利用IDA在rdata段中搜索即可, 在IDA中找到代码段基址0x401000,函数地址0x403140, 在WinDbg中运行!addre...

oready
4分钟前
0
0
阻抗匹配与史密斯(Smith)圆图基本原理

参考:http://bbs.eeworld.com.cn/thread-650695-1-1.html

whoisliang
9分钟前
0
0
maven配置文件分离

一、 简介 遇到很多次别人处理的项目,测试环境,本地开发和线上环境的配置不一样,每一次部署都要重新修改配置文件,提交审核代码,才能打包,非常不方便。 其实相信很多人都知道可以使用m...

trayvon
9分钟前
0
0
MacOS和Linux内核的区别

导读 有些人可能认为MacOS和Linux内核有相似之处,因为它们可以处理类似的命令和类似的软件。甚至有人认为苹果的MacOS是基于linux的。事实上,这两个内核的历史和特性是非常不同的。今天,我...

问题终结者
25分钟前
1
0
SpringBoot | 第八章:统一异常、数据校验处理

前言 在web应用中,请求处理时,出现异常是非常常见的。所以当应用出现各类异常时,进行异常的捕获或者二次处理(比如sql异常正常是不能外抛)是非常必要的,比如在开发对外api服务时,约定了响...

oKong
33分钟前
0
0
mysql高级

一、存储引擎 InnoDB MyISAM 比较 二、数据类型 整型 浮点数 字符串 时间和日期 三、索引 索引分类 索引的优点 索引优化 B-Tree 和 B+Tree 原理 四、查询性能优化 五、切分 垂直切分 水平切分...

丁典
54分钟前
1
0
rsync通过同步服务、系统日志、screen工具

rsync通过后台服务同步 在远程主机中建立一个rsync服务器,在服务器上配置好rsync的各种应用,然后将本机作为rsync的一个客户端连接远程的rsync服务器。 首先在A机器上建立并且配置rsync的配...

黄昏残影
今天
5
0
Spring Cloud Gateway 接口文档聚合实现

在微服务架构下,通常每个微服务都会使用Swagger来管理我们的接口文档,当微服务越来越多,接口查找管理无形中要浪费我们不少时间,毕竟懒是程序员的美德。 由于swagger2暂时不支持webflux 走...

冷冷gg
今天
137
2

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部