文档章节

Azkaban的定时调度任务分析33:定时执行上文

强子1985
 强子1985
发布于 2016/04/12 14:04
字数 468
阅读 257
收藏 0

之前,我们研究了执行1次的任务,但是实际生产环境中,我们可能需要很多个定时调度的需求,

所以,接下来,我们来研究定时调度的原理!

========================================================================

点开这个按钮后,出现新的界面如下:

填写自己的数值后,提交Schedule按钮后,观察URL

对应的URL===>

.POST /schedule HTTP/1.1

看来需要去查阅一个新的URL Servlet

root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");

========================================================================================

jdb azkaban.webapp.AzkabanWebServer -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in azkaban.webapp.servlet.ScheduleServlet.doPost

stop in  azkaban.webapp.servlet.ScheduleServlet.ajaxScheduleFlow

stop in azkaban.scheduler.ScheduleManager.scheduleFlow

run

========================================================================================

最后执行的是ExecutorServlet的下面的方法

 @Override

  protected void handlePost(HttpServletRequest req, HttpServletResponse resp,

      Session session) throws ServletException, IOException {

  //看到这里了

    if (hasParam(req, "ajax")) {

      handleAJAXAction(req, resp, session);

    }

  }

经过一番参数解析后,后面会有这样的一段代码

Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready",

firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(),

firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);

logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId

+ ")" + "].");

这个倒是需要仔细研究下~

========================================================================================

正如单次运行是放入一个queue中一样,对于循环执行的任务,放入一个线程的队列里

public class TriggerManager extends EventHandler implements

里面有一个线程

  private final TriggerScannerThread runnerThread;

任务是如何插入到这个线程的呢?

 

 

 

原因就在于,之前有个函数

public void insertTrigger(Trigger t) throws TriggerManagerException {

    synchronized (syncObj) {

      try {

        triggerLoader.addTrigger(t);

      } catch (TriggerLoaderException e) {

        throw new TriggerManagerException(e);

      }

      runnerThread.addTrigger(t);//看这里

      triggerIdMap.put(t.getTriggerId(), t);

    }

  }

public void addTrigger(Trigger t) {

      synchronized (syncObj) {

        t.updateNextCheckTime();

        triggers.add(t);

      }

}

 private class TriggerScannerThread extends Thread {

    private BlockingQueue<Trigger> triggers;

 

这样,一个调度任务就插入到了一个BlockingQueue,

 

当然,如果有兴趣的话,还可以看这个变量的初始化部分:

public TriggerScannerThread(long scannerInterval) {

      triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());

      justFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();

      this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");

      this.scannerInterval = scannerInterval;

    }

原来是一个PriorityBlockingQueue,这样排序都不用自己做了,很方便!

------------------

先看看这个线程什么上下文初始化的?

下文再说!

 


© 著作权归作者所有

共有 人打赏支持
强子1985

强子1985

粉丝 875
博文 1094
码字总数 801281
作品 8
南京
架构师
私信 提问
强子哥哥/MyAzkaban

#MyAzkaban Azkaban是由Linkedin开源的一个批量工作流任务调度器,用于在一个工作流内以一个特定的顺序运行一组工作和流程。 Azkaban定义了一种KV文件格式来建立任务之间的依赖关系,并提供一...

强子哥哥
2016/04/15
0
0
我的第2个开源项目:MyAzkaban-3.0.0

网址: http://git.oschina.net/qiangzigege/MyAzkaban http://azkaban.github.io/azkaban/docs/latest 官方例子: https://github.com/joeharris76/azkabanexamples 为什么要做这个东西? ......

强子哥哥
2016/04/14
888
0
#研发解决方案#分布式并行计算调度和管理系统Summoner

郑昀 创建于2015/11/10 最后更新于2015/11/12 关键词:佣金计算、定时任务、数据抽取、数据清洗、数据计算、Java、Redis、MySQL、Zookeeper、azkaban2、oozie、mesos Summoner 是国玺部门推出...

旁观者-郑昀
2016/01/08
144
0
Azkaban的定时调度任务分析34:定时执行下文

下面说线程azkaban.trigger.TriggerManager.TriggerScannerThread是如何运作的 1)初始化 stop in azkaban.trigger.TriggerManager$TriggerScannerThread.xxx run 初始化的上下文是: java.l......

强子哥哥
2016/04/12
189
0
批量工作流任务调度器--MyAzkaban

azkaban是由Linkedin开源的一个批量工作流任务调度器,在研究3.0.0版本的过程中,发现有一些问题:比如 定时调度参数下拉框中秒选项默认不显示、根据工作类型生成实例时有bug。所以创建此项目...

强子哥哥
2016/04/15
925
2

没有更多内容

加载失败,请刷新页面

加载更多

十万个为什么之为什么大家都说dubbo

Dubbo是什么? 使用背景 dubbo为什么这么流行, 为什么大家都这么喜欢用dubbo; 通过了解分布式开发了解到, 为适应访问量暴增,业务拆分后, 子应用部署在多台服务器上,而多台服务器通过可以通过d...

尾生
15分钟前
0
0
Docker搭建代码质量检测平台-SonarQube(中文版)

Sonar是一个用于代码质量管理的开源平台,用于管理源代码的质量,可以从七个维度检测代码质量。通过插件形式,可以支持包括java,C#,C/C++,PL/SQL,Cobol,JavaScrip,Groovy等等二十几种编程语言...

Jacktanger
21分钟前
0
0
Windows / Linux / MacOS 设置代理上网的方法汇总

本文汇总了 Windows / Linux / MacOS 设置代理上网的各种方法,总结如下: 1、设置系统代理(Windows、Linux、MacOS) 2、设置代理插件(Chrome、Chromium、Firefox、Opera、QQ等浏览器) 3、...

sunboy2050
昨天
2
0
自定义 Maven 的 repositories

有时,应用中需要一些比较新的依赖,而这些依赖并没有正式发布,还是处于milestone或者是snapshot阶段,并不能从中央仓库或者镜像站上下载到。此时,就需要 自定义Maven的<repositories>。 ...

waylau
昨天
2
0
徒手写一个es6代码库

mkdir democd demonpm initnpm install -g babelnpm install -g babel-clinpm install --save-dev babel-preset-es2015-node5 在项目目录创建两个文件夹 functional-playground ......

lilugirl
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部