quartz2.2源码分析4-JobStore

原创
2016/05/17 13:24
阅读数 5.8K

JobStore为Quartz任务和触发器提供了一个存储途径,JobStore里面封装了很多方法操作任务、触发器,包括添加、删除、修改、查询等,对于不同的存储介质有不同的实现,其内部核心提供了两大类型JobStore,内存级的和持久化到数据库的,其他插件包还提供其他形式的,不管是什么类型的JobStore都实现了以下几种类型的方法,如下:

1. 全局操作方法:

方法 作用
void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) Scheduler调用之前给 JobStore 一个初始化的机会
void schedulerStarted() 被通知Scheduler 已经启动了
void schedulerPaused() 被通知 Scheduler 已经暂停了
void schedulerResumed() 被通知 Scheduler 已经恢复了
void shutdown() 被通知 Scheduler 已经结束了,需要释放资源

这类方法被QuartzScheduler调用 ,主要是对JobStore生命周期进行操作,初始化、启动、停止等,不同的实现在不同的周期有自己的实现,比如JDBCJobStore会在 initialize 方法初始化锁, 在schedulerStarted里恢复任务、初始化clusterManagementThread线程和MisfireHandler线程,在 shutdown 的时候停止 MisfireHandler 线程和 clusterManagementThread线程 。

2. 存储和删除JobDetail、Trigger和Calendar 方法 :

void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger) 存储jobDetail和Trigger                                        
void storeJob(JobDetail newJob, boolean replaceExisting) 存储 jobDetail
public void storeJobsAndTriggers(Map<JobDetail, Set<? extends Trigger>> triggersAndJobs, boolean replace) 批量增加 jobDetail 和Trigger
boolean removeJob(JobKey jobKey) 根据JobKey删除JobDetail和对应的Triggers
public boolean removeJobs(List<JobKey> jobKeys) 同上,批量删除
JobDetail retrieveJob(JobKey jobKey) 根据JobKey检索JobDetail
void storeTrigger(OperableTrigger newTrigger, boolean replaceExisting) 存储 Trigger
boolean removeTrigger(TriggerKey triggerKey) 根据triggerKey删除Trigger
public boolean removeTriggers(List<TriggerKey> triggerKeys) 同上,批量删除
boolean replaceTrigger(TriggerKey triggerKey, OperableTrigger newTrigger) 根据triggerKey替换Trigger
OperableTrigger retrieveTrigger(TriggerKey triggerKey) 根据 triggerKey 检索 Trigger
boolean checkExists(JobKey jobKey) 检查是否存在
boolean checkExists(TriggerKey triggerKey) 检查是否存在
void clearAllSchedulingData() 删除所有 JobDetail、Trigger和Calendar
void storeCalendar(String name, Calendar calendar, boolean replaceExisting, boolean updateTriggers) 存储 Calendar
boolean removeCalendar(String calName) 根据名称删除 Calendar
Calendar retrieveCalendar(String calName) 根据名称检索 Calendar

上面几类方法都大同小异,先获取锁,然后对数据进行操作,具体的不同JobStore对数据的操作不一样,内存级的对集合进行操作,jdbc类型的对数据库进行操作, 以storeJobAndTrigger 为例分别看内存型和jdbc型的代码:

RAMJobStore的storeJobAndTrigger 内部实际是调用 storeJob 和 storeTrigger ,如下:

// 根据根据key来定位的map
protected HashMap jobsByKey = new HashMap(1000);
protected HashMap triggersByKey = new HashMap(1000);

// 分组存放,这样便于对整个组进行操作
protected HashMap> jobsByGroup = new HashMap>(25);
protected HashMap> triggersByGroup = new HashMap>(25);

// 按优先级、时间顺序排序,第一条数据就是最近一条要执行的
protected TreeSet timeTriggers = new TreeSet(new TriggerWrapperComparator());
// 时间日期
protected HashMap calendarsByName = new HashMap(25);
// 所有的触发器
protected ArrayList triggers = new ArrayList(1000);

// 暂停的组
protected HashSet pausedTriggerGroups = new HashSet();
protected HashSet pausedJobGroups = new HashSet();

// 如果设置不能并发执行,正在执行的任务会放到这边
protected HashSet blockedJobs = new HashSet();

public void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger) throws JobPersistenceException {
	storeJob(newJob, false);
	storeTrigger(newTrigger, false);
}

public void storeJob(JobDetail newJob, boolean replaceExisting) throws ObjectAlreadyExistsException {
	JobWrapper jw = new JobWrapper((JobDetail) newJob.clone());
	boolean repl = false;
	synchronized (lock) {//加锁
		if (jobsByKey.get(jw.key) != null) {//从这句可以看出jobsByKey里面存放了所有的Job的Key
			if (!replaceExisting) {
				throw new ObjectAlreadyExistsException(newJob);
			}
			repl = true;
		}

		if (!repl) {//如果没有存在则放到集合中
			//获取job分组
			HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
			if (grpMap == null) {
				grpMap = new HashMap<JobKey, JobWrapper>(100);
				jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
			}
			//放到分组集合
			grpMap.put(newJob.getKey(), jw);
			//放到全局的map中
			jobsByKey.put(jw.key, jw);
		} else {//如果已经存在则修改JobDetail
			// update job detail
			JobWrapper orig = jobsByKey.get(jw.key);
			orig.jobDetail = jw.jobDetail; // already cloned
		}
	}
}
public void storeTrigger(OperableTrigger newTrigger, boolean replaceExisting) throws JobPersistenceException {
	TriggerWrapper tw = new TriggerWrapper((OperableTrigger) newTrigger.clone());

	synchronized (lock) {
		if (triggersByKey.get(tw.key) != null) {//triggersByKey里面包含所有的trigger的Key
			if (!replaceExisting) {
				throw new ObjectAlreadyExistsException(newTrigger);
			}
			//删除原有的
			removeTrigger(newTrigger.getKey(), false);
		}

		// 判断触发器对应的job是否存在
		if (retrieveJob(newTrigger.getJobKey()) == null) {
			throw new JobPersistenceException(
					"The job (" + newTrigger.getJobKey() + ") referenced by the trigger does not exist.");
		}

		// 添加trigger
		triggers.add(tw);
		// 添加trigger到分组
		HashMap<TriggerKey, TriggerWrapper> grpMap = triggersByGroup.get(newTrigger.getKey().getGroup());
		if (grpMap == null) {
			grpMap = new HashMap<TriggerKey, TriggerWrapper>(100);
			triggersByGroup.put(newTrigger.getKey().getGroup(), grpMap);
		}
		grpMap.put(newTrigger.getKey(), tw);
		// 放到map
		triggersByKey.put(tw.key, tw);

		//被暂停的组里面有没有,如果有则状态设为paused
		if (pausedTriggerGroups.contains(newTrigger.getKey().getGroup())
				|| pausedJobGroups.contains(newTrigger.getJobKey().getGroup())) {
			tw.state = TriggerWrapper.STATE_PAUSED;
			if (blockedJobs.contains(tw.jobKey)) {
				tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
			}
		} else if (blockedJobs.contains(tw.jobKey)) {
			tw.state = TriggerWrapper.STATE_BLOCKED;
		} else {
			//放到按优先级时间顺序的set中
			timeTriggers.add(tw);
		}
	}
}

对应内存形式而言用了几种数据结构来存放这些Job和Trigger

  • 首先要根据key能快速定位,这种存放在map中以JobKey或TriggerKey为键
  • 其次要能根据组进行批量操作,放大组名为key的map中
  • Trigger应该能获取最近需要被执行的任务,使用TreeSet自定义TriggerWrapperComparator,它的内部依赖于TriggerTimeComparator比较器:
    class TriggerWrapperComparator implements Comparator, java.io.Serializable {
    	TriggerTimeComparator ttc = new TriggerTimeComparator();
    
    	public int compare(TriggerWrapper trig1, TriggerWrapper trig2) {
    		//最终还是依赖于TriggerTimeComparator比较Trigger
    		return ttc.compare(trig1.trigger, trig2.trigger);
    	}
    
    }
    class TriggerTimeComparator implements Comparator, Serializable {
    	// This static method exists for comparator in TC clustered quartz
    	public static int compare(Date nextFireTime1, int priority1, TriggerKey key1, Date nextFireTime2, int priority2,
    			TriggerKey key2) {
    		if (nextFireTime1 != null || nextFireTime2 != null) {
    			//先比较实际
    			if (nextFireTime1 == null) {
    				return 1;
    			}
    			if (nextFireTime2 == null) {
    				return -1;
    			}
    			//按时间升序
    			if (nextFireTime1.before(nextFireTime2)) {
    				return -1;
    			}
    			if (nextFireTime1.after(nextFireTime2)) {
    				return 1;
    			}
    		}
    		//再比较优先级
    		int comp = priority2 - priority1;
    		if (comp != 0) {
    			return comp;
    		}
    		return key1.compareTo(key2);
    	}
    
    	public int compare(Trigger t1, Trigger t2) {
    		return compare(t1.getNextFireTime(), t1.getPriority(), t1.getKey(), t2.getNextFireTime(), t2.getPriority(),
    				t2.getKey());
    	}
    }

时间优先,其次是优先级。


Jdbc类型的JobStore:

JobStoreSupport包含基于jdbc的JobStore实现的基本功能,它的 storeJobAndTrigger 方法主要就是获取锁,然后插入或修改job_details triggers表,并根据当前任务的状况设置triggers的状态

public void storeJobAndTrigger(final JobDetail newJob, final OperableTrigger newTrigger)
		throws JobPersistenceException {
	//获取锁
	executeInLock((isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null, new VoidTransactionCallback() {
		public void executeVoid(Connection conn) throws JobPersistenceException {
			//执行具体的数据库操作
			storeJob(conn, newJob, false);
			storeTrigger(conn, newTrigger, newJob, false, Constants.STATE_WAITING, false, false);
		}
	});
}
protected abstract  T executeInLock(String lockName, TransactionCallback txCallback)
		throws JobPersistenceException;
protected void storeJob(Connection conn, JobDetail newJob, boolean replaceExisting) throws JobPersistenceException {
	//判断任务是否存在
	boolean existingJob = jobExists(conn, newJob.getKey());
	try {
		if (existingJob) {
			if (!replaceExisting) {
				throw new ObjectAlreadyExistsException(newJob);
			}
			//存在并且可替换就更新
			getDelegate().updateJobDetail(conn, newJob);
		} else {
			//不存在插入
			getDelegate().insertJobDetail(conn, newJob);
		}
	} catch (IOException e) {
		throw new JobPersistenceException("Couldn't store job: " + e.getMessage(), e);
	} catch (SQLException e) {
		throw new JobPersistenceException("Couldn't store job: " + e.getMessage(), e);
	}
}
protected void storeTrigger(Connection conn, OperableTrigger newTrigger, JobDetail job, boolean replaceExisting,
		String state, boolean forceState, boolean recovering) throws JobPersistenceException {

	//判断trigger是否存在,查询表TRIGGERS
	boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
	if ((existingTrigger) && (!replaceExisting)) {
		throw new ObjectAlreadyExistsException(newTrigger);
	}
	try {
		boolean shouldBepaused;

		if (!forceState) {
			//判断所属的组是否处于暂停状态,修改对应的状态
			shouldBepaused = getDelegate().isTriggerGroupPaused(conn, newTrigger.getKey().getGroup());

			if (!shouldBepaused) {
				shouldBepaused = getDelegate().isTriggerGroupPaused(conn, ALL_GROUPS_PAUSED);

				if (shouldBepaused) {
					getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup());
				}
			}
			if (shouldBepaused && (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED))) {
				state = STATE_PAUSED;
			}
		}
		//获取并判断对应的任务是否存在
		if (job == null) {
			job = retrieveJob(conn, newTrigger.getJobKey());
		}
		if (job == null) {
			throw new JobPersistenceException(
					"The job (" + newTrigger.getJobKey() + ") referenced by the trigger does not exist.");
		}

		//检查@DisallowConcurrentExecution修改trigger状态
		if (job.isConcurrentExectionDisallowed() && !recovering) {
			state = checkBlockedState(conn, job.getKey(), state);
		}

		if (existingTrigger) {
			//修改
			getDelegate().updateTrigger(conn, newTrigger, state, job);
		} else {
			//插入
			getDelegate().insertTrigger(conn, newTrigger, state, job);
		}
	} catch (Exception e) {
		throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '"
				+ newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
	}
}

上面有两点需要注意:

  • 获取锁Semaphore,有两种锁,一种是本地锁只能用于单机,另一种是数据库锁可以用于集群
  • 操作数据库的DriverDelegate,它提供了几乎所有主流数据库的实现,sql语句在常量StdJDBCConstants中。

3. 查询 JobStore 信息:

int getNumberOfJobs() Job数量
int getNumberOfTriggers() Trigger数量    
int getNumberOfCalendars() Calendar数量                
Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher) 获取匹配组的下的JobKey集合
Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher) 获取匹配组的下的 TriggerKey 集合         
List<String> getJobGroupNames() 获取所有Job组名称
List<String> getTriggerGroupNames() 获取所有Trigger组名称
List<String> getCalendarNames() 获取所有 Calendar 名称
List<OperableTrigger> getTriggersForJob(JobKey jobKey) 根据JobKey获取所有关联的 Trigger
TriggerState getTriggerState(TriggerKey triggerKey) 查询某个Trigger的状态

4. Trigger 状态修改:

void pauseTrigger(TriggerKey triggerKey) 暂停Trigger                  
Collection<String> pauseTriggers(GroupMatcher<TriggerKey> matcher) 暂停给定组的Trigger
void pauseJob(JobKey jobKey) 暂停Job对应的Trigger
Collection<String> pauseJobs(GroupMatcher<JobKey> groupMatcher) 暂停给定Job组的Trigger
void resumeTrigger(TriggerKey triggerKey) 恢复Trigger
Collection<String> resumeTriggers(GroupMatcher<TriggerKey> matcher) 恢复给定组的Trigger
Set<String> getPausedTriggerGroups() 获取暂停Trigger的组
void resumeJob(JobKey jobKey)  
Collection<String> resumeJobs(GroupMatcher<JobKey> matcher)  
void pauseAll() 暂停所有Trigger
void resumeAll()  

5. Trigger 执行:

List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) 获取下次需要执行的Trigger              
List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers) Trigger开始执行
void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail,CompletedExecutionInstruction triggerInstCode) Trigger 执行完成
void releaseAcquiredTrigger(OperableTrigger trigger) 释放Trigger

这类方法在Scheduler的生命周期中被调用:

  • 获取下次需要触发的Triggers

         QuartzSchedulerThread循环调用 acquireNextTriggers 获取下次被触发的触发器集合

/**
 * @param noLaterThan
 *            时间点之前
 * @param maxCount
 *            最大数量
 * @param timeWindow
 *            时间窗口, noLaterThan+timeWindow才是截止时间
 */
public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {
	synchronized (lock) {
		List<OperableTrigger> result = new ArrayList<OperableTrigger>();
		Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
		Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>();
		long firstAcquiredTriggerFireTime = 0;
		// return empty list if store has no triggers.
		if (timeTriggers.size() == 0)// 为空就返回空集合
			return result;
		while (true) {
			TriggerWrapper tw;
			try {
				tw = timeTriggers.first();// timeTriggers是TreeSet类型的
				if (tw == null)
					break;
				timeTriggers.remove(tw);//删除
			} catch (java.util.NoSuchElementException nsee) {
				break;
			}
			if (tw.trigger.getNextFireTime() == null) {//没有下次执行时间作废
				continue;
			}
			// misfire检查,命中misfire机制,则直接放弃等待下一轮
			if (applyMisfire(tw)) {
				if (tw.trigger.getNextFireTime() != null) {//下次执行时间不为空还要把trigger放回去
					timeTriggers.add(tw);
				}
				continue;
			}
			// 时间不满足的放回去,直接结束
			if (tw.getTrigger().getNextFireTime().getTime() > noLaterThan + timeWindow) {
				timeTriggers.add(tw);
				break;
			}

			// If trigger's job is set as @DisallowConcurrentExecution, and
			// it has already been added to result, then
			// put it back into the timeTriggers set and continue to search
			// for next trigger.
			JobKey jobKey = tw.trigger.getJobKey();
			JobDetail job = jobsByKey.get(tw.trigger.getJobKey()).jobDetail;
			// 是否允许同时执行相同的任务,这个版本已经换成了注解,jobClass上写@DisallowConcurrentExecution
			if (job.isConcurrentExectionDisallowed()) {
				// 已经在执行了,本次放弃
				if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
					excludedTriggers.add(tw);//本次不执行的记录下来,后面还要放回去
					continue; // go to next trigger in store.
				} else {
					acquiredJobKeysForNoConcurrentExec.add(jobKey);
				}
			}

			tw.state = TriggerWrapper.STATE_ACQUIRED;
			tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
			OperableTrigger trig = (OperableTrigger) tw.trigger.clone();
			result.add(trig);
			if (firstAcquiredTriggerFireTime == 0)
				firstAcquiredTriggerFireTime = tw.trigger.getNextFireTime().getTime();

			// 如果到最大数就结束
			if (result.size() == maxCount)
				break;
		}
		// 上有由于@DisallowConcurrentExecution排除掉的,要放回去
		if (excludedTriggers.size() > 0)
			timeTriggers.addAll(excludedTriggers);
		return result;
	}
}
  • 通知Triggers开始执行了

         QuartzSchedulerThread 获取到triggers后同时JobStore开始执行了, JobStore对trigger做一些状态修改,并把获取trigger对应的Job

/**
 * 通知JobStore,开始执行这些triggers了
 */
public List<TriggerFiredResult> triggersFired(List<OperableTrigger> firedTriggers) {
	synchronized (lock) {
		List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();

		for (OperableTrigger trigger : firedTriggers) {
			//重新获取判断,防止前面有删掉
			TriggerWrapper tw = triggersByKey.get(trigger.getKey());
			if (tw == null || tw.trigger == null) {
				continue;
			}
			//判断状态不是acquired的作废
			if (tw.state != TriggerWrapper.STATE_ACQUIRED) {
				continue;
			}

			//获取时间
			Calendar cal = null;
			if (tw.trigger.getCalendarName() != null) {
				cal = retrieveCalendar(tw.trigger.getCalendarName());
				if (cal == null)
					continue;
			}
			Date prevFireTime = trigger.getPreviousFireTime();
			// in case trigger was replaced between acquiring and firing
			timeTriggers.remove(tw);
			// call triggered on our copy, and the scheduler's copy
			//给trigger一个时机来修改下次触发时间
			tw.trigger.triggered(cal);
			trigger.triggered(cal);
			// tw.state = TriggerWrapper.STATE_EXECUTING;
			tw.state = TriggerWrapper.STATE_WAITING;

			//触发器,job,时间构造成bundle
			TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(tw.jobKey), trigger, cal, false,
					new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());

			JobDetail job = bndle.getJobDetail();

			//如果不支持并发执行
			if (job.isConcurrentExectionDisallowed()) {
				ArrayList<TriggerWrapper> trigs = getTriggerWrappersForJob(job.getKey());
				for (TriggerWrapper ttw : trigs) {
					if (ttw.state == TriggerWrapper.STATE_WAITING) {
						ttw.state = TriggerWrapper.STATE_BLOCKED;
					}
					if (ttw.state == TriggerWrapper.STATE_PAUSED) {
						ttw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
					}
					//临时删掉timeTriggers
					timeTriggers.remove(ttw);
				}
				blockedJobs.add(job.getKey());
			} else if (tw.trigger.getNextFireTime() != null) {
				synchronized (lock) {
					// 如果不是@DisallowConcurrentExecution类型的Job,在执行任务的时候就放回来,
					// 如果是这种类型的,要到放到线程池才会放回来,具体看方法triggeredJobComplete
					timeTriggers.add(tw);
				}
			}
			results.add(new TriggerFiredResult(bndle));
		}
		return results;
	}
}

 

  • 通知Triggers执行结束

        trigger执行完后,通知 JobStore 释放trigger

// 放到线程池后,通知jobStore完成
public void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail,
		CompletedExecutionInstruction triggerInstCode) {
	synchronized (lock) {
		JobWrapper jw = jobsByKey.get(jobDetail.getKey());
		TriggerWrapper tw = triggersByKey.get(trigger.getKey());

		// It's possible that the job is null if:
		// 1- it was deleted during execution
		// 2- RAMJobStore is being used only for volatile jobs / triggers
		// from the JDBC job store
		//重新获取判断是否为空,前面异常可能已经是释放了,
		if (jw != null) {
			JobDetail jd = jw.jobDetail;
			//是否把jobData放回JobDetail
			if (jd.isPersistJobDataAfterExecution()) {
				JobDataMap newData = jobDetail.getJobDataMap();
				if (newData != null) {
					newData = (JobDataMap) newData.clone();
					newData.clearDirtyFlag();
				}
				jd = jd.getJobBuilder().setJobData(newData).build();
				jw.jobDetail = jd;
			}
			//如果不支持并发执行
			if (jd.isConcurrentExectionDisallowed()) {
				//删除blocked jobs
				blockedJobs.remove(jd.getKey());
				ArrayList<TriggerWrapper> trigs = getTriggerWrappersForJob(jd.getKey());
				for (TriggerWrapper ttw : trigs) {
					if (ttw.state == TriggerWrapper.STATE_BLOCKED) {
						ttw.state = TriggerWrapper.STATE_WAITING;
						//放回TreeSet
						timeTriggers.add(ttw);
					}
					if (ttw.state == TriggerWrapper.STATE_PAUSED_BLOCKED) {
						ttw.state = TriggerWrapper.STATE_PAUSED;
					}
				}
				signaler.signalSchedulingChange(0L);
			}
		} else { // even if it was deleted, there may be cleanup to do
			blockedJobs.remove(jobDetail.getKey());
			//如果支持并发执行triggersFired方法就已经把trigger放回timeTriggers了
		}
		// check for trigger deleted during execution...
		//根据前面返回过来的状态做相应处理
		if (tw != null) {
			if (triggerInstCode == CompletedExecutionInstruction.DELETE_TRIGGER) {

				if (trigger.getNextFireTime() == null) {
					// double check for possible reschedule within job
					// execution, which would cancel the need to delete...
					if (tw.getTrigger().getNextFireTime() == null) {
						removeTrigger(trigger.getKey());
					}
				} else {
					removeTrigger(trigger.getKey());
					signaler.signalSchedulingChange(0L);
				}
			} else if (triggerInstCode == CompletedExecutionInstruction.SET_TRIGGER_COMPLETE) {
				tw.state = TriggerWrapper.STATE_COMPLETE;
				timeTriggers.remove(tw);
				signaler.signalSchedulingChange(0L);
			} else if (triggerInstCode == CompletedExecutionInstruction.SET_TRIGGER_ERROR) {
				getLog().info("Trigger " + trigger.getKey() + " set to ERROR state.");
				tw.state = TriggerWrapper.STATE_ERROR;
				signaler.signalSchedulingChange(0L);
			} else if (triggerInstCode == CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR) {
				getLog().info("All triggers of Job " + trigger.getJobKey() + " set to ERROR state.");
				setAllTriggersOfJobToState(trigger.getJobKey(), TriggerWrapper.STATE_ERROR);
				signaler.signalSchedulingChange(0L);
			} else if (triggerInstCode == CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_COMPLETE) {
				setAllTriggersOfJobToState(trigger.getJobKey(), TriggerWrapper.STATE_COMPLETE);
				signaler.signalSchedulingChange(0L);
			}
		}
	}
}

 

  • 释放正在执行的Triggers

         QuartzSchedulerThread 中途出现一些异常,调用 releaseAcquiredTrigger 释放trigger

/**
 * 通知JobStore不需要执行的triggers
 */
public void releaseAcquiredTrigger(OperableTrigger trigger) {
	synchronized (lock) {
		//重新获取判断
		TriggerWrapper tw = triggersByKey.get(trigger.getKey());
		if (tw != null && tw.state == TriggerWrapper.STATE_ACQUIRED) {
			// 修改状态放回去
			tw.state = TriggerWrapper.STATE_WAITING;
			timeTriggers.add(tw);
		}
	}
}

上面都是以RAMJobStore为例说明的,JDBC类型的逻辑差不多,都是查询数据库

展开阅读全文
加载中

作者的其它热门文章

打赏
4
3 收藏
分享
打赏
0 评论
3 收藏
4
分享
返回顶部
顶部