文档章节

分布式任务调度框架

胡贵
 胡贵
发布于 2015/07/17 14:52
字数 1546
阅读 650
收藏 10

1. 框架概况:

LTS是一个轻量级分布式任务调度框架。有三种角色, JobClient, JobTracker, TaskTracker。各个节点都是无状态的,可以部署多个,来实现负载均衡,实现更大的负载量, 并且框架具有很好的容错能力。 采用多种注册中心(Zookeeper,redis等)进行节点信息暴露,master选举。(Mongo or Mysql)存储任务队列和任务执行日志, netty做底层通信。

  • JobClient : 主要负责提交任务, 和 接收任务执行反馈结果。

  • JobTracker : 负责接收并分配任务,任务调度。

  • TaskTracker: 负责执行任务,执行完反馈给JobTracker。

支持任务类型:

  • 实时任务

  • 也支持定时任务 (如:3天之后执行)

  • CronExpression (如:0 0/1 * * * ?)

感兴趣,请加QQ群:109500214 一起探讨、完善。并且记得star一下哈,3Q

github地址:https://github.com/qq254963746/light-task-scheduler

2.架构图

Aaron Swartz

2.1节点组:

  • 1. 一个节点组等同于一个集群,同一个节点组中的各个节点是对等的,外界无论连接节点组中的任务一个节点都是可以的。

  • 2. 每个节点组中都有一个master节点(master宕机,会自动选举出新的master节点),框架会提供接口API来监听master节点的变化,用户可以自己使用master节点做自己想做的事情。

  • 3. JobClient和TaskTracker都可以存在多个节点组。譬如 JobClient 可以存在多个节点组。 譬如:JobClient 节点组为 ‘lts_WEB’ 中的一个节点提交提交一个 只有节点组为’lts_TRADE’的 TaskTracker 才能执行的任务。

  • 4. (每个集群中)JobTacker只有一个节点组。

  • 5. 多个JobClient节点组和多个TaskTracker节点组再加上一个JobTacker节点组, 组成一个大的集群。

3. 工作流程:

  • 1. JobClient 提交一个 任务 给 JobTracker, 这里我提供了两种客户端API, 一种是如果JobTracker 不存在或者提交失败,直接返回提交失败。另一种客户端是重试客户端, 如果提交失败,先存储到本地FailStore(可以使用NFS来达到同个节点组共享leveldb文件的目的,多线程访问,已经做了文件锁处理),返回给客户端提交成功的信息,待JobTracker可用的时候,再将任务提交。

  • 2. JobTracker收到JobClient提交来的任务,将任务存入任务队列。JobTracker等待TaskTracker的Pull请求,然后将任务Push给TaskTracker去执行。

  • 3. TaskTracker收到JobTracker分发来的任务之后,然后从线程池中拿到一个线程去执行。执行完毕之后,再反馈任务执行结果给JobTracker(成功or 失败[失败有失败错误信息]),如果发现JobTacker不可用,那么存储本地FailStore,等待TaskTracker可用的时候再反馈。反馈结果的同时,询问JobTacker有没有新的任务要执行。

  • 4. JobTacker收到TaskTracker节点的任务结果信息。根据任务信息决定要不要反馈给客户端。不需要反馈的直接删除,需要反馈的,直接反馈,反馈失败进入FeedbackQueue, 等待重新反馈。

  • 5. JobClient收到任务执行结果,进行自己想要的逻辑处理。

4. 特性

  • 负载均衡:

    • JobClient和TaskTracker可是根据自己设置的负载均衡策略来请求JobTracker节点组中的一个节点。当连接上后将一直保持连接这个节点,保持连接通道,直到这个节点不可用,减少每次都重新连接一个节点带来的性能开销。

  • 健壮性:

    • 当节点组中的一个节点当机之后,自动转到其他节点工作。当整个节点组当机之后,将会采用存储文件的方式,待节点组可用的时候进行重发。

    • 当执行任务的TaskTracker节点当机之后,JobTracker会将这个TaskTracker上的未完成的任务(死任务),重新分配给节点组中其他节点执行。

  • 伸缩性:

    • 因为各个节点都是无状态的,可以动态增加机器部署实例, 节点关注者会自动发现。

  • 扩展性:

    • 采用和dubbo一样的SPI扩展方式,可以实现任务队列扩展,日志记录器扩展等

5. 日志记录

对于任务的分发,执行,还有用户通过 (BizLogger) 【LtsLoggerFactory.getBizLogger()】 输入的业务日志,LTS都有记录,用户可以在LTS Admin 后台界面查看某个任务的所有日志,可以实时查看这个任务的执行情况。

6. 开发计划:

  • WEB后台管理:性能统计分析,预警等

  • 实现LTS的分布式队列存储

7. LTS Admin

Aaron Swartz

8. 调用示例

下面提供的是最简单的配置方式。更多配置请查看 lts-example 模块下的 API 调用方式例子.

8.1 JobTracker 端

    final JobTracker jobTracker = new JobTracker();    
    // 节点信息配置
    jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");    
    // 1. 任务队列用mongo
    jobTracker.addConfig("job.queue", "mongo");    
    // mongo 配置
    jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017"); 
    jobTracker.addConfig("mongo.database", "lts");
    jobTracker.setOldDataHandler(new OldDataDeletePolicy());    
    // 启动节点
    jobTracker.start();

8.2 TaskTracker端

    TaskTracker taskTracker = new TaskTracker();
    taskTracker.setJobRunnerClass(TestJobRunner.class);
    taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
    taskTracker.setNodeGroup("test_trade_TaskTracker");
    taskTracker.setWorkThreads(20);
    taskTracker.start();    // 任务执行类
    public class TestJobRunner implements JobRunner {        
        @Override
        public void run(Job job) throws Throwable {            
           System.out.println("我要执行"+ job);            
           System.out.println(job.getParam("shopId"));            
           // TODO 用户自己的业务逻辑, 应该保证幂等
            try {                
                  Thread.sleep(5*1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

8.3 JobClient端

    JobClient jobClient = new RetryJobClient();    
    // final JobClient jobClient = new JobClient();
    jobClient.setNodeGroup("test_jobClient");
    jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
    jobClient.start();    // 提交任务
    Job job = new Job();
    job.setTaskId("3213213123");
    job.setParam("shopId", "11111");
    job.setTaskTrackerNodeGroup("test_trade_TaskTracker");    
    // job.setCronExpression("0 0/1 * * * ?");  
    // 支持 cronExpression表达式
    // job.setTriggerTime(new Date()); // 支持指定时间执行
    Response response = jobClient.submitJob(job);


© 著作权归作者所有

共有 人打赏支持
胡贵
粉丝 14
博文 1
码字总数 1546
作品 1
杭州
技术主管
私信 提问
基于spring+quartz的分布式任务调度

学习地址:http://www.roncoo.com/course/view/e2b459016e2e477dbd5d67c8b23fe86d 课程介绍 Quartz是OpenSymphony开源组织在Job scheduling领域又一个开源项目,它可以与J2EE与J2SE应用程序相...

小红牛
2018/04/19
0
0
分布式定时任务框架---Uncode Schedule

分布式定时任务框架---Uncode Schedule rabbitGYK 关注 2016.11.27 20:36* 字数 1446 阅读 7141评论 5喜欢 36赞赏 1 博客原文 作为一个支付公司的项目组,经常会有很多对账功能(签约对账、支...

晨猫
2018/11/02
0
0
justlive1/earth-frost

earth-frost earth-frost是一个轻量级分布式任务调度框架。 介绍 调度模块和执行模块分离 使用redis作为数据库 基于订阅模式实现服务注册和发现 功能 简单易上手,支持web界面进行任务的CRU...

justlive1
2018/04/28
0
0
Ray - 面向增强学习场景的分布式计算框架

如果关注这个领域的同学可能知道,Ray其实在去年就已经在开源社区正式发布了,只不过后来就一直没有什么太大动静,前段时间也是因为机缘巧合,我又回头学习了解了一下,顺便总结如下: Ray是...

colorant
2018/05/23
0
0
代码神童/YayCrawler

YayCrawler 项目简介 YayCrawler爬虫技术交流 559745472 欢迎加群讨论,快速启动!!! 项目目标 在力所能及的情况下,最大限度的提高 Web爬虫开发人员的生产力,爬虫框架里的一股清流 主要功...

代码神童
2016/08/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

租房软件隐私保护如同虚设

近日,苏州市民赵先生向江苏新闻广播新闻热线025-84658888反映,他在“安居客”手机应用软件上浏览二手房信息,并且使用该软件自动生成的虚拟号码向当地一家中介公司进行咨询。可电话刚挂不久...

linux-tao
今天
1
0
分布式项目(五)iot-pgsql

书接上回,在Mapping server中,我们已经把数据都整理好了,现在利用postgresql存储历史数据。 iot-pgsql 构建iot-pgsql模块,这里我们写数据库为了性能考虑不在使用mybatis,换成spring jd...

lelinked
今天
4
0
一文分析java基础面试题中易出错考点

前言 这篇文章主要针对的是笔试题中出现的通过查看代码执行结果选择正确答案题材。 正式进入题目内容: 1、(单选题)下面代码的输出结果是什么? public class Base { private Strin...

一看就喷亏的小猿
今天
2
0
cocoapods 用法

cocoapods install pod install 更新本地已经install的仓库 更新所有的仓库 pod update --verbose --no-repo-update 更新制定的仓库 pod update ** --verbose --no-repo-update...

HOrange
今天
3
0
linux下socket编程实现一个服务器连接多个客户端

使用socekt通信一般步骤 1)服务器端:socker()建立套接字,绑定(bind)并监听(listen),用accept()等待客户端连接。 2)客户端:socker()建立套接字,连接(connect)服务器,连接上后...

shzwork
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部