杨武兵

# 算法原理

## 分配服务器算法

，它包含如下关键信息：QUEUE_ID是队列标识   CUR_SERVER是当前分配服务器标识，REQ_SERVER是申请分配服务器标识。

A启动的时候：

 QUEUE_ID CUR_SERVER REQ_SERVER 1 A 2 A 3 A 4 A 5 A

B启动的时候：

 QUEUE_ID CUR_SERVER REQ_SERVER 1 A 2 A B 3 A 4 A B 5 A

C启动的时候：

 QUEUE_ID CUR_SERVER REQ_SERVER 1 A 2 A B 3 A C 4 A 5 A B

D启动的时候：

 QUEUE_ID CUR_SERVER REQ_SERVER 1 A 2 A B 3 A C 4 A D 5 A

## 服务器释放算法

 QUEUE_ID CUR_SERVER REQ_SERVER 1 A 2 A B 3 A C 4 A D 5 A

 QUEUE_ID CUR_SERVER REQ_SERVER 1 A 2 B 3 C 4 D 5 A

# 算法实现

## 分配队列代码实现

``````public void assignScheduleTask() throws Exception {
int clearServerCount = scheduleCenter
List<ScheduleServer> serverList = scheduleCenter

boolean isNeedReAssign = false;
if (clearServerCount > 0 || clearTaskQueueInfoCount > 0) {
isNeedReAssign = true;
} else  {
for (ScheduleServer item : serverList) {
//注意，比较时间一定要用数据库时间
if (item.getCenterServerTime().getTime() - item.getRegisterTime().getTime()
isNeedReAssign = true;
break;
}
}
}
if (isNeedReAssign == true) {
this.currenScheduleServer.getUuid(), serverList);
}
if (log.isDebugEnabled()) {
//log.debug(message);
}
}``````

``````private Connection getConnection() throws SQLException{
Connection result = this.dataSource.getConnection();
if(result.getAutoCommit() == true){
result.setAutoCommit(false);
}
return result;
} public void assignQueue(String taskType, String currentUuid,
List<ScheduleServer> serverList) throws Exception {
Connection conn = null;
try{
conn = this.getConnection();
conn.commit();
}catch(Throwable e){
if(conn != null){
conn.rollback();
}
if(e instanceof Exception){
throw (Exception)e;
}else{
throw new Exception(e);
}
}finally{
if(conn!= null){
conn.close();
}
}
}``````

``````/**
* 重新分配任务处理队列
*
* @param serverList
* @throws Exception
*/
public void assignQueue(Connection conn,String taskType, String currentUuid,
List<ScheduleServer> serverList) throws Exception {
String sqlQueue = " SELECT TASK_TYPE,QUEUE_ID,CUR_SERVER,REQ_SERVER FROM "
+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")
+ " WHERE TASK_TYPE = ? ORDER BY QUEUE_ID";
PreparedStatement stmtQueue = conn.prepareStatement(sqlQueue);
ResultSet setQueue = stmtQueue.executeQuery();
int point = 0;
while (setQueue.next()) {
PreparedStatement stmtUpdateQueue = null;
String sqlModifyQueue = "";
if (setQueue.getString("CUR_SERVER") == null) {
sqlModifyQueue = " UPDATE "
+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")
+ " SET CUR_SERVER = ?,REQ_SERVER = null,GMT_MODIFIED = "
+ getDataBaseSysdateString(conn)
+ " WHERE TASK_TYPE = ? and QUEUE_ID = ? ";
stmtUpdateQueue = conn.prepareStatement(sqlModifyQueue);
stmtUpdateQueue.setString(1, serverList.get(point)
.getUuid());
stmtUpdateQueue
.setString(3, setQueue.getString("QUEUE_ID"));
stmtUpdateQueue.executeUpdate();
stmtUpdateQueue.close();
} else if (!(serverList.get(point).getUuid().equalsIgnoreCase(
setQueue.getString("CUR_SERVER")) == true && setQueue
.getString("REQ_SERVER") == null)) {
sqlModifyQueue = " UPDATE "
+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")
+ " SET REQ_SERVER = ? ,GMT_MODIFIED = "
+ getDataBaseSysdateString(conn)
+ " WHERE TASK_TYPE = ? and QUEUE_ID = ? ";
stmtUpdateQueue = conn.prepareStatement(sqlModifyQueue);
stmtUpdateQueue.setString(1, serverList.get(point)
.getUuid());
stmtUpdateQueue
.setString(3, setQueue.getString("QUEUE_ID"));
stmtUpdateQueue.executeUpdate();
stmtUpdateQueue.close();
} else {
// 不需要修改当前记录的信息
}
if (point >= serverList.size() - 1) {
point = 0;
} else {
point = point + 1;
}
}
setQueue.close();
stmtQueue.close();
}
}``````
``````public void lockTaskTypeRunningInfo(Connection conn,String taskType, String lockServerUuid)
throws Exception {
String sql = " UPDATE "
+ " set LAST_ASSIGN_TIME = "
+ getDataBaseSysdateString(conn)
+ ",LAST_ASSIGN_UUID = ? , GMT_MODIFIED = "
+ getDataBaseSysdateString(conn) + " where TASK_TYPE = ? ";
PreparedStatement statement = conn.prepareStatement(sql);
statement.setString(1, lockServerUuid);
statement.executeUpdate();
statement.close();
}``````

## 服务器代码实现

``````/**
* 重新加载当前服务器的任务队列
* 1、释放当前服务器持有，但有其它服务器进行申请的任务队列
* 2、重新获取当前服务器的处理队列
*
* 为了避免此操作的过度，阻塞真正的数据处理能力。系统设置一个重新装载的频率。例如1分钟
*
* 特别注意：
*   此方法的调用必须是在当前所有任务都处理完毕后才能调用，否则是否任务队列后可能数据被重复处理
*/
@SuppressWarnings("static-access")
public List<String> getCurrentScheduleQueue() {
try{
//特别注意：需要判断数据队列是否已经空了，否则可能在队列切换的时候导致数据重复处理
//主要是在线程不休眠就加载数据的时候一定需要这个判断
if (this.processor != null) {
while (this.processor.isDealFinishAllData() == false) {
}
}
//真正开始处理数据
this.getCurrentScheduleQueueNow();
}
}catch(Exception e){
throw new RuntimeException(e);
}
}``````

getCurrentScheduleQueueNow方法才真正实现了获取队列的逻辑，我们进去看一下。

``````private List<String> getCurrentScheduleQueueNow() throws Exception {
//是否被人申请的队列
//重新查询当前服务器能够处理的队列

//如果超过10个心跳周期还没有获取到调度队列，则报警
log.warn(message);
}
}

//更新时间戳
}

}``````

``````/**
* 释放自己把持，别人申请的队列
*
* @param uuid
* @return
* @throws Exception
*/
public void releaseDealQueue(Connection conn,String taskType, String uuid) throws Exception {
String querySql = "select QUEUE_ID from "
+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")
+ " WHERE TASK_TYPE = ? and CUR_SERVER = ?  AND  REQ_SERVER IS NOT NULL ";
PreparedStatement stmtQueue = conn.prepareStatement(querySql);
stmtQueue.setString(2, uuid);
ResultSet set = stmtQueue.executeQuery();
List<String> queueIds = new ArrayList<String>();
while(set.next()){
}
set.close();
stmtQueue.close();

String sqlQueue = " update "
+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")
+ " set CUR_SERVER = REQ_SERVER,REQ_SERVER = NULL, GMT_MODIFIED = "
+ getDataBaseSysdateString(conn)
+ " WHERE TASK_TYPE = ? and CUR_SERVER = ? AND QUEUE_ID = ?  AND  REQ_SERVER IS NOT NULL ";

for(String queueId:queueIds){
stmtQueue = conn.prepareStatement(sqlQueue);
stmtQueue.setString(2, uuid);
stmtQueue.setString(3, queueId);
stmtQueue.executeUpdate();
stmtQueue.close();
conn.commit();
}
}``````

### 杨武兵

taobao-pamirs-schedule-2.0源码分析

taobao-pamirs-schedule-2.0源码分析,我们的生产环境中大量使用了该项目，因此才有了动机深入研究该项目，源码分析分享给大家。 http://my.oschina.net/ywbrj042/blog/626909...

2016/03/03
507
0
taobao-pamirs-schedule-2.0源码分析——类设计

2016/03/02
956
8
taobao-pamirs-schedule2.0设计和实现的局限性

2016/03/14
216
5
taobao-pamirs-schedule-2.0源码分析—核心流程

2016/03/09
184
0
taobao-pamirs-schedule-2.0源码分析——任务处理器源码分析

TBScheduleProcessorSleep分析 基本介绍 sleep模式： 当某一个线程任务处理完毕，从任务池中取不到任务的时候，检查其它线程是否处于活动状态。如果是，则自己休眠；如果其它线程都已经因为没...

2016/03/11
221
2

Java中print、printf、println的区别

printf主要是继承了C语言的printf的一些特性，可以进行格式化输出 print就是一般的标准输出，但是不换行 println和print基本没什么差别，就是最后会换行

hellation_
23分钟前
0
0
spring在静态类中注入bean的的解释

@Componentpublic class ModelMapper {@AutoWiredprivate static AssignmentManager assignmentManager;public static void add(){a+b;}} 静态方法是属于类的，普通方法才属于...

23分钟前
2
0

37分钟前
2
0
Linux修改时区的正确方法【修改时间，需要修改软连接，靠谱】

CentOS和Ubuntu的时区文件是/etc/localtime，但是在CentOS7以后localtime以及变成了一个链接文件 [root@centos7 ~]# ll /etc/localtime lrwxrwxrwx 1 root root 33 Oct 12 11:01 /etc/loca......

Airship

1
0
《Netkiller Spring Cloud 手札》之 Master / Slave 主从数据库数据源配置

5.19.1. Master / Slave 主从数据库数据源配置 5.19.1.1. application.properties spring.datasource.master.driverClassName = com.mysql.cj.jdbc.Driverspring.datasource.master.url=j......

netkiller-

50
0