文档章节

TBSchedule源码阅读2-TBScheduleManagerFactory 定时任务ManagerFactoryTimerTask

穿林度水
 穿林度水
发布于 2016/11/07 14:27
字数 548
阅读 26
收藏 1

定时任务 :

主要功能:监听zookeeper状态,正常则this.factory.refresh(),异常则this.factory.reStart();

1 正常情况this.factory.refresh()

异常则停止所有调度任务
正常则重新分配调度器

1.1分配调度器

    public void reRegisterManagerFactory() throws Exception{
        //重新分配调度器
        List<String> stopList = this.getScheduleStrategyManager().registerManagerFactory(this);
        for (String strategyName : stopList) {
            this.stopServer(strategyName);
        }
        this.assignScheduleServer();
        this.reRunScheduleServer();
    }

1.1.1 this.assignScheduleServer();

    /**
     * 根据策略重新分配调度任务的机器
     * @throws Exception
     */
    public void assignScheduleServer() throws Exception{
        for(ScheduleStrategyRunntime run: this.scheduleStrategyManager.loadAllScheduleStrategyRunntimeByUUID(this.uuid)){
            List<ScheduleStrategyRunntime> factoryList = this.scheduleStrategyManager.loadAllScheduleStrategyRunntimeByTaskType(run.getStrategyName());
            if(factoryList.size() == 0 || this.isLeader(this.uuid, factoryList) ==false){
                continue;
            }
            ScheduleStrategy scheduleStrategy =this.scheduleStrategyManager.loadStrategy(run.getStrategyName());
            
            int[] nums =  ScheduleUtil.assignTaskNumber(factoryList.size(), scheduleStrategy.getAssignNum(), scheduleStrategy.getNumOfSingleServer());
            for(int i=0;i<factoryList.size();i++){
                ScheduleStrategyRunntime factory =  factoryList.get(i);
                //更新请求的服务器数量
                this.scheduleStrategyManager.updateStrategyRunntimeReqestNum(run.getStrategyName(), 
                        factory.getUuid(),nums[i]);
            }
        }
    }
  • 从策略目录获取所有策略信息
    策略目录/tbSchedule/zmlTbScheduleTest/strategy
    策略信息
[ScheduleStrategyRunntime [strategyName=DBDemoSingle-stratrery,
  uuid=192.168.255.1$zhuml$69F86F4CFF4E4DA397880E19411DF068$0000000005, 
  ip=null, kind=null, taskName=null, taskParameter=null, requestNum=0, currentNum=0, message=]]
  • 再根据策略名字DBDemoSingle-stratrery,获取策略详细信息
    目录路径:/tbSchedule/zmlTbScheduleTest/strategy/DBDemoSingle-stratrery

    {"strategyName":"DBDemoSingle-stratrery","IPList":["127.0.0.1"],"numOfSingleServer":0,"assignNum":4,
      "kind":"Schedule","taskName":"DBDemoSingle_task","taskParameter":"","sts":"resume"}
  • 然后任务分片,并把分片信息写到zookeeper上
    路径 /tbSchedule/zmlTbScheduleTest/strategy/DBDemoSingle-stratrery/192.168.10.249$zhuml$43AB1F13800E4469BBB2DD385C6A0193$0000000006
    data:

    [{"strategyName":"DBDemoSingle-stratrery","uuid":"192.168.10.249$zhuml$43AB1F13800E4469BBB2DD385C6A0193$0000000006","requestNum":4,"currentNum":0,"message":""}]

1.1.2 运行 this.assignScheduleServer();

    public void reRunScheduleServer() throws Exception{
        for (ScheduleStrategyRunntime run : this.scheduleStrategyManager.loadAllScheduleStrategyRunntimeByUUID(this.uuid)) {
            List<IStrategyTask> list = this.managerMap.get(run.getStrategyName());
            if(list == null){
                list = new ArrayList<IStrategyTask>();
                this.managerMap.put(run.getStrategyName(),list);
            }
            while(list.size() > run.getRequestNum() && list.size() >0){
                IStrategyTask task  =  list.remove(list.size() - 1);
                    try {
                        task.stop(run.getStrategyName());
                    } catch (Throwable e) {
                        logger.error("注销任务错误:strategyName=" + run.getStrategyName(), e);
                    }
                }
           //不足,增加调度器
           ScheduleStrategy strategy = this.scheduleStrategyManager.loadStrategy(run.getStrategyName());
           while(list.size() < run.getRequestNum()){
               IStrategyTask result = this.createStrategyTask(strategy);
               if(null==result){
                   logger.error("strategy 对应的配置有问题。strategy name="+strategy.getStrategyName());
               }
               list.add(result);
            }
        }
    }
  • 获取策略信息,根据分片数量判断是否停止任务或者创建任务
    停止任务stop就ok了
    创建任务 createStrategyTask
    /**
     * 创建调度服务器
     * @param baseTaskType
     * @param ownSign
     * @return
     * @throws Exception
     */
    public IStrategyTask createStrategyTask(ScheduleStrategy strategy)
            throws Exception {
        IStrategyTask result = null;
        try{
            if(ScheduleStrategy.Kind.Schedule == strategy.getKind()){
                String baseTaskType = ScheduleUtil.splitBaseTaskTypeFromTaskType(strategy.getTaskName());
                String ownSign =ScheduleUtil.splitOwnsignFromTaskType(strategy.getTaskName());
                result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);
            }else if(ScheduleStrategy.Kind.Java == strategy.getKind()){
                result=(IStrategyTask)Class.forName(strategy.getTaskName()).newInstance();
                result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());
            }else if(ScheduleStrategy.Kind.Bean == strategy.getKind()){
                result=(IStrategyTask)this.getBean(strategy.getTaskName());
                result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());
            }
        }catch(Exception e ){
            logger.error("strategy 获取对应的java or bean 出错,schedule并没有加载该任务,请确认" +strategy.getStrategyName(),e);
        }
        return result;
    }

创建任务有三种类型 public enum Kind{Schedule,Java,Bean}
这里只介绍Schedule:result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);

© 著作权归作者所有

上一篇: Disruptor 创建过程
下一篇: maven常用命令
穿林度水
粉丝 0
博文 250
码字总数 167120
作品 0
海淀
程序员
私信 提问
TBSchedule应用实战(零)—— 前言

TaoBaoSchedule(以下简称tbs)是国产市面上最早开源得一款“分布式定时任务调度”框架。开源之初因其卓越的性能、近乎零侵入的api设计、去中心化的设计思想、故障自动转移等等特性,几乎是一...

Wednesday_OSC
2017/12/18
947
0
tbschedule中创建策略之后为什么输入的中文乱码了 怎么解决

tbschedule中创建策略之后为什么输入的中文乱码了 该怎么解决 tBScheduleManagerFactory.getScheduleStrategyManager() .createScheduleStrategy(scheduleStrategy); List list = tBSchedul......

xujibai
2016/06/01
96
0
企业业务数据处理用“work”还是“MQ”

  近期公司在做架构梳理已经项目架构方向,不知不觉就引起了使用“work”跑数据还是用“MQ”进行跑数据的争论! 对于争论这件事在各行各业都有,其实我觉得针对“争论”这个词的根源在于一...

zygfengyuwuzu
2018/08/29
0
0
球推荐一个好用的开源的企业级定时调度任务的开源框架

类似tbschedule这样的,不过tbschedule常年没人维护了,存在许多bug,开得自己二次开发,不是很好用!求推荐一个类似的开源工具

小武0515
2017/07/18
205
1
分布式调度平台 XXL_Job

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 https://blog.csdn.net/wxb141001yxx/article/details/93927459 Where? 发展历史 特性发展 ...

Jessica-小波
06/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Java 脚本引擎入门

Java Script Engine Java 脚本引擎可以将脚本嵌入Java代码中,可以自定义和扩展Java应用程序,自JDK1.6被引入,基于Rhino引擎,JDK1.8后使用Nashorn引擎,支持ECMAScript 5,但后期还可能会换...

阿提说说
50分钟前
5
0
05.深入浅出索引(下)

在下面这个表T中,如果我们执行select * from T where k between 3 and 5,需要执行几次树的搜索操作,会扫描多少行? mysql> create table T ( id int primary key, k int not null default...

scgaopan
昨天
6
0
设计模式-中介者模式

设计模式-中介者模式 定义 用一个中介对象封装一系列的对象交互,中介者使各对象不需要显示地相互作用,从而使其耦合松散,而且可以独立地改变它们之间的交互 UML 组成部分主要包含 Mediat...

木本本
昨天
5
0
Python 周刊第 418 期

新闻 PyCon US 2020 开始接受财务赞助! https://pycon.blogspot.com/2019/10/financial-aid-launches-for-pycon-us-2020.html2020年 Python 美国开发者大会,tips: 中国也有,可以赞助国内的...

iCodeBugs
昨天
5
0
ThreadLocal源码阅读

首先,从set方法入手, // ThreadLocalpublic void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t);//这里可以看出,从Threa......

小海bug
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部