文档章节

分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业数据存储

DemonsI
 DemonsI
发布于 10/15 18:37
字数 2209
阅读 10
收藏 0

分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业数据存储

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-storage/ 

本文基于 Elastic-Job V2.1.5 版本分享

1. 概述

本文主要分享 Elastic-Job-Lite 作业数据存储

涉及到主要类的类图如下( 打开大图 ):

2. JobNodePath

JobNodePath,作业节点路径类。作业节点是在普通的节点前加上作业名称的前缀

在 Zookeeper 看一个作业的数据存储:

[zk: localhost:2181(CONNECTED) 65] ls /elastic-job-example-lite-java/javaSimpleJob
[leader, servers, config, instances, sharding]
  • elastic-job-example-lite-java:作业节点集群名,使用 ZookeeperConfiguration.namespace 属性配置。

  • javaSimpleJob:作业名字,使用 JobCoreConfiguration.jobName 属性配置。

  • config / servers / instances / sharding / leader:不同服务的数据存储节点路径。

JobNodePath,注释很易懂,点击链接查看。这里我们梳理下 JobNodePath 和其它节点路径类的关系:

Zookeeper 路径 JobNodePath 静态属性 JobNodePath 方法 节点路径类
config CONFIG_NODE #getConfigNodePath() ConfigurationNode
servers SERVERS_NODE #getServerNodePath() ServerNode
instances INSTANCES_NODE #getInstancesNodePath() InstanceNode
sharding SHARDING_NODE #getShardingNodePath() ShardingNode
leader / #getFullPath(node) LeaderNode
leader/failover / #getFullPath(node) FailoverNode
guarantee / #getFullPath(node) GuaranteeNode

3. JobNodeStorage

JobNodeStorage,作业节点数据访问类。

Elastic-Job-Lite 使用注册中心存储作业节点数据,JobNodeStorage 对注册中心提供的方法做下简单的封装提供调用。举个例子:

// JobNodeStorage.java
private final CoordinatorRegistryCenter regCenter;
private final JobNodePath jobNodePath;

/**
* 判断作业节点是否存在.
* 
* @param node 作业节点名称
* @return 作业节点是否存在
*/
public boolean isJobNodeExisted(final String node) {
   return regCenter.isExisted(jobNodePath.getFullPath(node));
}

// JobNodePath.java
/**
* 获取节点全路径.
* 
* @param node 节点名称
* @return 节点全路径
*/
public String getFullPath(final String node) {
   return String.format("/%s/%s", jobName, node);
}
  • 传递的参数 node 只是简单的作业节点名称,通过调用 JobNodePath#getFullPath(…) 方法获取节点全路径。

  • 其它方法类似,有兴趣的同学点击链接查看。

4. ConfigurationNode

ConfigurationNode,配置节点路径。

在 Zookeeper 看一个作业的配置节点数据存储:

[zk: localhost:2181(CONNECTED) 67] get /elastic-job-example-lite-java/javaSimpleJob/config
{"jobName":"javaSimpleJob","jobClass":"com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob","jobType":"SIMPLE","cron":"0/5 * * * * ?","shardingTotalCount":3,"shardingItemParameters":"0\u003dBeijing,1\u003dShanghai,2\u003dGuangzhou","jobParameter":"","failover":true,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":true,"maxTimeDiffSeconds":-1,"monitorPort":-1,"jobShardingStrategyClass":"","reconcileIntervalMinutes":10,"disabled":false,"overwrite":true}
  • /config 是持久节点,存储Lite作业配置( LiteJobConfiguration ) JSON化字符串。

ConfigurationNode 代码如下:

public final class ConfigurationNode {

    static final String ROOT = "config";
}

ConfigurationNode 如何读取、存储,在《Elastic-Job-Lite 源码分析 —— 作业配置》的「3.」作业配置服务已经详细解析。

5. ServerNode

ServerNode,服务器节点路径。

在 Zookeeper 看一个作业的服务器节点数据存储: 

[zk: localhost:2181(CONNECTED) 72] ls /elastic-job-example-lite-java/javaSimpleJob/servers
[192.168.16.164, 169.254.93.156, 192.168.252.57, 192.168.16.137, 192.168.3.2, 192.168.43.31]
[zk: localhost:2181(CONNECTED) 73] get /elastic-job-example-lite-java/javaSimpleJob/servers/192.168.16.164
  • /servers/ 目录下以 IP 为数据节点路径存储每个服务器节点。如果相同IP服务器有多个服务器节点,只存储一个 IP 数据节点。

  • /servers/${IP} 是持久节点,不存储任何信息,只是空串( "");

ServerNode 代码如下:

public final class ServerNode {

    /**
     * 服务器信息根节点.
     */
    public static final String ROOT = "servers";

    private static final String SERVERS = ROOT + "/%s";
}

ServerNode 如何存储,在《Elastic-Job-Lite 源码分析 —— 作业初始化》的「3.2.4」注册作业启动信息已经详细解析。

6. InstanceNode

InstanceNode,运行实例节点路径。

在 Zookeeper 看一个作业的运行实例节点数据存储: 

[zk: localhost:2181(CONNECTED) 81] ls /elastic-job-example-lite-java/javaSimpleJob/instances
[192.168.16.137@-@56010]
[zk: localhost:2181(CONNECTED) 82] get /elastic-job-example-lite-java/javaSimpleJob/instances
  • /instances 目录下以作业实例主键( JOB_INSTANCE_ID ) 为数据节点路径存储每个运行实例节点。

  • /instances/${JOB_INSTANCE_ID} 是临时节点,不存储任何信息,只是空串( "");

  • JOB_INSTANCE_ID 生成方式:

    // JobInstance.java
    
    jobInstanceId = IpUtils.getIp()
                    + DELIMITER
                    + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; // PID
    

InstanceNode 代码如下:

public final class InstanceNode {

    /**
     * 运行实例信息根节点.
     */
    public static final String ROOT = "instances";

    private static final String INSTANCES = ROOT + "/%s";

    /**
     * 获取当前运行实例节点路径
     *
     * @return 当前运行实例节点路径
     */
    String getLocalInstanceNode() {
        return String.format(INSTANCES, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    }
}

InstanceNode 如何存储,在《Elastic-Job-Lite 源码分析 —— 作业初始化》的「3.2.4」注册作业启动信息已经详细解析。

7. ShardingNode

ShardingNode,分片节点路径。

在 Zookeeper 看一个作业的分片节点数据存储: 

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/sharding
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/sharding/0
[running, instance, misfire]
[zk: localhost:2181(CONNECTED) 3] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance
192.168.16.137@-@56010
  • /sharding/${ITEM_ID} 目录下以作业分片项序号( ITEM_ID ) 为数据节点路径存储作业分片项的 instance / running / misfire / disable 数据节点信息。

  • /sharding/${ITEM_ID}/instance 是临时节点,存储该作业分片项分配到的作业实例主键JOB_INSTANCE_ID )。在《Elastic-Job-Lite 源码分析 —— 作业分片》详细解析。

  • /sharding/${ITEM_ID}/running 是临时节点,当该作业分片项正在运行,存储空串( "" );当该作业分片项不在运行,移除该数据节点。《Elastic-Job-Lite 源码分析 —— 作业执行》的「4.6」执行普通触发的作业已经详细解析。

  • /sharding/${ITEM_ID}/misfire 是永久节点,当该作业分片项被错过执行,存储空串( "" );当该作业分片项重新执行,移除该数据节点。《Elastic-Job-Lite 源码分析 —— 作业执行》的「4.7」执行被错过触发的作业已经详细解析。

  • /sharding/${ITEM_ID}/disable 是永久节点,当该作业分片项被禁用,存储空串( "" );当该作业分片项被开启,移除数据节点。

ShardingNode,代码如下:

public final class ShardingNode {

    /**
     * 执行状态根节点.
     */
    public static final String ROOT = "sharding";

    static final String INSTANCE_APPENDIX = "instance";

    public static final String INSTANCE = ROOT + "/%s/" + INSTANCE_APPENDIX;

    static final String RUNNING_APPENDIX = "running";

    static final String RUNNING = ROOT + "/%s/" + RUNNING_APPENDIX;

    static final String MISFIRE = ROOT + "/%s/misfire";

    static final String DISABLED = ROOT + "/%s/disabled";

    static final String LEADER_ROOT = LeaderNode.ROOT + "/" + ROOT;

    static final String NECESSARY = LEADER_ROOT + "/necessary";

    static final String PROCESSING = LEADER_ROOT + "/processing";
}
  • LEADER_ROOT / NECESSARY / PROCESSING 放在「4.7」LeaderNode 解析。

8. LeaderNode

LeaderNode,主节点路径。

在 leader 目录下一共有三个存储子节点:

  • election:主节点选举。

  • sharding:作业分片项分配。

  • failover:作业失效转移。

主节点选举

在 Zookeeper 看一个作业的 leader/election 节点数据存储: 

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/leader/election
[latch, instance]
[zk: localhost:2181(CONNECTED) 2] get /elastic-job-example-lite-java/javaSimpleJob/leader/election/instance
192.168.16.137@-@1910
  • /leader/election/instance 是临时节点,当作业集群完成选举后,存储主作业实例主键( JOB_INSTANCE_ID )。

  • /leader/election/latch 主节点选举分布式锁,是 Apache Curator 针对 Zookeeper 实现的分布式锁的一种,笔者暂未了解存储形式,无法解释。在《Elastic-Job-Lite 源码分析 —— 注册中心》的「3.1」在主节点执行操作进行了简单解析。

作业分片项分配

在 Zookeeper 看一个作业的 leader/sharding 节点数据存储: 

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/leader/sharding
[necessary, processing]
[zk: localhost:2181(CONNECTED) 2] 个get /elastic-job-example-lite-java/javaSimpleJob/leader/sharding

[zk: localhost:2181(CONNECTED) 3] 个get /elastic-job-example-lite-java/javaSimpleJob/leader/processing
  • /leader/sharding/necessary 是永久节点,当相同作业有新的作业节点加入或者移除时,存储空串( ""),标记需要进行作业分片项重新分配;当重新分配完成后,移除该数据节点。

  • /leader/sharding/processing 是临时节点,当开始重新分配作业分片项时,存储空串( "" ),标记正在进行重新分配;当重新分配完成后,移除该数据节点。

  • 当且仅当作业节点为主节点时,才可以执行作业分片项分配,《Elastic-Job-Lite 源码分析 —— 作业分片》详细解析。

作业失效转移

作业失效转移数据节点在 FailoverNode,放在「9」FailoverNode 解析。

这里大家可能会和我一样比较疑惑,为什么 /leader/failover 放在 /leader 目录下,而不独立成为一个根目录?经过确认,作业失效转移 设计到分布式锁,统一存储在 /leader 目录下。

LeaderNode,代码如下:

public final class LeaderNode {

    /**
     * 主节点根路径.
     */
    public static final String ROOT = "leader";

    static final String ELECTION_ROOT = ROOT + "/election";

    static final String INSTANCE = ELECTION_ROOT + "/instance";

    static final String  LATCH = ELECTION_ROOT + "/latch";
}

9. FailoverNode

FailoverNode,失效转移节点路径。

在 Zookeeper 看一个作业的失效转移节点数据存储: 

[zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/leader/failover
[latch, items]
[zk: localhost:2181(CONNECTED) 4] ls /elastic-job-example-lite-java/javaSimpleJob/leader/failover/items
[0]
  • /leader/failover/latch 作业失效转移分布式锁,和 /leader/failover/latch 是一致的。

  • /leader/items/${ITEM_ID} 是永久节点,当某台作业节点 CRASH 时,其分配的作业分片项标记需要进行失效转移,存储其分配的作业分片项的 /leader/items/${ITEM_ID} 为空串( "" );当失效转移标记,移除 /leader/items/${ITEM_ID},存储 /sharding/${ITEM_ID}/failover 为空串( "" ),临时节点,需要进行失效转移执行。《Elastic-Job-Lite 源码分析 —— 作业失效转移》详细解析。

FailoverNode 代码如下:

public final class FailoverNode {

    static final String FAILOVER = "failover";

    static final String LEADER_ROOT = LeaderNode.ROOT + "/" + FAILOVER;

    static final String ITEMS_ROOT = LEADER_ROOT + "/items";

    static final String ITEMS = ITEMS_ROOT + "/%s";

    static final String LATCH = LEADER_ROOT + "/latch";

    private static final String EXECUTION_FAILOVER = ShardingNode.ROOT + "/%s/" + FAILOVER;

    static String getItemsNode(final int item) {
        return String.format(ITEMS, item);
    }

    static String getExecutionFailoverNode(final int item) {
        return String.format(EXECUTION_FAILOVER, item);
    }
}

10. GuaranteeNode

GuaranteeNode,保证分布式任务全部开始和结束状态节点路径。

© 著作权归作者所有

DemonsI
粉丝 35
博文 343
码字总数 866800
作品 0
朝阳
程序员
私信 提问
分布式作业 Elastic Job 如何动态调整?

前面分享了两篇分布式作业调度框架 Elastic Job 的介绍及应用实战。 ElasticJob-分布式作业调度神器 分布式作业 Elastic Job 快速上手指南! Elastic Job 提供了简单易用的运维平台,方便用...

Java技术栈
08/24
0
0
分布式定时任务调度平台Elastic-Job技术详解

在我们的项目当中,使用定时任务是避免不了的,我们在部署定时任务时,通常只部署一台机器。部署多台机器时,同一个任务会执行多次。比如给用户发送邮件定时任务,每天定时的给用户下发邮件。...

adi851270440
05/29
0
0
分布式定时任务Elastic-Job框架在SpringBoot工程中的应用实践(二)

文章摘要:在生产环境中部署Elastic-Job集群后,那么如何来运维监控线上跑着的定时任务呢? 如果在生产环境的大规模服务器集群上部署了集成Elastic-Job的业务工程,而没有相应的运维监控工具...

癫狂侠
05/15
0
0
SpringBoot整合Elastic-Job,实现动态创建定时任务,任务持久化

SpringBoot使用Elastic-Job-lite,实现动态创建定时任务,任务持久化 Elastic-Job是当当开源的一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。 ...

oppo5630
04/16
0
0
分布式定时任务Elastic-Job框架在SpringBoot工程中的应用实践(一)

摘要:如何构建具备作业分片和弹性扩缩容的定时任务系统是每个大型业务系统在设计时需要考虑的重要问题? 对于构建一般的业务系统来说,使用Quartz或者Spring Task即可基本满足我们的单体服用...

癫狂侠
05/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

day150-2018-11-17-英语流利阅读-待学习

歪果仁也疯狂:海外版抖音的征途 毛西 2018-11-17 1.今日导读 海外版抖音 TikTok 于 2017 年 5 月上线海外,至今覆盖全球 150 多个国家和地区,月活跃用户数已突破 5 亿。然而,“出海”的抖...

飞鱼说编程
今天
6
0
分布式学习最佳实践:从分布式系统的特征开始(附思维导图)

什么是分布式系统 回到顶部   分布式系统是由一组通过网络进行通信、为了完成共同的任务而协调工作的计算机节点组成的系统。分布式系统的出现是为了用廉价的、普通的机器完成单个计算机无法...

dragon_tech
今天
4
0
TOKEN设计

TOKEN设计 Api_Token 首先需要知道API是什么? API(Application Programming Interface)即应用程序接口。你可以认为 API 是一个软件组件或是一个 Web 服务与外界进行的交互的接口。而我们在...

DrChenXX
今天
3
0
浅谈“李氏代换”——从纪念金庸和斯坦李说起

李氏代换(LSP)简介 李氏代换是软件设计的一个原则,又名依赖倒转原则或依赖倒置原则,其衍生原则有接口分离原则等。该原则由Barbara Liskov于1988年提出。 该原则指出,程序中高级别的元素...

SamYjy
今天
35
0
JavaScript实现在线websocket WSS测试工具 -toolfk程序员工具网

本文要推荐的[ToolFk]是一款程序员经常使用的线上免费测试工具箱,ToolFk 特色是专注于程序员日常的开发工具,不用安装任何软件,只要把内容贴上按一个执行按钮,就能获取到想要的内容结果。T...

toolfk
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部