文档章节

taobao-pamirs-schedule-2.0源码分析——任务队列分配源码分析

杨武兵
 杨武兵
发布于 2016/03/09 22:28
字数 2357
阅读 337
收藏 9

算法原理

分配服务器算法

算法原理是注册中心有一个队列表PAMIRS_S
,它包含如下关键信息:QUEUE_ID是队列标识   CUR_SERVER是当前分配服务器标识,REQ_SERVER是申请分配服务器标识。

假如有1,2,3,4,5个队列,有A,B,C三个服务器依次启动。则算法的规则是这样的:

A启动的时候:

由于没有其它的主机,则将所有的队列分配给A。

QUEUE_ID
 CUR_SERVER
REQ_SERVER
1 A     
2 A

3 A

4 A
5 A

B启动的时候:

QUEUE_ID
 CUR_SERVER
REQ_SERVER
1 A     
2 A
B
3 A

4 A B
5 A



C启动的时候:

QUEUE_ID
 CUR_SERVER
REQ_SERVER
1 A     
2 A
B
3 A
C
4 A
5 A B


D启动的时候:

QUEUE_ID
 CUR_SERVER
REQ_SERVER
1 A     
2 A
B
3 A
C
4 A D
5 A


服务器释放算法

上述算法中实现了预分配,那什么时候实现正式分配呢?当在获取任务队列的时候(必须控制在当前服务器中的所有任务都执行完毕的情况下,否则会重复执行任务的可能性)会先释放自己已经持有,但是别人要申请的队列,将这些队列让给申请人。

比如当前队列是A,在执行释放队列前的数据状态是:

QUEUE_ID
 CUR_SERVER
REQ_SERVER
1 A     
2 A
B
3 A
C
4 A D
5 A

释放自己持有,别人申请的队列之后的数据状态为:
QUEUE_ID
 CUR_SERVER
REQ_SERVER
1 A     
2 B
3 C

4 D
5 A

这个时候A持有的队列只有1和5了,队列就实现了均匀的分配给所有机器。

算法实现

分配队列代码实现

最开始的代码是在TBScheduleManager的方法assignScheduleTask方法。


public void assignScheduleTask() throws Exception {
		int clearServerCount = scheduleCenter
				.clearExpireScheduleServer(this.taskTypeInfo,this.taskTypeRunningInfo);
		List<ScheduleServer> serverList = scheduleCenter
				.selectAllValidScheduleServer(this.getTaskTypeRunningInfo().getTaskType());
		int clearTaskQueueInfoCount = scheduleCenter.clearTaskQueueInfo(
				this.getTaskTypeRunningInfo().getTaskType(), serverList);

		boolean isNeedReAssign = false;
		if (clearServerCount > 0 || clearTaskQueueInfoCount > 0) {
			isNeedReAssign = true;
		} else  {
			for (ScheduleServer item : serverList) {
				//注意,比较时间一定要用数据库时间
				if (item.getCenterServerTime().getTime() - item.getRegisterTime().getTime()
						< taskTypeInfo.getJudgeDeadInterval() * 3 ) {
					isNeedReAssign = true;
					break;
				}
			} 
		}
		if (isNeedReAssign == true) {
			scheduleCenter.assignQueue(this.getTaskTypeRunningInfo().getTaskType(),
					this.currenScheduleServer.getUuid(), serverList);
		}
		if (log.isDebugEnabled()) {
			//log.debug(message);
		}
	}

它会先查询一下是否需要重新分配队列,当已经清理过过期的服务器,或者已经清理过非法服务器持有的队列,或者有新的服务器(注册时间距离现在时间小于3个时间周期)注册的时候,则需要重新预分配队列。比较时间一定要以注册中心的时间为准。

需要重新预分配队列则进入方法scheduleCenter.assignQueue。

private Connection getConnection() throws SQLException{
    	Connection result = this.dataSource.getConnection();
    	if(result.getAutoCommit() == true){
    		result.setAutoCommit(false);
    	}
    	return result;
    } public void assignQueue(String taskType, String currentUuid,
			List<ScheduleServer> serverList) throws Exception {
		Connection conn = null;
		 try{
			 conn = this.getConnection();
			 clientInner.assignQueue(conn, taskType,currentUuid,serverList);
			 conn.commit();
		 }catch(Throwable e){
			 if(conn != null){
			     conn.rollback();
			 }
			 if(e instanceof Exception){
				 throw (Exception)e;
			 }else{
				 throw new Exception(e);
			 }			 
		 }finally{
			 if(conn!= null){
				 conn.close();
			 }
		 }		
	}

这个方法说明连接关闭了自动提交,方法内的多个SQL执行是在一个事务里的。这个非常关键。


/**
	 * 重新分配任务处理队列
	 * 
	 * @param taskType
	 * @param serverList
	 * @throws Exception
	 */
	public void assignQueue(Connection conn,String taskType, String currentUuid,
			List<ScheduleServer> serverList) throws Exception {
		this.lockTaskTypeRunningInfo(conn,taskType, currentUuid);
			String sqlQueue = " SELECT TASK_TYPE,QUEUE_ID,CUR_SERVER,REQ_SERVER FROM "
					+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")
					+ " WHERE TASK_TYPE = ? ORDER BY QUEUE_ID";
			PreparedStatement stmtQueue = conn.prepareStatement(sqlQueue);
			stmtQueue.setString(1, taskType);
			ResultSet setQueue = stmtQueue.executeQuery();
			int point = 0;
			int taskCount = 0;
			while (setQueue.next()) {
				PreparedStatement stmtUpdateQueue = null;
				String sqlModifyQueue = "";
				if (setQueue.getString("CUR_SERVER") == null) {
					sqlModifyQueue = " UPDATE "
							+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")
							+ " SET CUR_SERVER = ?,REQ_SERVER = null,GMT_MODIFIED = "
							+ getDataBaseSysdateString(conn)
							+ " WHERE TASK_TYPE = ? and QUEUE_ID = ? ";
					stmtUpdateQueue = conn.prepareStatement(sqlModifyQueue);
					stmtUpdateQueue.setString(1, serverList.get(point)
							.getUuid());
					stmtUpdateQueue.setString(2, taskType);
					stmtUpdateQueue
							.setString(3, setQueue.getString("QUEUE_ID"));
					stmtUpdateQueue.executeUpdate();
					stmtUpdateQueue.close();
				} else if (!(serverList.get(point).getUuid().equalsIgnoreCase(
						setQueue.getString("CUR_SERVER")) == true && setQueue
						.getString("REQ_SERVER") == null)) {
					sqlModifyQueue = " UPDATE "
							+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")
							+ " SET REQ_SERVER = ? ,GMT_MODIFIED = "
							+ getDataBaseSysdateString(conn)
							+ " WHERE TASK_TYPE = ? and QUEUE_ID = ? ";
					stmtUpdateQueue = conn.prepareStatement(sqlModifyQueue);
					stmtUpdateQueue.setString(1, serverList.get(point)
							.getUuid());
					stmtUpdateQueue.setString(2, taskType);
					stmtUpdateQueue
							.setString(3, setQueue.getString("QUEUE_ID"));
					stmtUpdateQueue.executeUpdate();
					stmtUpdateQueue.close();
				} else {
					// 不需要修改当前记录的信息
				}
				taskCount = taskCount + 1;
				if (point >= serverList.size() - 1) {
					point = 0;
				} else {
					point = point + 1;
				}
			}
			setQueue.close();
			stmtQueue.close();
			if (taskCount == 0) {
				throw new Exception("没有对任务类型配置数据处理队列,TASK_TYPE = " + taskType);
			}
	}
public void lockTaskTypeRunningInfo(Connection conn,String taskType, String lockServerUuid)
 throws Exception {
 String sql = " UPDATE "
 + transferTableName(conn, "PAMIRS_SCHEDULE_TASKTRUN")
 + " set LAST_ASSIGN_TIME = "
 + getDataBaseSysdateString(conn)
 + ",LAST_ASSIGN_UUID = ? , GMT_MODIFIED = "
 + getDataBaseSysdateString(conn) + " where TASK_TYPE = ? ";
 PreparedStatement statement = conn.prepareStatement(sql);
 statement.setString(1, lockServerUuid);
 statement.setString(2, taskType);
 statement.executeUpdate();
 statement.close();
 }

分配队列之前,会先调用方法lockTaskTypeRunningInfo对这个运行期类型进行加锁,看它使用的SQL语句可以看出来,它是使用了数据库实现的行锁(或者范围锁)来实现加锁,避免多个进程同时分配队列时的冲突,其它进程若要更新该行需要等待释放锁。这就要求我们在建表的时候一定要对字段TASK_TYPE建立索引,并且如果是mysql的话,要选择支持行锁的表引擎,避免锁粒度过大导致的系统性能问题。

分配队列的实现是先查询出该任务所有的队列列表,然后循环这个列表,依次给这个队列列表分配服务器,参数输入的是有效服务器列表。

这个代码就实现了上述算法。它依次对队列列表进行循环,有下面这些情况:

如果当前队列未分配服务器(即 CUR_SERVER=null)则将当前服务器分配给该队列(即赋值给CUR_SERVER字段)

如果当前队列已经分配服务器( CUR_SERVER!=null),并且分配的服务器不是当前服务器,则将当前服务器设置为待分配服务器(即赋值给REQ_SERVER字段);如果是当前服务器则表示应分配,就没有必要再放入待分配服务器。

其中服务器的选择是循环的,因为服务器的数量可能小于队列数。选择到最后一个服务器则下一个又回到第一个服务器。

这样就实现了服务器可以均匀的分配给多个队列,当服务器数大于队列数的时候就有可能会出现有的服务器无法分配给对应的任务队列的问题,会报警。


服务器代码实现

在调度管理器中有一个获取当前服务器某个任务队列列表的方法,查看该方法源码可以看到检查处理器中的数据是否已经处理完,若没有处理完则会循环等待阻塞程序直到处理完成才能继续获取任务队列。它最终调用了私有方法getCurrentScheduleQueueNow。


/**
	 * 重新加载当前服务器的任务队列
	 * 1、释放当前服务器持有,但有其它服务器进行申请的任务队列
	 * 2、重新获取当前服务器的处理队列
	 * 
	 * 为了避免此操作的过度,阻塞真正的数据处理能力。系统设置一个重新装载的频率。例如1分钟
	 * 
	 * 特别注意:
	 *   此方法的调用必须是在当前所有任务都处理完毕后才能调用,否则是否任务队列后可能数据被重复处理
	 */
	@SuppressWarnings("static-access")
	public List<String> getCurrentScheduleQueue() {
		try{
		if (this.isNeedReloadQueue == true) {			
			//特别注意:需要判断数据队列是否已经空了,否则可能在队列切换的时候导致数据重复处理
			//主要是在线程不休眠就加载数据的时候一定需要这个判断
			if (this.processor != null) {
					while (this.processor.isDealFinishAllData() == false) {
						Thread.currentThread().sleep(50);
					}
			}
			//真正开始处理数据
			this.getCurrentScheduleQueueNow();
		}
		this.lastReloadTaskQueueTime = ScheduleUtil.getCurrentTimeMillis();		
		return this.currentTaskQueue;		
		}catch(Exception e){
			throw new RuntimeException(e);
		}
	}



getCurrentScheduleQueueNow方法才真正实现了获取队列的逻辑,我们进去看一下。


private List<String> getCurrentScheduleQueueNow() throws Exception {
		//是否被人申请的队列
		this.scheduleCenter.releaseDealQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid());
		//重新查询当前服务器能够处理的队列
		this.currentTaskQueue = this.scheduleCenter.reloadDealQueue(
				this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid());
		
		//如果超过10个心跳周期还没有获取到调度队列,则报警
		if(this.currentTaskQueue.size() ==0 && 
				ScheduleUtil.getCurrentTimeMillis() - this.lastReloadTaskQueueTime
				> this.taskTypeInfo.getHeartBeatRate() * 10){			
			String message ="调度服务器" + this.currenScheduleServer.getUuid() +"[TASK_TYPE=" + this.getTaskTypeRunningInfo().getTaskType() + "]自启动以来,超过10个心跳周期,还 没有获取到分配的任务队列";
			log.warn(message);
			if(this.scheduleAlert != null){
				this.scheduleAlert.noTaskQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid(),message);
			}
		}
		
		if(this.currentTaskQueue.size() >0){
			 //更新时间戳
			 this.lastReloadTaskQueueTime = ScheduleUtil.getCurrentTimeMillis();
		}
		
		return this.currentTaskQueue;
	}




它先调用了scheduleCenter.releaseDealQueue方法释放自己的队列,即下列代码。然后重新加载自己的队列,当10个周期获取到的队列数为0则会报警。


/**
	 * 释放自己把持,别人申请的队列
	 * 
	 * @param taskType
	 * @param uuid
	 * @return
	 * @throws Exception
	 */
	public void releaseDealQueue(Connection conn,String taskType, String uuid) throws Exception {
		String querySql = "select QUEUE_ID from "
			+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")
			+ " WHERE TASK_TYPE = ? and CUR_SERVER = ?  AND  REQ_SERVER IS NOT NULL ";
		PreparedStatement stmtQueue = conn.prepareStatement(querySql);
		stmtQueue.setString(1, taskType);
		stmtQueue.setString(2, uuid);
		ResultSet set = stmtQueue.executeQuery();
		List<String> queueIds = new ArrayList<String>();
		while(set.next()){
			queueIds.add(set.getString("QUEUE_ID"));
		}
        set.close();
        stmtQueue.close();
        
		String sqlQueue = " update "
			+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")
			+ " set CUR_SERVER = REQ_SERVER,REQ_SERVER = NULL, GMT_MODIFIED = "
			+ getDataBaseSysdateString(conn)
			+ " WHERE TASK_TYPE = ? and CUR_SERVER = ? AND QUEUE_ID = ?  AND  REQ_SERVER IS NOT NULL ";
		
		for(String queueId:queueIds){
			stmtQueue = conn.prepareStatement(sqlQueue);
			stmtQueue.setString(1, taskType);
			stmtQueue.setString(2, uuid);
			stmtQueue.setString(3, queueId);
			stmtQueue.executeUpdate();
			stmtQueue.close();
			conn.commit();
		}
	}


该方法的实现是查询当前任务分给当前服务器的所有队列列表,然后会依次循环将字段REQ_SERVER的值赋给字段CUR_SERVER,也就是表示将待分配服务器正式设置为已分配服务器,并且将REQ_SERVER设置为空,这也就实现了服务器释放算法。





© 著作权归作者所有

杨武兵

杨武兵

粉丝 264
博文 61
码字总数 123254
作品 1
昌平
架构师
私信 提问
taobao-pamirs-schedule-2.0源码分析

taobao-pamirs-schedule-2.0源码分析,我们的生产环境中大量使用了该项目,因此才有了动机深入研究该项目,源码分析分享给大家。 http://my.oschina.net/ywbrj042/blog/626909...

杨武兵
2016/03/03
507
0
taobao-pamirs-schedule-2.0源码分析——类设计

使用方法 首先学习一个开源项目,一定要先学习该开源项目的使用方法。该项目的使用方法本文不再详述。请参考博文: http://pinsir.iteye.com/blog/882275 http://pinsir.iteye.com/blog/882...

杨武兵
2016/03/02
956
8
taobao-pamirs-schedule2.0设计和实现的局限性

不适合简单的少量任务调度 问题描述 非常简单的定时调度任务,只是定时触发执行任务,这个任务量是非常少的,单机实现就可以的任务,这种场景使用taobao-pamirs-schedule就会存在开发、配置和...

杨武兵
2016/03/14
216
5
taobao-pamirs-schedule-2.0源码分析—核心流程

核心的流程时序图如下。 如上图所示,淘宝调度管理器在创建后就会执行一系列初始化过程,并且启动一些定时线程。具体流程描述如下: 1.创建定时调度器。 2.从配置中心加载任务配置信息。 3....

杨武兵
2016/03/09
184
0
taobao-pamirs-schedule-2.0源码分析——任务处理器源码分析

TBScheduleProcessorSleep分析 基本介绍 sleep模式: 当某一个线程任务处理完毕,从任务池中取不到任务的时候,检查其它线程是否处于活动状态。如果是,则自己休眠;如果其它线程都已经因为没...

杨武兵
2016/03/11
221
2

没有更多内容

加载失败,请刷新页面

加载更多

Java中print、printf、println的区别

printf主要是继承了C语言的printf的一些特性,可以进行格式化输出 print就是一般的标准输出,但是不换行 println和print基本没什么差别,就是最后会换行

hellation_
23分钟前
0
0
spring在静态类中注入bean的的解释

@Componentpublic class ModelMapper {@AutoWiredprivate static AssignmentManager assignmentManager;public static void add(){a+b;}} 静态方法是属于类的,普通方法才属于...

无知的小狼
23分钟前
2
0
分而治之-归并排序

如果有1个数组,数组的左半部分和右半部分都已经排好序,如何将该数组合成1个有序的数组? 开辟1个同样大小的临时空间辅助我们完成归并过程,如下图 k:表示归并过程中,当前需要替换的原数组...

万山红遍
37分钟前
2
0
Linux修改时区的正确方法【修改时间,需要修改软连接,靠谱】

CentOS和Ubuntu的时区文件是/etc/localtime,但是在CentOS7以后localtime以及变成了一个链接文件 [root@centos7 ~]# ll /etc/localtime lrwxrwxrwx 1 root root 33 Oct 12 11:01 /etc/loca......

Airship
今天
1
0
《Netkiller Spring Cloud 手札》之 Master / Slave 主从数据库数据源配置

5.19.1. Master / Slave 主从数据库数据源配置 5.19.1.1. application.properties spring.datasource.master.driverClassName = com.mysql.cj.jdbc.Driverspring.datasource.master.url=j......

netkiller-
今天
50
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部