文档章节

任务调度在分布式部署环境下保证task的正确运行

 无聊书生
发布于 2016/09/26 20:21
字数 1542
阅读 187
收藏 6

在项目开发中,会经常使用的到定时任务。一般对与定时任务根据需求的不同、程序员的编码习惯,通常会有2种部署方式:

  1. 使用统一的任务调度系统进行管理。在分布式环境下,通过任务调度系统使用锁机制来保证同时只有一条服务器在运行定时任务。
  2. 在项目中直接配置task执行。在单服务器的情况下,此种方式是没有问题的,当在分布式环境下,就会存在多台服务器同时出发task。此时就需要单进行特殊的设置,以保证其同时只有一个服务器在运行task。

本文以讨论第2中情况下怎么解决在分布式环境下出现的问题。讨论的解决方法如下:

  1. 分别请求rpc服务,在服务中通过锁机制,来判断获得执行task权限的服务器
  2. 通过zk创建节点是否成功来进行判断
  3. 通过redis的Hsetnx 、Setnx的特性进行判断。

 

假设有同一个Task不是在3台服务器中,分别是A、B、C,远程服务S

 

RPC服务

描述:A、B、C三台服务器在执行task时同时请求远程服务S,在S中,通过一定的机制判定某台服务器具有执行task的权限,其他几台服务器则不具备。

在S服务中,判断机制特性为:先请求的具有执行权限。A\B\C请求时传输task唯一标识名,在S服务中接收到请求时,判断当前task标识是否进行记录,如已记录,则返回flase,表示不具备执行条件;反之不存在记录,对先对task标识进行存储,返回true,表示具有执行条件。在判断时,需要使用同步锁机制,保证同时只有一个请求在处于判断条件中。在请求执行完成后,再次S服务的刷新方法,释放已选择标识。

 

Zookeeper创建节点的方式(Java举例)

描述:A\B\C三台服务器在执行task时同时通过zk客户端创建同一node,创建成功的服务器则具有执行task的权限,否则不具备。

zk的判断机制为:zk客户端在创建节点时,会判断节点是否存在,如不存在则创建,反之出现异常。因此在执行task时,同时创建节点成功,则具有执行权限。伪代码如下:

/**
 * 使用zk在分布式部署情况下确保并发情况中只有一台服务在执行
 * 利用zk的创建node机制:当节点存在,则返回异常,否则创建成功。
 * @author aiyungui
 * @create 2016-09-27-11:24
 **/
public class ZkDistributedUtil {

    private CuratorFramework client;

    private String defaultName = "ink";
    private boolean isExecute = true;

    public ZkDistributedUtil(String connectStr){
        this(connectStr,null);
    }

    /**
     * 初始化zk
     * @param connectStr
     * @param namespace
     */
    public ZkDistributedUtil(String connectStr,String namespace){
        if (StringUtils.isBlank(namespace)){
            namespace = defaultName;
        }

        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        client = builder.connectString(connectStr).sessionTimeoutMs(30000).connectionTimeoutMs(30000)
                .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                .defaultData(null).build();

        client.start();
    }

    /**
     * 创建节点
     * @param nodePath
     * @throws Exception
     */
    public void createNode(String nodePath) throws Exception {
        client.create().creatingParentsIfNeeded().forPath(nodePath);
    }

    /**
     * 删除节点
     * @param nodePath
     * @throws Exception
     */
    public void deleteNode(String nodePath) throws Exception {
        client.delete().forPath(nodePath);
    }

    public static void main(String args[]){
        //调用示例
        try {
            ZkDistributedUtil watcherService = new ZkDistributedUtil("10.1.5.217:2181");
            watcherService.createNode("/timer/node");
            System.out.println("execute module ....");
            watcherService.deleteNode("/timer/node");

        } catch (Exception e) {
            System.out.println("创建节点失败");
        }
    }
}

redis判断方式

描述:A\B\C三台服务器在执行task时同时请求远程服务器S,在S中,通过redis的Hsetnx、setnx命令的特性,返回1则具有执行task的权限,否则不具备

redis的判断机制特性:redis的Hsetnx、setnx命令会在执行时判断是否存在对应的key,如存在则返回0,不存在则创建key,并进行赋值操作,返回1。此时可根据返回值为1或0来判定是否具有执行权限。伪代码如下:

/**
 *使用Redis在分布式部署情况下确保并发情况中只有一台服务在执行
 *利用redis的hsetnx命令特性:当key存在则返回0,反之创建成功并返回1
 * @author aiyungui
 * @create 2016-09-27-10:44
 **/
public class RedisDistributedUtil {

    private JedisPool jedisPool;

    /**
     * 初始化redis pool
     */
    public void init(){
        if (jedisPool == null)
            jedisPool = (JedisPool) SpringApplicationContext.getBean("jedisPool");
    }

    /**
     *是否获取执行权限 true为是,false为否
     * @param key
     * @param fieldId
     * @param value
     * @return
     */
    public boolean isCanExecute(String key,String fieldId,String value){
        boolean isExecute = true;
        Jedis jedis = null;
        try{
            if (jedisPool == null){
                init();
            }
            jedis = jedisPool.getResource();

            value = value==null?"1":value;
            Long result = jedis.hsetnx(key,fieldId,value);
            if (result == 0){
                isExecute = false;
            }

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if (jedis != null){
                jedis.close();
            }
        }

        return isExecute;
    }

    /**
     * 释放锁定的key信息
     * @param key
     * @param fieldId
     */
    public void refreshKey(String key,String fieldId){
        Jedis jedis = null;
        try{
            if (jedisPool == null){
                init();
            }
            jedis = jedisPool.getResource();
            jedis.hdel(key,fieldId);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if (jedis != null){
                jedis.close();
            }
        }
    }

    public static void main(String args[]){
        RedisDistributedUtil distributedUtil = new RedisDistributedUtil();
        boolean result = distributedUtil.isCanExecute("task","ink.test","1");
        if (result) {
            System.out.println("execute module ....");

            distributedUtil.refreshKey("task","ink.test");
        }

    }
}

 

上述3种方式能保证在分布式部署的环境下同时只有一台服务器具有task执行权限。但task的执行又主要分为一些两种:

  1. 每隔多长时间执行
  2. 在某个时间点执行

对于2,则上述方式能完全满足,对于1,则会因服务器启动时间的不同,出现在不同的时间点执行task,此时需要根据业务再进行区分,如A\B\C服务执行的task是否在不同时间执行都对业务数据的操作是否具有冥等性。如果具有冥等性,则不受影响。否则需要根据业务进行特殊处理。处理方式可有:

  • 在获得task执行权限的服务器在task任务执行完成后,休眠一段时间再通知zookeeper删除节点、redis删除key,RPC服务刷新,已保证task的执行是有效的
  • 对第一个获得执行task权限的服务器进行记录,下次执行依然使用该服务器,直至此服务器出现异常。出现异常后,再次选择最先获得task执行权限的服务器

© 著作权归作者所有

粉丝 2
博文 17
码字总数 24756
作品 0
石景山
高级程序员
私信 提问
阿里飞天云平台架构简介

原贴在这里:http://blog.csdn.net/yangcs2009/article/details/39292097。我做了部分修改。 飞天是由阿里云开发的一个大规模分布式计算系统,其中包括飞天内核和飞天开放服务。 飞天内核负责...

牧师-Panda
2016/10/16
865
0
宜信开源|分布式任务调度平台SIA-TASK的架构设计与运行流程

一、分布式任务调度的背景 无论是互联网应用或者企业级应用,都充斥着大量的批处理任务。我们常常需要一些任务调度系统来帮助解决问题。随着微服务化架构的逐步演进,单体架构逐渐演变为分布...

宜信技术学院
06/04
939
6
分布式定时任务框架---Uncode Schedule

分布式定时任务框架---Uncode Schedule rabbitGYK 关注 2016.11.27 20:36* 字数 1446 阅读 7141评论 5喜欢 36赞赏 1 博客原文 作为一个支付公司的项目组,经常会有很多对账功能(签约对账、支...

晨猫
2018/11/02
147
0
利用Mesos构建多任务调度系统

女主宣言 我们发现公司的服务器cpu, memory等资源利用并不充分;如果能够充分利用这些机器上的空闲资源同时又能保证业务服务的正常运行,将会节省不少的机器资源;所以我们研究了Mesos来构建...

ZVAyIVqt0UFji
2018/09/26
0
0
Flink1.7.2 Dataset 并行计算源码分析

Flink1.7.2 Dataset 并行计算源码分析 概述 了解Flink处理流程(用户程序 -> JobGrapth -> ExecutionGraph -> JobVertex -> ExecutionVertex -> 并行度 -> Task(DataSourceTask,BatchTask,Dat......

thinktothings
03/13
0
0

没有更多内容

加载失败,请刷新页面

加载更多

让《强化学习(第2版)》架起一座通往强化学习经典知识宝库的桥梁

上交大计算科学与工程系俞凯教授,5分钟口述讲解,带你快速认识了解年度重磅图书《强化学习(第二版)》! 在 AlphaGo战胜李世石之后,AlphaZero以其完全凭借自我学习超越人类在各种棋类游戏...

博文视点Bv
26分钟前
6
0
TLA7-EVM开发板的处理器、NOR FLASH、DDR3

TLA7-EVM开发板是一款由广州创龙基于Xilinx Artix-7系列FPGA自主研发的核心板+底板方式的开发板,可快速评估FPGA性能。核心板尺寸仅70mm*50mm,底板采用沉金无铅工艺的6层板设计,专业的PCB...

Tronlong创龙
35分钟前
4
0
UUID的变种-有序

为了解决UUID无序的问题,NHibernate在其主键生成方式中提供了Comb算法(combined guid/timestamp)。保留GUID的10个字节,用另6个字节表示GUID生成的时间(DateTime)。 /// <summary> //...

Canaan_
35分钟前
4
0
Netty学习(6)——通道间数据传输

1. FileChannel实现通道间的数据传输 在Java NIO中,如果两个通道中有一个是FileChannel,那你可以直接将数据从一个channel传输到另外一个channel。 transferFrom() FileChannel的transferF...

江左煤郎
39分钟前
4
0
AngularDOM操作

gtandsn
40分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部