文档章节

Quartz任务调度源码分析

s
 sgkbkega
发布于 2016/09/08 09:41
字数 961
阅读 39
收藏 0

从源码分析中可以看出,任务的整个调度过程为,初始化线程池,及调度器QuartzScheduler,然后由线程池去执行QuartzSchedulerThread,将触发器任务(job与触发器)添加到存储器(TreeSet,timeTrriger)中,然后启动调度器,QuartzSchedulerThread从timeTrriger去除待触发的任务,并包装成TriggerFiredBundle,然后由JobRunShellFactory 
创建TriggerFiredBundle的执行线程JobRunShell, 调度执行通过线程池SimpleThreadPool去执行JobRunShell,而JobRunShell执行的就是job.execute(JobExecutionContext context)。Quartz主要中的集合类有ArrayList,LinkedList,HashMap,TreeSet(TreeMap);之所以用到上面四个集合类,主要用到集合的如下特点:ArrayList访问速度快,LinkedList添加删除元素快;HashMap添加删除快,TreeSet访问速度快。

触发任务创建工厂类 

Java代码 下载 

  1. public class JTAJobRunShellFactory  
  2.     implements JobRunShellFactory  
  3. {  
  4.     public void initialize(Scheduler sched)  
  5.         throws SchedulerConfigException  
  6.     {  
  7.         scheduler = sched;  
  8.     }  
  9.     public JobRunShell createJobRunShell(TriggerFiredBundle bundle)  
  10.         throws SchedulerException  
  11.     {  
  12.         return new JTAJobRunShell(scheduler, bundle);  
  13.     }  
  14.     private Scheduler scheduler;  
  15. }  


//触发任务运行类 

Java代码 下载 

  1. public class JTAJobRunShell extends JobRunShell  
  2. {  
  3.   
  4.     public JTAJobRunShell(Scheduler scheduler, TriggerFiredBundle bndle)  
  5.     {  
  6.         super(scheduler, bndle);  
  7.         transactionTimeout = null;  
  8.     }  
  9. }  
  10. public class JobRunShell extends SchedulerListenerSupport  
  11.     implements Runnable  
  12. {  
  13. public JobRunShell(Scheduler scheduler, TriggerFiredBundle bndle)  
  14.     {  
  15.         jec = null;  
  16.         qs = null;  
  17.         firedTriggerBundle = null;  
  18.         this.scheduler = null;  
  19.         shutdownRequested = false;  
  20.         this.scheduler = scheduler;  
  21.         firedTriggerBundle = bndle;  
  22.     }  
  23.      public void run()  
  24.     {  
  25.         //添加到内部监听器  
  26.         qs.addInternalSchedulerListener(this);  
  27. label0:  
  28.         {  
  29.        //protected JobExecutionContextImpl jec,job执行上下文  
  30.             OperableTrigger trigger = (OperableTrigger)jec.getTrigger();  
  31.             JobDetail jobDetail = jec.getJobDetail();  
  32.             org.quartz.Trigger.CompletedExecutionInstruction instCode;  
  33.             do  
  34.             {  
  35.                 JobExecutionException jobExEx = null;  
  36.                 Job job = jec.getJobInstance();  
  37.                 try  
  38.                 {  
  39.                     begin();  
  40.                 }  
  41.                 catch(SchedulerException se)  
  42.                 {  
  43.                     qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn't begin execution.").toString(), se);  
  44.                     break label0;  
  45.                 }  
  46.                 try  
  47.                 {  
  48.                     if(!notifyListenersBeginning(jec))  
  49.                         break label0;  
  50.                 }  
  51.                 catch(VetoedException ve)  
  52.                 {  
  53.                     try  
  54.                     {  
  55.                         org.quartz.Trigger.CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);  
  56.                         qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);  
  57.                         if(jec.getTrigger().getNextFireTime() == null)  
  58.                             qs.notifySchedulerListenersFinalized(jec.getTrigger());  
  59.                         complete(true);  
  60.                     }  
  61.                     catch(SchedulerException se)  
  62.                     {  
  63.                         qs.notifySchedulerListenersError((new StringBuilder()).append("Error during veto of Job (").append(jec.getJobDetail().getKey()).append(": couldn't finalize execution.").toString(), se);  
  64.                     }  
  65.                     break label0;  
  66.                 }  
  67.                 long startTime = System.currentTimeMillis();  
  68.                 long endTime = startTime;  
  69.                 try  
  70.                 {  
  71.                     log.debug((new StringBuilder()).append("Calling execute on job ").append(jobDetail.getKey()).toString());  
  72.                     //执行Job,关键  
  73.             job.execute(jec);  
  74.                     endTime = System.currentTimeMillis();  
  75.                 }  
  76.                 catch(JobExecutionException jee)  
  77.                 {  
  78.                     endTime = System.currentTimeMillis();  
  79.                     jobExEx = jee;  
  80.                     getLog().info((new StringBuilder()).append("Job ").append(jobDetail.getKey()).append(" threw a JobExecutionException: ").toString(), jobExEx);  
  81.                 }  
  82.                 catch(Throwable e)  
  83.                 {  
  84.                     endTime = System.currentTimeMillis();  
  85.                     getLog().error((new StringBuilder()).append("Job ").append(jobDetail.getKey()).append(" threw an unhandled Exception: ").toString(), e);  
  86.                     SchedulerException se = new SchedulerException("Job threw an unhandled exception.", e);  
  87.                     qs.notifySchedulerListenersError((new StringBuilder()).append("Job (").append(jec.getJobDetail().getKey()).append(" threw an exception.").toString(), se);  
  88.                     jobExEx = new JobExecutionException(se, false);  
  89.                 }  
  90.         //设置jJobExecutionContext运行时间  
  91.                 jec.setJobRunTime(endTime - startTime);  
  92.                 if(!notifyJobListenersComplete(jec, jobExEx))  
  93.                     break label0;  
  94.                 instCode = org.quartz.Trigger.CompletedExecutionInstruction.NOOP;  
  95.                 try  
  96.                 {  
  97.                     instCode = trigger.executionComplete(jec, jobExEx);  
  98.                 }  
  99.                 catch(Exception e)  
  100.                 {  
  101.                     SchedulerException se = new SchedulerException("Trigger threw an unhandled exception.", e);  
  102.                     qs.notifySchedulerListenersError("Please report this error to the Quartz developers.", se);  
  103.                 }  
  104.                 if(!notifyTriggerListenersComplete(jec, instCode))  
  105.                     break label0;  
  106.                 if(instCode == org.quartz.Trigger.CompletedExecutionInstruction.RE_EXECUTE_JOB)  
  107.                 {  
  108.                     jec.incrementRefireCount();  
  109.                     try  
  110.                     {  
  111.                         complete(false);  
  112.                     }  
  113.                     catch(SchedulerException se)  
  114.                     {  
  115.                         qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn't finalize execution.").toString(), se);  
  116.                     }  
  117.                     continue;  
  118.                 }  
  119.                 try  
  120.                 {  
  121.                     complete(true);  
  122.                     break;  
  123.                 }  
  124.                 catch(SchedulerException se)  
  125.                 {  
  126.                     qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn't finalize execution.").toString(), se);  
  127.                 }  
  128.             } while(true);  
  129.         //通知job执行完成  
  130.             qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);  
  131.         }  
  132.         qs.removeInternalSchedulerListener(this);  
  133.         break MISSING_BLOCK_LABEL_710;  
  134.         Exception exception;  
  135.         exception;  
  136.         qs.removeInternalSchedulerListener(this);  
  137.         throw exception;  
  138.     }  
  139.     protected JobExecutionContextImpl jec;//job执行上下文  
  140.     protected QuartzScheduler qs;  
  141.     protected TriggerFiredBundle firedTriggerBundle;  
  142.     protected Scheduler scheduler;  
  143.     protected volatile boolean shutdownRequested;  
  144.     private final Logger log = LoggerFactory.getLogger(getClass());  
  145. }  


//TriggerKey,JobKey包装类 

Java代码 下载 

  1. class TriggerWrapper  
  2. {  
  3.   
  4.     TriggerWrapper(OperableTrigger trigger)  
  5.     {  
  6.         state = 0;  
  7.         if(trigger == null)  
  8.         {  
  9.             throw new IllegalArgumentException("Trigger cannot be null!");  
  10.         } else  
  11.         {  
  12.             this.trigger = trigger;  
  13.             key = trigger.getKey();  
  14.             jobKey = trigger.getJobKey();  
  15.             return;  
  16.         }  
  17.     }  
  18.   
  19.     public boolean equals(Object obj)  
  20.     {  
  21.         if(obj instanceof TriggerWrapper)  
  22.         {  
  23.             TriggerWrapper tw = (TriggerWrapper)obj;  
  24.             if(tw.key.equals(key))  
  25.                 return true;  
  26.         }  
  27.         return false;  
  28.     }  
  29.   
  30.     public int hashCode()  
  31.     {  
  32.         return key.hashCode();  
  33.     }  
  34.   
  35.     public OperableTrigger getTrigger()  
  36.     {  
  37.         return trigger;  
  38.     }  
  39.   
  40.     public final TriggerKey key;  
  41.     public final JobKey jobKey;  
  42.     public final OperableTrigger trigger;  
  43.     public int state;  
  44.     public static final int STATE_WAITING = 0;//等待  
  45.     public static final int STATE_ACQUIRED = 1;//就绪  
  46.     public static final int STATE_EXECUTING = 2;//执行  
  47.     public static final int STATE_COMPLETE = 3;//完成  
  48.     public static final int STATE_PAUSED = 4;//暂停  
  49.     public static final int STATE_BLOCKED = 5;//阻塞  
  50.     public static final int STATE_PAUSED_BLOCKED = 6;//暂停阻塞  
  51.     public static final int STATE_ERROR = 7;//错误  
  52. }  


//简单触发器 

Java代码 下载 

  1. public class SimpleTriggerImpl extends AbstractTrigger  
  2.     implements SimpleTrigger, CoreTrigger  
  3. {  
  4.  //获取下一次触发时间  
  5.  public Date getNextFireTime()  
  6.     {  
  7.         return nextFireTime;  
  8.     }  
  9.  private Date startTime;  
  10.     private Date endTime;  
  11.     private Date nextFireTime;  
  12.     private Date previousFireTime;  
  13.     private int repeatCount;  
  14.     private long repeatInterval;  
  15.     private int timesTriggered;  
  16.     private boolean complete;  
  17.   
  18. }  


//触发任务包装类 

Java代码 下载 

  1. public class TriggerFiredBundle  
  2.     implements Serializable  
  3. {  
  4. public TriggerFiredBundle(JobDetail job, OperableTrigger trigger, Calendar cal, boolean jobIsRecovering, Date fireTime, Date scheduledFireTime, Date prevFireTime,   
  5.             Date nextFireTime)  
  6.     {  
  7.         this.job = job;  
  8.         this.trigger = trigger;  
  9.         this.cal = cal;  
  10.         this.jobIsRecovering = jobIsRecovering;  
  11.         this.fireTime = fireTime;  
  12.         this.scheduledFireTime = scheduledFireTime;  
  13.         this.prevFireTime = prevFireTime;  
  14.         this.nextFireTime = nextFireTime;  
  15.     }  
  16.     private JobDetail job;  
  17.     private OperableTrigger trigger;  
  18.     private Calendar cal;  
  19.     private boolean jobIsRecovering;  
  20.     private Date fireTime;  
  21.     private Date scheduledFireTime;  
  22.     private Date prevFireTime;  
  23.     private Date nextFireTime;  
  24. }  


//触发任务包装结果类 

Java代码 下载 

  1. public class TriggerFiredResult  
  2. {  
  3.     public TriggerFiredResult(TriggerFiredBundle triggerFiredBundle)  
  4.     {  
  5.         this.triggerFiredBundle = triggerFiredBundle;  
  6.     }  
  7.     private TriggerFiredBundle triggerFiredBundle;  
  8.     private Exception exception;  
  9. }  

© 著作权归作者所有

s
粉丝 1
博文 2
码字总数 2198
作品 0
杭州
私信 提问
基于spring+quartz的分布式任务调度

学习地址:http://www.roncoo.com/course/view/e2b459016e2e477dbd5d67c8b23fe86d 课程介绍 Quartz是OpenSymphony开源组织在Job scheduling领域又一个开源项目,它可以与J2EE与J2SE应用程序相...

小红牛
2018/04/19
0
0
基于Netty+Zookeeper+Quartz调度分析

系列文章 Spring整合Quartz分布式调度 Quartz数据库表分析 Quartz调度源码分析 基于Netty+Zookeeper+Quartz调度分析 前言 前几篇文章分别从使用和源码层面对Quartz做了简单的分析,在分析的过...

ksfzhaohui
2018/09/03
812
0
源码分析Elastic-Job前置篇二-基于Spring启动序列图与核心类图

本文主要目的:简单梳理了基于 Spring ElasticJob 的启动流程,从下文开始,将重点剖析 ElasticJob 的核心实现细节,例如选主、分片、失效转移机制等等。 1、在Spring中使用Elastic-Job的示例...

丁威
09/25
0
0
quartz-java的crontab

项目地址: http://quartz-scheduler.org/ Quatz提供 类crontab(可精细到秒) 和 定时执行 两种方式 源码说明: QuartzSchedulerThread: - 执行 job(即quartz任务以线程的方式执行) - 通过 Qu...

深蓝苹果
2014/05/28
380
0
quartz2.2源码分析1-使用和原理

前言 Quartz是一款由java写成的作业调度框架,在大量javase/javaee应用中被用来做定时任务,它功能强大而又不失使用简单性。 使用例子 maven引入: 加入配置文件org/quartz/quartz.propertie...

Small-Liu
2016/04/23
2.4K
0

没有更多内容

加载失败,请刷新页面

加载更多

CentOS 7系统增加swap

转载请注明文章出处:CentOS 7系统增加swap swap是位于磁盘上的特殊文件(或分区),属于“虚拟内存”的一部分。通俗点就是内存的备胎,内存充足的情况下,基本上没swap什么事(和设置有关)...

tlanyan
31分钟前
4
0
基于Prometheus和Grafana的监控平台 - 环境搭建

相关概念 微服务中的监控分根据作用领域分为三大类,Logging,Tracing,Metrics。 Logging - 用于记录离散的事件。例如,应用程序的调试信息或错误信息。它是我们诊断问题的依据。比如我们说...

JAVA日知录
今天
5
0
PHP运行时全局构造体

struct _php_core_globals { zend_bool magic_quotes_gpc; // 是否对输入的GET/POST/Cookie数据使用自动字符串转义。 zend_bool magic_quotes_runtime; //是否对运行时从外部资源产生的数据使...

冻结not
今天
4
0
webpack插件html-webpack-plugin

本文转载于:专业的前端网站→webpack插件html-webpack-plugin 1、插件安装 npm install html-webpack-plugin --save-dev 2、插件使用 webpack.config.js配置文件为: var htmlWebpackPlugin=...

前端老手
今天
6
0
数据挖掘

zhengchen1996
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部