文档章节

Azkaban的Exec Server分析 29:JobRunner的真正执行过程(前奏)

强子1985
 强子1985
发布于 2016/04/11 19:16
字数 545
阅读 310
收藏 0

重点来了,现在看看Job Runner的运行过程!

==========================================================================================

jdb azkaban.execapp.AzkabanExecutorServer  -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in  azkaban.execapp.JobRunner.run

run

==========================================================================================

之前做了一些校验工作,然后

fireEvent(Event.create(this, Type.JOB_STARTED, null, false));

让我们来看有哪些Listeners

JobRunner-Hello-17[1] print listeners

 listeners = "[

azkaban.execapp.FlowRunner$JobRunnerEventListener@2a32e7d1, 

azkaban.execapp.event.JobCallbackManager@2b274bce, 

azkaban.execapp.jmx.JmxJobMBeanManager@53976f5c

]"

那么,这3个Listener分别做了什么事情呢?

==========================================================================================

第一个,不好意思,这里是start,所以下面的代码都没有执行!

private class JobRunnerEventListener implements EventListener {

public JobRunnerEventListener() {

}

 

@Override

public synchronized void handleEvent(Event event) {

//看到这里了

JobRunner runner = (JobRunner) event.getRunner();

//看到这里了

if (event.getType() == Type.JOB_STATUS_CHANGED) {

updateFlow();

} else if (event.getType() == Type.JOB_FINISHED) {

ExecutableNode node = runner.getNode();

long seconds = (node.getEndTime() - node.getStartTime()) / 1000;

synchronized (mainSyncObj) {

logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in "

+ seconds + " seconds");

 

// Cancellation is handled in the main thread, but if the

// flow is

// paused, the main thread is paused too.

// This unpauses the flow for cancellation.

if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {

flowPaused = false;

}

 

finishedNodes.add(node);

node.getParentFlow().setUpdateTime(System.currentTimeMillis());

interrupt();

fireEventListeners(event);

}

}

//看到这里了

}

==========================================================================================

第二个呢?

@Override

public void handleEvent(Event event) {

// 看到这里了

if (!isInitialized) {

return;

}

 

if (event.getRunner() instanceof JobRunner) {

// 看到这里了

try {

if (event.getType() == Event.Type.JOB_STARTED) {

// 这里开始执行

processJobCallOnStart(event);

} else if (event.getType() == Event.Type.JOB_FINISHED) {

processJobCallOnFinish(event);

}

} catch (Throwable e) {

// Use job runner logger so user can see the issue in their job

// log

JobRunner jobRunner = (JobRunner) event.getRunner();

jobRunner.getLogger().error("Encountered error while hanlding job callback event", e);

}

} else {

logger.warn("((( Got an unsupported runner: " + event.getRunner().getClass().getName() + " )))");

}

 

}

进去debug,发现还是什么都没有执行。。。怪我咯?

==========================================================================================

JmxJobMBeanManager

这个Listener具体做了啥呢?

if (event.getType() == Event.Type.JOB_STARTED) {

// 只是增加了一个计数器的值。。。

runningJobCount.incrementAndGet();


紧接着

loader.uploadExecutableNode(node, props);

这个到底干嘛的???

final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs "

+ "(exec_id, project_id, version, flow_id, job_id, start_time, "

+ "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";

执行了这个语句先!

==========================================================================================

stop in azkaban.execapp.JobRunner.insertJobMetadata

==========================================================================================

 下面会构造一个JVM参数

/**

 * Add useful JVM arguments so it is easier to map a running Java process to

 * a flow, execution id and job

 */

private void insertJVMAargs() {

// 看到这里了

String flowName = node.getParentFlow().getFlowId();//

String jobId = node.getId();

// "-Dazkaban.flowid=World -Dazkaban.execid=17 -Dazkaban.jobid=Hello"

 

String jobJVMArgs = String.format("-Dazkaban.flowid=%s -Dazkaban.execid=%s -Dazkaban.jobid=%s", flowName,

executionId, jobId);

// "-Dazkaban.flowid=World -Dazkaban.execid=17 -Dazkaban.jobid=Hello"

 

String previousJVMArgs = props.get(JavaProcessJob.JVM_PARAMS);

jobJVMArgs += (previousJVMArgs == null) ? "" : " " + previousJVMArgs;

//存起来

logger.info("job JVM args: " + jobJVMArgs);

props.put(JavaProcessJob.JVM_PARAMS, jobJVMArgs);

}

==========================================================================================

然后发现,JobRunner实例,需要构造一个具体的job对象

这是在

job = jobtypeManager.buildJobExecutor(this.jobId, props, logger); 实现的

那么到底怎么构造这个对象呢?

鉴于这部分比较重要,我还是另开一个小节吧。


© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 875
博文 1121
码字总数 826942
作品 8
南京
架构师
私信 提问
Azkaban的Exec Server分析 28:Execute Server的任务真正执行过程

上节,我们看到了任务放在executor的一个线程池里,下面开始分析真正的执行过程! ================================================================================ jdb azkaban.execapp...

强子哥哥
2016/04/11
287
0
Azkaban的Exec Server分析 30:JobRunner如何构造一个Job大揭秘

重点来了,JobRunner对象是有了,下面就真的要生成一个Job了,如何生成?本节揭秘! =========================================================================================== String...

强子哥哥
2016/04/11
341
0
Azkaban 任务调度系统(安装搭建)

无论是在业务开发还是在大数据开发中,脚本都是必不可少的存在,在初期我们会使用crontab来解决问题,那么当发现规模变大监控需求可视化需求的到来Crontab已经显然满足不了需求,抱着一颗解决大数...

喵了_个咪
07/26
0
0
【Azkaban搭建】---Azkaban 3.25.0搭建细则 超实用

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/LHWorldBlog/article/details/84305321 drop table active_sla;drop table execution_flows;drop table execu......

LHWorldBlog
11/02
0
0
hadoop工作流引擎azkaban

介绍 Azkaban是twitter出的一个任务调度系统,操作比Oozie要简单很多而且非常直观,提供的功能比较简单。Azkaban以Flow为执行单元进行定时调度,Flow就是预定义好的由一个或多个可存在依赖关...

wayhk
2016/05/13
104
0

没有更多内容

加载失败,请刷新页面

加载更多

NEO 节点介绍

全节点(full nodes)是存储 NEO 区块链全部数据的节点,通过 P2P 的方式与区块链网络连接,在区块链网络中,所有的全节点都是平等的,既充当客户端又充当服务器。 NEO 有两个全节点程序: ...

NEO-FANS
13分钟前
0
0
内网穿透大杀器--EarthWorm

0x00 前言 如果感觉本文对你有帮助,请在文章末尾点个赞,谢谢表哥们支持! 当你在内网渗透,并且拿下一台机器的权限时,你是不是觉得已经算是一次完整的渗透了? 不来一次内网漫游,渗透是不...

刀剑如梦
18分钟前
2
0
PiggyMetrics分布式框架

https://github.com/sqshq/PiggyMetrics

丁建祥
20分钟前
0
0
零距离接触阿里云时序时空数据库TSDB

概述 最近,Amazon新推出了完全托管的时间序列数据库Timestream,可见,各大厂商对未来时间序列数据库的重视与日俱增。 阿里云TSDB是阿里巴巴集团数据库事业部研发的一款高性能分布式时序时空...

阿里云云栖社区
28分钟前
4
0
OkHttpClient封装

import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.util.Map; import java.util.TreeMap; import java.util.Map.Entry; import o......

尘叙缘
30分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部