文档章节

Azkaban的线程系列 37:QueueProcessor线程的任务处理&executor存活监控

强子哥哥
 强子哥哥
发布于 2016/04/13 13:32
字数 470
阅读 369
收藏 0
点赞 1
评论 0

有个线程,AzkabanWebServer-QueueProcessor-Thread.

下面分析下这个线程到底干嘛的!!!

====================================================================================

stop in azkaban.executor.ExecutorManager$QueueProcessorThread.run

====================================================================================

public void run() {

// Loops till QueueProcessorThread is shutdown

while (!shutdown) {// 一直循环

synchronized (this) {//

try {

// start processing queue if active, other wait for

// sometime

if (isActive) {

// 开始处理任务

processQueuedFlows(activeExecutorRefreshWindowInMilisec,

activeExecutorRefreshWindowInFlows);

}

wait(QUEUE_PROCESSOR_WAIT_IN_MS);// 等待1秒

} catch (Exception e) {

logger.error("QueueProcessorThread Interrupted. Probably to shut down.", e);

}

}

}

}

所以接下来要看这个processQueuedFlows干嘛的?

====================================================================================

// process flow with current snapshot of activeExecutors

selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));

从这里来看,就是把任务分发到具体的executor上,

不过这个函数其实还做了其它的事情!那就是executor的存活监控

// if we have dispatched more than maxContinuousFlowProcessed or

// It has been more then activeExecutorsRefreshWindow millisec

// since we

// refreshed

// 满足上面的条件就探测executor的存活性

if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow

|| currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {

// Refresh executorInfo for all activeExecutors

refreshExecutors();

lastExecutorRefreshTime = currentTime;

currentContinuousFlowProcessed = 0;

}


所以接下来,我们来看看refreshExecutors的代码实现!

===================================================================================

jdb azkaban.webapp.AzkabanWebServer -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in  azkaban.executor.ExecutorManager.refreshExecutors

==============================================================================

其中刷新周期

azkProps.getLong(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),

private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS = "azkaban.activeexecutor.refresh.milisecinterval";

默认为50秒刷1次更新请求


其实是通过一个线程池来跑的,

Future<String> fetchExecutionInfo = executorInforRefresherService.submit(new Callable<String>() {

@Override

public String call() throws Exception {

return callExecutorForJsonString(executor.getHost(), executor.getPort(), "/serverStatistics",

null);

}

});

那么,这个线程池的个数呢?


executorInforRefresherService = Executors

.newFixedThreadPool(azkProps.getInt(AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));

private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS = "azkaban.executorinfo.refresh.maxThreads";

默认就是5个了,好,回来看怎么执行!

==============================================================================

发出HTTP请求 /serverStatistics

然后结果更新到executor,所以重点就是 /serverStatistics在executor中的执行过程!

==============================================================================

root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");



jdb azkaban.execapp.AzkabanExecutorServer  -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in azkaban.execapp.ServerStatisticsServlet.doGet

stop in azkaban.execapp.ServerStatisticsServlet.populateStatistics


run

其实就是生成本地的信息汇总

1)

fillRemainingMemoryPercent(stats);

azkaban.execapp.ServerStatisticsServlet.fillRemainingMemoryPercent

 

2)

fillRemainingFlowCapacityAndLastDispatchedTime(stats);

stop in azkaban.execapp.ServerStatisticsServlet.fillRemainingFlowCapacityAndLastDispatchedTime

3)

fillCpuUsage(stats);

==============================================================================

其实就是返回这3种信息

对于web server来说,5秒内拿到返回内容后,更新本地消息

  public void setExecutorInfo(ExecutorInfo info) {

    this.cachedExecutorStats = info;

    this.lastStatsUpdatedTime = new Date();

  }

以此来作为存活监控!

© 著作权归作者所有

共有 人打赏支持
强子哥哥

强子哥哥

粉丝 859
博文 551
码字总数 640910
作品 8
南京
架构师
Azkaban的Web Server源码探究系列24: 一次性执行execute任务取出处理

提交之后,很不幸,有个问题如下: 抓包分析: 请求和响应分别如下: 所以接下来需要对这个URL进行debug ============================================================================== ...

强子哥哥
2016/04/10
212
0
工作流调度器Azkaban的安装配置

Azkaban安装部署 准备工作 Azkaban Web服务器 azkaban-web-server-2.5.0.tar.gz Azkaban执行服务器 azkaban-executor-server-2.5.0.tar.gz MySQL 目前azkaban只支持 mysql,需安装mysql服务器......

ChinaUnicom110
2017/09/29
0
0
java多线程系列:Executors框架

目录 Executor接口介绍 ExecutorService常用接口介绍 创建线程池的一些方法介绍 3.1 newFixedThreadPool方法 3.2 newCachedThreadPool方法 3.3 newScheduledThreadPool方法 疑问解答 4.1. R...

勿妄
06/12
0
0
java并发库之Executors常用的创建ExecutorService的几个方法说明

一、线程池的创建 我们可以通过ThreadPoolExecutor来创建一个线程池。 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);......

文星
2011/10/16
0
5
Executors常用的创建ExecutorService的方法的简单说明

一、线程池的创建 我们可以通过ThreadPoolExecutor来创建一个线程池。 Java代码 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQ......

引鸩怼孑
2015/05/05
0
0
读书笔记之《Java并发编程的艺术》-线程池和Executor的子孙们

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
0
1
Java多线程二(Java线程池的分析和使用)

引言 合理利用线程池能够带来三个好处。 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。 ...

dkz
2013/03/19
0
0
Azkaban的Exec Server分析 27:Execute Server接收任务的过程分析

之前,我们说过,web server分发任务,是通过如下的URL来的 uri = "http://x.x.x.x:port/executor?action=execute&execid=12&user" // 构造URI URI uri = ExecutorApiClient.buildUri(host, ......

强子哥哥
2016/04/11
244
0
Azkaban的Exec Server分析 28:Execute Server的任务真正执行过程

上节,我们看到了任务放在executor的一个线程池里,下面开始分析真正的执行过程! ================================================================================ jdb azkaban.execapp...

强子哥哥
2016/04/11
287
0
从构建分布式秒杀系统出发,聊聊线程池

  【IT168 评论】从0到1构建分布式秒杀系统案例的代码已经全部上传至码云,文章也被分发到各个平台。其中也收到了不少小伙伴喜欢和反馈,有网友如是说:   说实话,能用上的不多,中小企...

博客园
05/23
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

32.filter表案例 nat表应用 (iptables)

10.15 iptables filter表案例 10.16/10.17/10.18 iptables nat表应用 10.15 iptables filter表案例: ~1. 写一个具体的iptables小案例,需求是把80端口、22端口、21 端口放行。但是,22端口我...

王鑫linux
今天
0
0
shell中的函数&shell中的数组&告警系统需求分析

20.16/20.17 shell中的函数 20.18 shell中的数组 20.19 告警系统需求分析

影夜Linux
今天
0
0
Linux网络基础、Linux防火墙

Linux网络基础 ip addr 命令 :查看网口信息 ifconfig命令:查看网口信息,要比ip addr更明了一些 centos 7默认没安装ifconfig命令,可以使用yum install -y net-tools命令来安装。 ifconfig...

李超小牛子
今天
1
0
[机器学习]回归--Decision Tree Regression

CART决策树又称分类回归树,当数据集的因变量为连续性数值时,该树算法就是一个回归树,可以用叶节点观察的均值作为预测值;当数据集的因变量为离散型数值时,该树算法就是一个分类树,可以很...

wangxuwei
昨天
1
0
Redis做分布式无锁CAS的问题

因为Redis本身是单线程的,具备原子性,所以可以用来做分布式无锁的操作,但会有一点小问题。 public interface OrderService { public String getOrderNo();} public class OrderRe...

算法之名
昨天
9
0
143. Reorder List - LeetCode

Question 143. Reorder List Solution 题目大意:给一个链表,将这个列表分成前后两部分,后半部分反转,再将这两分链表的节点交替连接成一个新的链表 思路 :先将链表分成前后两部分,将后部...

yysue
昨天
1
0
数据结构与算法1

第一个代码,描述一个被称为BankAccount的类,该类模拟了银行中的账户操作。程序建立了一个开户金额,显示金额,存款,取款并显示余额。 主要的知识点联系为类的含义,构造函数,公有和私有。...

沉迷于编程的小菜菜
昨天
1
0
从为什么别的队伍总比你的快说起

在机场候检排队的时候,大多数情况下,别的队伍都要比自己所在的队伍快,并常常懊悔当初怎么没去那个队。 其实,最快的队伍只能有一个,而排队之前并不知道那个队快。所以,如果有六个队伍你...

我是菜鸟我骄傲
昨天
1
0
分布式事务常见的解决方案

随着互联网的发展,越来越多的多服务相互之间的调用,这时候就产生了一个问题,在单项目情况下很容易实现的事务控制(通过数据库的acid控制),变得不那么容易。 这时候就产生了多种方案: ...

小海bug
昨天
3
0
python从零学——scrapy初体验

python从零学——scrapy初体验 近日因为一些事情,需要从网上爬取一些东西,故而想通过使用爬虫来顺便学习下强大的python。现将一些学习中遇到的问题记录下来,以便日后查询 1. 开发环境的准...

咾咔叽
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部