Elastic-Job 2.1.3 Overview

Original
2017/04/18 11:04

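Elastic-Job-Lite 2.1.3 Code Walkthrough

Framework

Elastic-Job is a distributed scheduling solution made up of two independent subprojects, Elastic-Job-Lite and Elastic-Job-Cloud.

Elastic-Job-Lite: a lightweight, decentralized solution that provides coordination for distributed jobs in the form of a jar.

Elastic-Job-Cloud: a Mesos + Docker solution that additionally provides resource management, application distribution, and process isolation.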

 

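Basic principles

Elastic-Job has no central scheduling node. Instead, each application that deploys the job uses Quartz to trigger scheduling on its own when the scheduled time arrives. ZooKeeper is used for job registration, information storage, and marking state during task execution. The leader job instance, once elected, computes the job sharding.

A node tree is created in ZooKeeper to store the job configuration. Every TreeCacheListener is held in the ListenerContainer of the TreeCache object for "/${jobName}". When the ZooKeeper node tree changes (add, remove, update, ...), TreeCache and its TreeNode (which implements org.apache.zookeeper.Watcher and org.apache.curator.framework.api.BackgroundCallback) handle the WatchedEvent, and TreeCache calls publishEvent to notify all TreeCacheListeners asynchronously.

At the same time, the current TreeNode registers itself again as the watcher for its path in the TreeCache:
client.checkExists().usingWatcher(this).inBackground(this).forPath(path);
client.getData().usingWatcher(this).inBackground(this).forPath(this.path);
client.getChildren().usingWatcher(this).inBackground(this).forPath(this.path);

For reference:

ZooKeeper accepts an AsyncCallback on create, delete, setData, exists, getData, getACL, and getChildren, but a Watcher can only be registered through the ZooKeeper constructor, exists, getData, and getChildren.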
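The reason TreeNode has to register itself again is that a ZooKeeper 3.4.x watch fires at most once. The following is a minimal in-memory sketch of that behaviour in plain Java. The class OneShotWatchSketch and its nested Watcher interface are hypothetical and are not the ZooKeeper or Curator API; they only model the one-shot rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a one-shot watch; this is not the ZooKeeper client API.
final class OneShotWatchSketch {

    interface Watcher {
        void process(String path);
    }

    private final String path;
    private String data;
    private List<Watcher> watchers = new ArrayList<>();

    OneShotWatchSketch(String path, String data) {
        this.path = path;
        this.data = data;
    }

    // Reads the data and registers a watch that will fire at most once.
    String getData(Watcher watcher) {
        watchers.add(watcher);
        return data;
    }

    // Changes the data, fires the pending watches, and forgets them.
    void setData(String newData) {
        data = newData;
        List<Watcher> pending = watchers;
        watchers = new ArrayList<>();
        for (Watcher watcher : pending) {
            watcher.process(path);
        }
    }
}
```

A watcher that does nothing in process is notified of the first change only. A watcher that calls getData again from inside process, as TreeNode does with usingWatcher(this), keeps receiving later changes.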

 

package com.dangdang.ddframe.job.lite.internal.listener;

import com.google.common.base.Charsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

/**
 * Listener for the job registry center.
 * 
 * @author zhangliang
 */
public abstract class AbstractJobListener implements TreeCacheListener {
    
    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        ChildData childData = event.getData();
        if (null == childData) {
            return;
        }
        String path = childData.getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
    }
    
    protected abstract void dataChanged(final String path, final Type eventType, final String data);
}
TreeCache:

 	private void publishEvent(final TreeCacheEvent event) {
		if (treeState.get() != TreeState.CLOSED) {
			LOG.debug("publishEvent: {}", event);
			executorService.submit(() -> {
				try {
					callListeners(event);
				} catch (Exception e) {
					ThreadUtils.checkInterrupted(e);
					handleException(e);
				}
			});
		}
	}

 private void callListeners(final TreeCacheEvent event)
 {
     listeners.forEach(new Function<TreeCacheListener, Void>()
     {
         @Override
         public Void apply(TreeCacheListener listener)
         {
             try
             {
                 listener.childEvent(client, event);
             }
             catch ( Exception e )
             {
                 ThreadUtils.checkInterrupted(e);
                 handleException(e);
             }
             return null;
         }
     });
 }
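
The dispatch pattern in publishEvent and callListeners can be modelled without Curator. The following is a minimal sketch in plain Java that assumes nothing beyond the JDK. The class ListenerDispatchSketch and its methods are hypothetical and are not part of Curator. It shows events being handed to a single-threaded executor, with each listener's exception caught so that one failing listener does not stop the others.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of TreeCache.publishEvent/callListeners; not Curator code.
final class ListenerDispatchSketch {

    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final List<Exception> errors = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Returns immediately; listeners run later on the executor thread.
    void publishEvent(String event) {
        executor.submit(() -> callListeners(event));
    }

    // Each listener is isolated: an exception is recorded and the loop continues.
    private void callListeners(String event) {
        for (Consumer<String> listener : listeners) {
            try {
                listener.accept(event);
            } catch (RuntimeException e) {
                errors.add(e);
            }
        }
    }

    List<Exception> errors() {
        return errors;
    }

    // Waits for queued events to be delivered, then stops the executor.
    boolean shutdownAndWait() {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because a single thread drains the queue, listeners see events in the order they were published, and a slow listener delays later events rather than the thread that detected the change.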

 

How to use

Maven

elastic-job-lite operates on ZooKeeper nodes through the Curator framework (2.10.0), which wraps zookeeper-3.4.6.jar.

When building the project, all Curator artifacts should use the same version:

<properties>
    <quartz.version>2.2.1</quartz.version>
    <curator.version>2.10.0</curator.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-core</artifactId>
        <version>2.1.3</version>
    </dependency>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-spring</artifactId>
        <version>2.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>${curator.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-client</artifactId>
        <version>${curator.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>${curator.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-test</artifactId>
        <version>${curator.version}</version>
    </dependency>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>${quartz.version}</version>
    </dependency>
</dependencies>

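Spring integration

The spring.handlers and spring.schemas files in the elastic-job-lite-spring package declare the XML namespaces and their tags. RegNamespaceHandler and JobNamespaceHandler both extend NamespaceHandlerSupport.
  The job-ref attribute takes precedence over the class attribute. The createJobDetail method of JobScheduler decides where the elasticJob instance held by LiteJob comes from.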

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
	xmlns:job="http://www.dangdang.com/schema/ddframe/job"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
	<!-- Configure the job registry center -->
	<reg:zookeeper id="regCenter" server-lists="yourhost:2181"
		namespace="dd-job" base-sleep-time-milliseconds="1000"
		max-sleep-time-milliseconds="3000" max-retries="3" />

	<!-- Configure a simple job -->
	<job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob"
		registry-center-ref="regCenter" cron="0/10 * * * * ?"
		sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

	<bean id="yourRefJobBeanId" class="xxx.MySimpleRefElasticJob">
		<property name="fooService" ref="xxx.FooService" />
	</bean>

	<!-- Configure a job that references a bean -->
	<job:simple id="simpleRefElasticJob" job-ref="yourRefJobBeanId"
		registry-center-ref="regCenter" cron="0/10 * * * * ?"
		sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

	<!-- Configure a dataflow job -->
	<job:dataflow id="throughputDataflow" class="xxx.MyThroughputDataflowElasticJob"
		registry-center-ref="regCenter" cron="0/10 * * * * ?"
		sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

	<!-- Configure a script job -->
	<job:script id="scriptElasticJob" registry-center-ref="regCenter"
		cron="0/10 * * * * ?" sharding-total-count="3"
		sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" />

	<!-- Configure a simple job with listeners -->
	<job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob"
		registry-center-ref="regCenter" cron="0/10 * * * * ?"
		sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
		<job:listener class="xx.MySimpleJobListener" />
		<job:distributed-listener class="xx.MyOnceSimpleJobListener"
			started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
	</job:simple>

	<!-- Configure a simple job with database event tracing -->
	<job:simple id="eventTraceElasticJob" class="xxx.MySimpleListenerElasticJob"
		registry-center-ref="regCenter" cron="0/10 * * * * ?"
		sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C"
		event-trace-rdb-data-source="yourDataSource">
	</job:simple>


</beans>
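
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SmsNoticeTask implements SimpleJob {

	private static final Logger logger = LoggerFactory.getLogger(SmsNoticeTask.class);

	@Override
	public void execute(ShardingContext shardingContext) {
		logger.info("Sharding info for this execution: {}", shardingContext);
		// TODO do something
	}
}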
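
The sharding-item-parameters attribute in the XML above, "0=A,1=B,2=C", maps each sharding item to a parameter; inside execute, a job would typically read its own item and parameter from the ShardingContext via getShardingItem() and getShardingParameter(). The following sketch shows how such a string breaks down into a map. ShardingItemParametersSketch is a hypothetical helper written for illustration; Elastic-Job performs this parsing internally with its own classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; Elastic-Job does this parsing internally with its own classes.
final class ShardingItemParametersSketch {

    // Parses a string of comma-separated item=parameter pairs into an ordered map.
    static Map<Integer, String> parse(String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (shardingItemParameters == null || shardingItemParameters.trim().isEmpty()) {
            return result;
        }
        for (String pair : shardingItemParameters.split(",")) {
            String[] kv = pair.trim().split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Expected item=parameter but got: " + pair);
            }
            result.put(Integer.parseInt(kv[0].trim()), kv[1].trim());
        }
        return result;
    }
}
```

With sharding-total-count="3", the instance that owns item 1 would work on the data identified by "B", so the parameter is a convenient place to put a partition key such as a region or a table suffix.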

 

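Node overview

Once a job has started successfully, its JobName cannot be changed. A job with a changed name is treated as a new job.

Under ${namespaces}/${JobName}, the persistent top-level nodes config, leader, servers, instances, and sharding are created.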

SchedulerFacade:
/**
 * Register job start-up information.
 * 
 * @param enabled whether the job is enabled
 */
public void registerStartUpInfo(final boolean enabled) {
    listenerManager.startAllListeners();
    leaderService.electLeader();
    serverService.persistOnline(enabled);
    instanceService.persistOnline();
    shardingService.setReshardingFlag();
    monitorService.listen();
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}

config

ConfigurationService
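Persistent node that stores the job's configuration parameters. If a configuration is already persisted in ZooKeeper and overwrite is not set to true, the ZooKeeper copy takes precedence.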

JobScheduler.init() 
        -> schedulerFacade.updateJobConfiguration(liteJobConfig)
                ->  configService.persist(liteJobConfig)

/**
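 * Persist the distributed job configuration.
 * 
 * @param liteJobConfig
 *            job configuration
 */
public void persist(final LiteJobConfiguration liteJobConfig) {
	checkConflictJob(liteJobConfig);// validates the job class; if the config node exists in ZooKeeper but its data is null, deletes the whole job node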
	if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
		jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT,
				LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
	}
}
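
The precedence rule in persist can be shown in isolation. The following is a minimal sketch in which a HashMap stands in for the ZooKeeper config node. ConfigPersistSketch and its path layout are hypothetical and only mirror the condition "node does not exist, or overwrite is true".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: a Map stands in for the ZooKeeper config node.
final class ConfigPersistSketch {

    private final Map<String, String> registry = new HashMap<>();

    // Writes the local config only when nothing is stored yet or overwrite is true.
    // Returns the config that is in effect afterwards.
    String persist(String jobName, String localConfigJson, boolean overwrite) {
        String path = "/" + jobName + "/config";
        if (!registry.containsKey(path) || overwrite) {
            registry.put(path, localConfigJson);
        }
        return registry.get(path);
    }
}
```

The practical consequence is that changing the cron expression or shard count in local configuration has no effect on an already registered job unless overwrite is enabled.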

leader

LeaderService

Persistent child nodes: election, sharding, and failover.

election

/**
 * Leader node paths.
 * 
 */
public final class LeaderNode {
    
    /**
     * Root path of the leader node.
     */
    public static final String ROOT = "leader";
    
    static final String ELECTION_ROOT = ROOT + "/election";
    
    static final String INSTANCE = ELECTION_ROOT + "/instance";
    
    static final String LATCH = ELECTION_ROOT + "/latch";
    
    private final JobNodePath jobNodePath;
    
   ..........
}


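The election is triggered when a job is initialized and registered, or when the current leader job instance goes offline.
 See LeaderElectionJobListener and LeaderAbdicationJobListener.

  • latch: persistent child node. During an election, each of the distributed service instances creates an ephemeral sequential child node under it, which serializes the election like a lock.

    LeaderLatch is used as a distributed lock around the election. The instance whose ephemeral sequential node has the smallest sequence number gets to run. LeaderLatch.start(): resets leadership; checkLeadership then checks whether this instance's node has the smallest sequence number. If so, it sets leadership = true; otherwise it registers a Watcher and a BackgroundCallback again and re-checks when they fire.
    LeaderLatch.await(): while leadership == false, it blocks in Object.wait().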

    LeaderService.electLeader() -> jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback()) 
    LeaderService:
    
     /**
      * Elect the leader.
      */
    public void electLeader() {
        log.debug("Elect a new leader now.");
        jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
        log.debug("Leader election completed.");
    }
    
    
    @RequiredArgsConstructor
    class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
    
    	@Override
    	public void execute() {
    		if (!hasLeader()) {
    			jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE,
    					JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    		}
    	}
    }
    
    -----------------------------------------------------------------------------------------
    
    JobNodeStorage:
    
    /**
     * Execute an operation while holding leadership.
     * 
     * @param latchNode job node name used for the distributed lock
     * @param callback callback that performs the operation
     */
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }
    
    
    LeaderLatch:
    void reset() throws Exception
    {
        setLeadership(false);
        setNode(null);
    
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( debugResetWaitLatch != null )
                {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }
    
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    setNode(event.getName());
                    if ( state.get() == State.CLOSED )
                    {
                        setNode(null);
                    }
                    else
                    {
                        getChildren();
                    }
                }
                else
                {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
    		.inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }
    
    
    public void await() throws InterruptedException, EOFException {
        synchronized(this) {
            while(this.state.get() == LeaderLatch.State.STARTED && !this.hasLeadership.get()) {
                this.wait();
            }
        }
    
        if(this.state.get() != LeaderLatch.State.STARTED) {
            throw new EOFException();
        }
    }
    
  • instance: ephemeral child node. It is created once the election completes and stores the leader's jobInstanceId (the job instance ID).
  • sharding

    ShardingService

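    Holds the persistent necessary node, which is deleted once sharding completes.
    It serves as the resharding flag and is set when the job starts, the total shard count changes, the job servers change, or the job instances change.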

    ShardingListenerManager:
    
    class ShardingTotalCountChangedJobListener extends AbstractJobListener {
    
    	@Override
    	protected void dataChanged(final String path, final Type eventType, final String data) {
    		if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
    			int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig()
    					.getCoreConfig().getShardingTotalCount();
    			if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
    				shardingService.setReshardingFlag();
    				JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
    			}
    		}
    	}
    }
    
    class ListenServersChangedJobListener extends AbstractJobListener {
    
    	@Override
    	protected void dataChanged(final String path, final Type eventType, final String data) {
    		if (!JobRegistry.getInstance().isShutdown(jobName)
    				&& (isInstanceChange(eventType, path) || isServerChange(path))) {
    			shardingService.setReshardingFlag();
    		}
    	}
    }
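
Going back to the latch node in the election section: the "smallest sequence number wins" rule can be sketched on its own. The code below is a simplified model in plain Java, not Curator's LeaderLatch. The class LatchOrderSketch and the node names used with it are hypothetical. It assumes, as Curator's implementation does to the best of my knowledge, that a non-leader waits on the node immediately before its own rather than on all nodes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the "smallest sequence number wins" rule; not Curator's LeaderLatch.
final class LatchOrderSketch {

    // Extracts the numeric suffix that follows the last '-' in a sequential node name.
    static int sequenceOf(String nodeName) {
        return Integer.parseInt(nodeName.substring(nodeName.lastIndexOf('-') + 1));
    }

    // A participant leads when its own node has the smallest sequence number.
    static boolean hasLeadership(String ownNode, List<String> children) {
        List<String> sorted = sortBySequence(children);
        return !sorted.isEmpty() && sorted.get(0).equals(ownNode);
    }

    // A non-leader waits on the node just before its own; null means nothing to wait on.
    static String nodeToWatch(String ownNode, List<String> children) {
        List<String> sorted = sortBySequence(children);
        int index = sorted.indexOf(ownNode);
        return index > 0 ? sorted.get(index - 1) : null;
    }

    private static List<String> sortBySequence(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingInt(LatchOrderSketch::sequenceOf));
        return sorted;
    }
}
```

Since executeInLeader closes the latch in its try-with-resources block, each instance's ephemeral node disappears after its callback runs, and the next instance in sequence order acquires leadership. This is how the callbacks end up running one at a time across the cluster.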

failover

  • FailoverService
  • FailoverListenerManager
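    • FailoverSettingsChangedJobListener:
       When failover is set to false, removes the sharding/${itemIndex}/failover node of every shard.
    • JobCrashedJobListener:
        Conditions: failover == true
                  && the event is the removal of a job instance child node under instances (a job instance went offline)
                  && the offline jobInstanceId is not this instance's own
      If all of these hold, it collects the shards marked with sharding/${itemIndex}/failover that the crashed jobInstanceId was handling; if there are none, it collects all shards normally assigned to that jobInstanceId. Each of these shards is flagged with leader/failover/items/${itemIndex}.
      FailoverLeaderExecutionCallback is then executed under a LeaderLatch (across the cluster only one callback runs at a time; the handling threads of the different service instances compete for the distributed lock and wait their turn).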
FailoverListenerManager

private boolean isFailoverEnabled() {
	LiteJobConfiguration jobConfig = configService.load(true);
	return null != jobConfig && jobConfig.isFailover();
}

class JobCrashedJobListener extends AbstractJobListener {

	@Override
	protected void dataChanged(final String path, final Type eventType, final String data) {
		if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
			String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
			if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
				return;
			}
			List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
			if (!failoverItems.isEmpty()) {
				for (int each : failoverItems) {
					failoverService.setCrashedFailoverFlag(each);
					failoverService.failoverIfNecessary();
				}
			} else {
				for (int each : shardingService.getShardingItems(jobInstanceId)) {
					failoverService.setCrashedFailoverFlag(each);
					failoverService.failoverIfNecessary();
				}
			}
		}
	}
}
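
The item selection in JobCrashedJobListener can be reduced to a small pure function. The following sketch uses two maps in place of the sharding/${itemIndex}/failover and sharding/${itemIndex}/instance nodes. CrashedItemsSketch and its method names are hypothetical; the real code reads these values from ZooKeeper through FailoverService and ShardingService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the crash handling; Maps stand in for the ZooKeeper nodes.
final class CrashedItemsSketch {

    // Items that must be flagged when crashedInstanceId goes offline.
    // failoverOwner: item -> instance running it as a failover
    // shardingOwner: item -> instance it is normally assigned to
    static List<Integer> itemsToFlag(String crashedInstanceId,
                                     Map<Integer, String> failoverOwner,
                                     Map<Integer, String> shardingOwner) {
        List<Integer> failoverItems = itemsOwnedBy(crashedInstanceId, failoverOwner);
        return failoverItems.isEmpty() ? itemsOwnedBy(crashedInstanceId, shardingOwner) : failoverItems;
    }

    // Returns the items owned by the given instance, in ascending item order.
    private static List<Integer> itemsOwnedBy(String instanceId, Map<Integer, String> owners) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : new TreeMap<>(owners).entrySet()) {
            if (instanceId.equals(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

Checking the failover-marked items first covers the case where the crashed instance was itself in the middle of taking over another instance's shard.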

servers

ServerService

When a job service registers, a persistent node named after the server IP is created, so job servers are managed by IP.

instances

InstanceService

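When a job service registers, an ephemeral node named after the job instance ID is created. The node name is the server IP, the delimiter @-@, and the process ID:

e.g. 192.168.42.1@-@6260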

package com.dangdang.ddframe.job.lite.api.strategy;

import com.dangdang.ddframe.job.util.env.IpUtils;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.lang.management.ManagementFactory;

/**
 * Job running instance.
 * 
 * @author zhangliang
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode(of = "jobInstanceId")
public final class JobInstance {

	private static final String DELIMITER = "@-@";

	/**
	 * Primary key of the job instance.
	 */
	private final String jobInstanceId;

	public JobInstance() {
		jobInstanceId = IpUtils.getIp() + DELIMITER + 
                       ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
	}

	/**
	 * Get the IP address of the job server.
	 * 
	 * @return IP address of the job server
	 */
	public String getIp() {
		return jobInstanceId.substring(0, jobInstanceId.indexOf(DELIMITER));
	}
}
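
The naming rule can be tried out without the Elastic-Job classes. The sketch below is a hypothetical helper, JobInstanceIdSketch, that mirrors the rule in JobInstance but takes the IP as an argument instead of detecting it with IpUtils. The pidOf method is an addition for illustration and has no counterpart in the class above.

```java
import java.lang.management.ManagementFactory;

// Hypothetical helper mirroring the naming rule; the IP is passed in rather than detected.
final class JobInstanceIdSketch {

    private static final String DELIMITER = "@-@";

    // Builds "<ip>@-@<pid>". RuntimeMXBean.getName() commonly looks like "pid@hostname",
    // though that format is JVM-dependent.
    static String currentInstanceId(String ip) {
        String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        return ip + DELIMITER + pid;
    }

    // Everything before the delimiter is the server IP.
    static String ipOf(String jobInstanceId) {
        return jobInstanceId.substring(0, jobInstanceId.indexOf(DELIMITER));
    }

    // Everything after the delimiter is the process ID part.
    static String pidOf(String jobInstanceId) {
        return jobInstanceId.substring(jobInstanceId.indexOf(DELIMITER) + DELIMITER.length());
    }
}
```

Because the ID combines IP and process ID, two JVMs on the same host register as separate instances under instances while sharing one node under servers.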

sharding

  • ShardingService
  • ShardingListenerManager
    • ShardingTotalCountChangedJobListener
    • ListenServersChangedJobListener

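Sharding is flagged when a service first registers, when service instances change, or when the total shard count changes. The actual sharding runs the next time the job is triggered. Only the leader can shard; non-leader instances block while sharding is in progress.

  1. Check that there are available job instances and that the resharding flag is set.
  2. Check whether a leader currently exists; if not, trigger an election and wait for it to complete.
  3. Check whether this instance is the leader; if not, wait until the leader finishes sharding.
  4. Check whether any shards are still being executed; if so, wait for them to finish.
  5. Create the ephemeral node leader/sharding/processing.
  6. Clear the existing shard assignments and create a persistent sharding/${itemIndex} node for each current shard item.
  7. Shard according to the configured strategy; in a single transaction, create the sharding/${itemIndex}/instance nodes filled with the JobInstanceId, and delete the leader/sharding/necessary and leader/sharding/processing nodes.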
ShardingService:

/**
 * Shard the job if sharding is needed and the current node is the leader.
 * 
 * <p>
 * Do not shard if there are no available nodes.
 * </p>
 */
public void shardingIfNecessary() {
	List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
	if (!isNeedSharding() || availableJobInstances.isEmpty()) {
		return;
	}
	if (!leaderService.isLeaderUntilBlock()) {
		blockUntilShardingCompleted();
		return;
	}
	waitingOtherJobCompleted();
	LiteJobConfiguration liteJobConfig = configService.load(false);
	int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
	log.debug("Job '{}' sharding begin.", jobName);
	jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
	resetShardingInfo(shardingTotalCount);
	JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory
			.getStrategy(liteJobConfig.getJobShardingStrategyClass());
	jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(
			jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
	log.debug("Job '{}' sharding complete.", jobName);
}
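
What jobShardingStrategy.sharding returns depends on the configured strategy class. As an illustration only, here is a minimal sketch of an even-split strategy in plain Java, loosely modelled on the idea of average allocation. EvenShardingSketch is hypothetical, uses plain strings for instance IDs, assumes the IDs are distinct, and is not the strategy implementation shipped with Elastic-Job.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical even-split strategy; instance IDs are plain strings and assumed distinct.
final class EvenShardingSketch {

    // Gives each instance a contiguous block of items, then hands out the remainder one by one.
    static Map<String, List<Integer>> sharding(List<String> instances, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int perInstance = shardingTotalCount / instances.size();
        int next = 0;
        for (String instance : instances) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < perInstance; i++) {
                items.add(next++);
            }
            result.put(instance, items);
        }
        // Leftover items go to the first instances, one each.
        for (String instance : instances) {
            if (next >= shardingTotalCount) {
                break;
            }
            result.get(instance).add(next++);
        }
        return result;
    }
}
```

In the real flow, each entry of such a result would become one sharding/${itemIndex}/instance node written inside the transaction described in step 7.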