文档章节

Azkaban的线程系列 37:QueueProcessor线程的任务处理&executor存活监控

强子1985
 强子1985
发布于 2016/04/13 13:32
字数 470
阅读 394
收藏 0

有个线程,AzkabanWebServer-QueueProcessor-Thread.

下面分析下这个线程到底干嘛的!!!

====================================================================================

stop in azkaban.executor.ExecutorManager$QueueProcessorThread.run

====================================================================================

public void run() {

// Loops till QueueProcessorThread is shutdown

while (!shutdown) {// 一直循环

synchronized (this) {//

try {

// start processing queue if active, other wait for

// sometime

if (isActive) {

// 开始处理任务

processQueuedFlows(activeExecutorRefreshWindowInMilisec,

activeExecutorRefreshWindowInFlows);

}

wait(QUEUE_PROCESSOR_WAIT_IN_MS);// 等待1秒

} catch (Exception e) {

logger.error("QueueProcessorThread Interrupted. Probably to shut down.", e);

}

}

}

}

所以接下来要看这个processQueuedFlows干嘛的?

====================================================================================

// process flow with current snapshot of activeExecutors

selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));

从这里来看,就是把任务分发到具体的executor上,

不过这个函数其实还做了其它的事情!那就是executor的存活监控

// if we have dispatched more than maxContinuousFlowProcessed or

// It has been more then activeExecutorsRefreshWindow millisec

// since we

// refreshed

// 满足上面的条件就探测executor的存活性

if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow

|| currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {

// Refresh executorInfo for all activeExecutors

refreshExecutors();

lastExecutorRefreshTime = currentTime;

currentContinuousFlowProcessed = 0;

}


所以接下来,我们来看看refreshExecutors的代码实现!

===================================================================================

jdb azkaban.webapp.AzkabanWebServer -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in  azkaban.executor.ExecutorManager.refreshExecutors

==============================================================================

其中刷新周期

azkProps.getLong(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),

private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS = "azkaban.activeexecutor.refresh.milisecinterval";

默认为50秒刷1次更新请求


其实是通过一个线程池来跑的,

Future<String> fetchExecutionInfo = executorInforRefresherService.submit(new Callable<String>() {

@Override

public String call() throws Exception {

return callExecutorForJsonString(executor.getHost(), executor.getPort(), "/serverStatistics",

null);

}

});

那么,这个线程池的个数呢?


executorInforRefresherService = Executors

.newFixedThreadPool(azkProps.getInt(AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));

private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS = "azkaban.executorinfo.refresh.maxThreads";

默认就是5个了,好,回来看怎么执行!

==============================================================================

发出HTTP请求 /serverStatistics

然后结果更新到executor,所以重点就是 /serverStatistics在executor中的执行过程!

==============================================================================

root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");



jdb azkaban.execapp.AzkabanExecutorServer  -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in azkaban.execapp.ServerStatisticsServlet.doGet

stop in azkaban.execapp.ServerStatisticsServlet.populateStatistics


run

其实就是生成本地的信息汇总

1)

fillRemainingMemoryPercent(stats);

azkaban.execapp.ServerStatisticsServlet.fillRemainingMemoryPercent

 

2)

fillRemainingFlowCapacityAndLastDispatchedTime(stats);

stop in azkaban.execapp.ServerStatisticsServlet.fillRemainingFlowCapacityAndLastDispatchedTime

3)

fillCpuUsage(stats);

==============================================================================

其实就是返回这3种信息

对于web server来说,5秒内拿到返回内容后,更新本地消息

  public void setExecutorInfo(ExecutorInfo info) {

    this.cachedExecutorStats = info;

    this.lastStatsUpdatedTime = new Date();

  }

以此来作为存活监控!

© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 875
博文 1090
码字总数 799329
作品 8
南京
架构师
私信 提问
Azkaban的Web Server源码探究系列24: 一次性执行execute任务取出处理

提交之后,很不幸,有个问题如下: 抓包分析: 请求和响应分别如下: 所以接下来需要对这个URL进行debug ============================================================================== ...

强子哥哥
2016/04/10
212
0
java多线程系列:Executors框架

目录 Executor接口介绍 ExecutorService常用接口介绍 创建线程池的一些方法介绍 3.1 newFixedThreadPool方法 3.2 newCachedThreadPool方法 3.3 newScheduledThreadPool方法 疑问解答 4.1. R...

勿妄
06/12
0
0
工作流调度器Azkaban的安装配置

Azkaban安装部署 准备工作 Azkaban Web服务器 azkaban-web-server-2.5.0.tar.gz Azkaban执行服务器 azkaban-executor-server-2.5.0.tar.gz MySQL 目前azkaban只支持 mysql,需安装mysql服务器......

ChinaUnicom110
2017/09/29
0
0
java并发库之Executors常用的创建ExecutorService的几个方法说明

一、线程池的创建 我们可以通过ThreadPoolExecutor来创建一个线程池。 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);......

文星
2011/10/16
0
5
Executors常用的创建ExecutorService的方法的简单说明

一、线程池的创建 我们可以通过ThreadPoolExecutor来创建一个线程池。 Java代码 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQ......

引鸩怼孑
2015/05/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

sed插入和附加文本基础使用

对于编辑器来说,在数据中增加行算是很基本的操作吧,sed有以下两个操作: 插入(insert)命令(i)会在指定行前增加一个新行 附加(append)命令(a)会在指定行后增加一个新行 还是用下面的文本来测...

woshixin
15分钟前
0
0
HIVE数据倾斜总结

在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显。主要是因为在Job完成后的所得到的Counters是整个Job的总和,优化是基于这些Counters得出的平均值,...

瑞查德-Jack
26分钟前
0
0
Pure-ftpd搭建FTP

12月11日任务 15.4 xshell使用xftp传输文件 15.5 使用pure-ftpd搭建ftp服务 使用pure-ftpd搭建FTP服务 轻量的ftp软件 安装pure-ftpd并修改配置文件 # pure-ftpd为epel扩展库里的软件[root...

robertt15
35分钟前
4
0
开源 serverless 产品原理剖析(二) - Fission

背景 本文是开源 serverless 产品原理剖析系列文章的第二篇,关于 serverless 背景知识的介绍可参考文章开源 serverless 产品原理剖析(一) - Kubeless,这里不再赘述。 Fission 简介 Fiss...

阿里云官方博客
41分钟前
2
0
Android面试整理(附答案)

面试,无非都是问上面这些问题(挺多的 - -!),聘请中高级的安卓开发会往深的去问,并且会问一延伸二。以下我先提出几点重点,是面试官基本必问的问题,请一定要去了解! 基础知识 – 四大组...

终端研发部
46分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部