TBSchedule的中文释义为:淘宝分布式定时任务调度框架。这里我们先剧透一下tbs的执行能力:
tbs开源版本依赖于Spring与Zookeeper。纯JAVA实现的调度过程以及优秀的并发处理代表其具备低耦合,可容灾,低资源占用,低故障率,高效率等特点。其主要用于在分布式硬件环境中执行同一项任务,该任务可以单线/集群/分布式集群单线程执行,亦可以单机/集群/分布式集群多线程执行。
上述在多机同时执行一项任务时,必然会产生任务被重复执行的问题。为避免该问题tbs支持用户自定义任务分片机制。简单说来就是告诉北京机房的A机你去执行前1到10万数据;成都机房的A机你去执行前10万零1到20万的数据;同是成都机房的B机你去执行前20万零1到剩余的数据。这里tbs会精准地把startIndex和endIndex下发到对应的机器上,你所要做的就是写到SQL查询参数里,这算是最简单的应用场景了。
上面一段有的朋友应该会问,既然数据的位置固定。那么当第二轮任务开始的时候这些数据岂不是又被计算一次?所以这里要指出应用tbs最重要的一条规范:用tbs来处理的数据,一定千万必须要带有处理标志位,这条数据成功被加工以后一定千万必须要更新为已处理。这是市面上任何一个开源同类框架都不能帮你做的事情。所以就算添加标志位不可能也必须保证数据被处理的幂等性,如若不然使用tbs会让你困难重重!
由此可知:
- 当你需要在特定的时间执行一项任务;
- 这项任务对应的数据量比较大(作者个人认为日均百万级别的任务没必要用到tbs,除非其加工流程复杂耗时。)
- 这项任务不止在一个jvm里执行;
- 这项任务可选地可以由多个jvm共同执行。
有满足上述条件的业务需求时可以考虑使用TBSchedule,这里给出一个假设场景:
- 千万+资金流水数据需要在数十分钟内处理完毕;
- 同一机房4核16G机器四台单JVM,均只部署数据加工应用;
- 资金流水数据具备唯一流水编号(BIGINT)。需要按分户账号,资金方向,账期等字段分组汇总后结转入另外一张汇总表;
OK,大致实现思路如下:
- 我们有唯一主键,能够简单分片。
- 机器性能较好,适合充分利用。
- 分布式计算会产生数据一致性问题,这里在汇总时我们需要考虑预生成汇总记录(预生成简单,这里不讲),这里我们借助三方工具Redis。
1、我们配置8个任务项,2线程组/JVM,充分利用硬件资源。SQL参考如下:
SELECT
VOUCHER_NO, ACCOUNT_NO, ACCOUNT_DIRECTION, BEFORE_AMOUNT, AMOUNT,
AFTER_AMOUNT, PAY_DATE,IS_STATICS
FROM
ACCOUNT_VOUCHER_FLOW
WHERE
MOD(VOUCHER_NO,#{taskItemNum,jdbcType=INTEGER})
IN(${taskDefineNums,jdbcType=VARCHAR})
AND IS_STATICS = 0
LIMIT 0,#{eachCount,jdbcType=INTEGER}
##运行时
SELECT
VOUCHER_NO, ACCOUNT_NO, ACCOUNT_DIRECTION, BEFORE_AMOUNT, AMOUNT,
AFTER_AMOUNT, PAY_DATE,IS_STATICS
FROM
ACCOUNT_VOUCHER_FLOW
WHERE
MOD(VOUCHER_NO,4)
IN(0,4)
AND IS_STATICS = 0
LIMIT 0,500
2、selectTasks()方法很简单,构造上述需要的参数而已。
@Override
public List<AccountVoucherFlow> selectTasks(String taskParameters, String owenrSign, int taskNum, List<TaskItemDefine> list, int pageSize) throws Exception {
//在其他线程组发生故障转移时,list的元素会变化,这里循环取全部。
String taskDefineNums = "";
for(TaskItemDefine tmp in list){
taskDefineNums.contact(tmp.getTaskItemId()).contact(",");
}
//构造取数SQL参数,Hashtable的优势:高速读,运行时线程安全。
Hashtable<String, Object> query = new Hashtable<>();
query.put("dataIndex", taskItemNum);
query.put("taskDefineNums", taskDefineNums.subString(0,taskDefineNums.length - 1));
query.put("skipNum", 0);
query.put("pageSize", pageSize);
return accountVoucherService.selectForStatics(query);
}
3、excute()为数据处理汇总逻辑,供参考。
@Override
public boolean execute(AccountVoucherFlow[] accountVoucherFlows, String s) throws Exception {
Hashtable<Long, AccountVoucherSumTmp> sum = new Hashtable<>();
String voucherNos = "";
for (AccountVoucherFlow item : accountVoucherFlows) {
if (sum.containsKey(item.getAccountNo())) {
switch (VoucherFlowDirectionEnum.getByDirectionCode(item.getAccountDirection())) {
case IN:
sum.get(item.getAccountNo()).addAmount(item.getAmount());
break;
case OUT:
sum.get(item.getAccountNo()).subAmount(item.getAmount());
break;
}
} else {
AccountVoucherSumTmp tmp = new AccountVoucherSumTmp();
tmp.setAccountNo(item.getAccountNo());
tmp.setAccountingDate(new SimpleDateFormat("yyyyMMdd").format(item.getPayDate()));
switch (VoucherFlowDirectionEnum.getByDirectionCode(item.getAccountDirection())) {
case IN:
tmp.setIncomeAmount(item.getAmount());
break;
case OUT:
tmp.setExpendAmount(item.getAmount());
break;
}
sum.put(item.getAccountNo(), tmp);
}
voucherNos = voucherNos.concat(item.getVoucherNo().toString()).concat(",");
}
for (AccountVoucherSumTmp accountVoucherSumTmp : sum.values()) {
Hashtable<String, Object> sumVoucher = CommonUtils.convert2HashTable(accountVoucherSumTmp);
/*这里的insert方法内部使用的是Redis HINCRBY接口。
*HINCRBY当key不存在时会自动创建并初始化field为0,
*而存在时执行值增,它也是线程安全的。
*/
accountVoucherService.insertAccountVoucherSum(sumVoucher);
}
//已汇入Redis的数据更新其计算状态为已计算,避免重复处理。
accountVoucherService.update2Staticed(voucherNos.substring(0, voucherNos.length() - 1));
return true;
}
4、剩余的工作就是把Redis中数据取出进行最后汇总并批量入库了,这部分可以另起一个定时任务执行,也可以MQ通知,代码就不贴了,大家依照自己需求实现即可。