文档章节

Azkaban的Exec Server分析 29:JobRunner的真正执行过程(前奏)

强子1985
 强子1985
发布于 2016/04/11 19:16
字数 545
阅读 301
收藏 0

重点来了,现在看看Job Runner的运行过程!

==========================================================================================

jdb azkaban.execapp.AzkabanExecutorServer  -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in  azkaban.execapp.JobRunner.run

run

==========================================================================================

之前做了一些校验工作,然后

fireEvent(Event.create(this, Type.JOB_STARTED, null, false));

让我们来看有哪些Listeners

JobRunner-Hello-17[1] print listeners

 listeners = "[

azkaban.execapp.FlowRunner$JobRunnerEventListener@2a32e7d1, 

azkaban.execapp.event.JobCallbackManager@2b274bce, 

azkaban.execapp.jmx.JmxJobMBeanManager@53976f5c

]"

那么,这3个Listener分别做了什么事情呢?

==========================================================================================

第一个,不好意思,这里是start,所以下面的代码都没有执行!

private class JobRunnerEventListener implements EventListener {

public JobRunnerEventListener() {

}

 

@Override

public synchronized void handleEvent(Event event) {

//看到这里了

JobRunner runner = (JobRunner) event.getRunner();

//看到这里了

if (event.getType() == Type.JOB_STATUS_CHANGED) {

updateFlow();

} else if (event.getType() == Type.JOB_FINISHED) {

ExecutableNode node = runner.getNode();

long seconds = (node.getEndTime() - node.getStartTime()) / 1000;

synchronized (mainSyncObj) {

logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in "

+ seconds + " seconds");

 

// Cancellation is handled in the main thread, but if the

// flow is

// paused, the main thread is paused too.

// This unpauses the flow for cancellation.

if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {

flowPaused = false;

}

 

finishedNodes.add(node);

node.getParentFlow().setUpdateTime(System.currentTimeMillis());

interrupt();

fireEventListeners(event);

}

}

//看到这里了

}

==========================================================================================

第二个呢?

@Override

public void handleEvent(Event event) {

// 看到这里了

if (!isInitialized) {

return;

}

 

if (event.getRunner() instanceof JobRunner) {

// 看到这里了

try {

if (event.getType() == Event.Type.JOB_STARTED) {

// 这里开始执行

processJobCallOnStart(event);

} else if (event.getType() == Event.Type.JOB_FINISHED) {

processJobCallOnFinish(event);

}

} catch (Throwable e) {

// Use job runner logger so user can see the issue in their job

// log

JobRunner jobRunner = (JobRunner) event.getRunner();

jobRunner.getLogger().error("Encountered error while hanlding job callback event", e);

}

} else {

logger.warn("((( Got an unsupported runner: " + event.getRunner().getClass().getName() + " )))");

}

 

}

进去debug,发现还是什么都没有执行。。。怪我咯?

==========================================================================================

JmxJobMBeanManager

这个Listener具体做了啥呢?

if (event.getType() == Event.Type.JOB_STARTED) {

// 只是增加了一个计数器的值。。。

runningJobCount.incrementAndGet();


紧接着

loader.uploadExecutableNode(node, props);

这个到底干嘛的???

final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs "

+ "(exec_id, project_id, version, flow_id, job_id, start_time, "

+ "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";

执行了这个语句先!

==========================================================================================

stop in azkaban.execapp.JobRunner.insertJobMetadata

==========================================================================================

 下面会构造一个JVM参数

/**

 * Add useful JVM arguments so it is easier to map a running Java process to

 * a flow, execution id and job

 */

private void insertJVMAargs() {

// 看到这里了

String flowName = node.getParentFlow().getFlowId();//

String jobId = node.getId();

// "-Dazkaban.flowid=World -Dazkaban.execid=17 -Dazkaban.jobid=Hello"

 

String jobJVMArgs = String.format("-Dazkaban.flowid=%s -Dazkaban.execid=%s -Dazkaban.jobid=%s", flowName,

executionId, jobId);

// "-Dazkaban.flowid=World -Dazkaban.execid=17 -Dazkaban.jobid=Hello"

 

String previousJVMArgs = props.get(JavaProcessJob.JVM_PARAMS);

jobJVMArgs += (previousJVMArgs == null) ? "" : " " + previousJVMArgs;

//存起来

logger.info("job JVM args: " + jobJVMArgs);

props.put(JavaProcessJob.JVM_PARAMS, jobJVMArgs);

}

==========================================================================================

然后发现,JobRunner实例,需要构造一个具体的job对象

这是在

job = jobtypeManager.buildJobExecutor(this.jobId, props, logger); 实现的

那么到底怎么构造这个对象呢?

鉴于这部分比较重要,我还是另开一个小节吧。


© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 862
博文 941
码字总数 648349
作品 8
南京
架构师
Azkaban的Exec Server分析 28:Execute Server的任务真正执行过程

上节,我们看到了任务放在executor的一个线程池里,下面开始分析真正的执行过程! ================================================================================ jdb azkaban.execapp...

强子哥哥
2016/04/11
287
0
Azkaban的Exec Server分析 30:JobRunner如何构造一个Job大揭秘

重点来了,JobRunner对象是有了,下面就真的要生成一个Job了,如何生成?本节揭秘! =========================================================================================== String...

强子哥哥
2016/04/11
341
0
Azkaban 任务调度系统(安装搭建)

无论是在业务开发还是在大数据开发中,脚本都是必不可少的存在,在初期我们会使用crontab来解决问题,那么当发现规模变大监控需求可视化需求的到来Crontab已经显然满足不了需求,抱着一颗解决大数...

喵了_个咪
07/26
0
0
hadoop工作流引擎azkaban

介绍 Azkaban是twitter出的一个任务调度系统,操作比Oozie要简单很多而且非常直观,提供的功能比较简单。Azkaban以Flow为执行单元进行定时调度,Flow就是预定义好的由一个或多个可存在依赖关...

wayhk
2016/05/13
104
0
Azkaban的Exec Server分析 27:Execute Server接收任务的过程分析

之前,我们说过,web server分发任务,是通过如下的URL来的 uri = "http://x.x.x.x:port/executor?action=execute&execid=12&user" // 构造URI URI uri = ExecutorApiClient.buildUri(host, ......

强子哥哥
2016/04/11
244
0

没有更多内容

加载失败,请刷新页面

加载更多

Web系统大规模并发:电商秒杀与抢购

一、大规模并发带来的挑战 在过去的工作中,我曾经面对过5w每秒的高并发秒杀功能,在这个过程中,整个Web系统遇到了很多的问题和挑战。如果Web系统不做针对性的优化,会轻而易举地陷入到异常...

xtof
今天
1
0
代码质量管理平台-sonarqube

在工作中,往往开发的时候会不怎么注重代码质量的人很多,存在着很多的漏洞和隐患等问题,sonarqube可以进行代码质量的审核,而且十分的残酷。。。。。接下来我们说下怎么安装 进入官网下载:...

落叶清风
今天
6
0
在Ubuntu安装和配置Sphinx

Ubuntu系统默认是配置有sphinx的,先检查一下,别多此一举。。。。。 在开始本指南之前,您需要: 一个Ubuntu 16.04服务器。 sudo的一个非root用户,您可以通过以下设置本教程 。 安装在服务...

阿锋zxf
今天
1
0
Qt编写输入法V2018超级终结版

对于qt嵌入式linux开发人员来说,输入法一直是个鸡肋问题,要么不支持实体键盘同步,要么不能汉字输入,要么不支持网页输入等,这几年通过陆续接触大量的各种输入法应用场景客户,得到真实需...

飞扬青云
今天
2
0
TypeScript基础入门之高级类型的多态的 this类型

转发 TypeScript基础入门之高级类型的多态的 this类型 高级类型 多态的this类型 多态的this类型表示的是某个包含类或接口的子类型。 这被称做F-bounded多态性。 它能很容易的表现连贯接口间的...

durban
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部