文档章节

分布式调度Elastic-Job攻略

算法之名
 算法之名
发布于 2018/11/07 19:48
字数 1163
阅读 367
收藏 6

昨天虽然试用了一下唯品会的“土星”,但是我实在没想明白他的Job该怎么用Spring来托管,所以没有使用。今天来说一下当当的Elastic-Job.

安装管理平台

先说一下Elastic-Job的管理平台跟Java的Job开发没有半毛钱关系,他只是把注册进Zookeeper的信息读取出来,进行控制。

先下载Elastic-Job的源代码git clone https://github.com/elasticjob/elastic-job-lite.git,编译之后找到elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar.gz上传到服务器,解压之后进入/bin目录。

先不要急着执行start.sh,因为这个文件里面有Windows字符,linux无法识别。

所以先执行   sed -i 's/\r$//' start.sh  改掉Windows字符。

然后执行nohup ./start.sh &

在浏览器打开服务器的8899端口

先配置好zookeeper的注册中心地址,命名空间,基本上这个就装好了。

编写Java Job

pom

<dependency>
   <groupId>com.github.kuhn-he</groupId>
   <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
   <version>2.1.5</version>
   <exclusions>
      <exclusion>
         <artifactId>curator-client</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
      <exclusion>
         <artifactId>curator-framework</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
      <exclusion>
         <artifactId>curator-recipes</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
   </exclusions>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-framework</artifactId>
   <version>2.12.0</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>2.12.0</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-client</artifactId>
   <version>2.12.0</version>
</dependency>

资源文件添加

elaticjob:
  zookeeper:
    server-lists: 192.168.5.129:2188
    namespace: elastic-job-lite-springboot

这里要跟你管理平台的保持一致

配置文件

@Configuration
@ConditionalOnExpression("'${elatic.zookeeper.server-lists}'.length() >0")
public class ElasticConfig {
    /**
     * 初始化配置
     * @param serverList
     * @param namespace
     * @return
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${elaticjob.zookeeper.server-lists}") String serverList
            , @Value("${elaticjob.zookeeper.namespace}") String namespace) {

        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

    /**
     * 设置活动监听,前提是已经设置好了监听,见下一个目录
     * @return
     */
    @Bean
    public ElasticJobListener elasticJobListener() {
        return new ElasticJobListener(100, 100);
    }
}

监听器

@Component
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
    /**
     * 设置间隔时间
     * @param startedTimeoutMilliseconds
     * @param completedTimeoutMilliseconds
     */
    public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    /**
     * 任务开始
     * @param shardingContexts
     */
    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println("任务开始");
    }

    /**
     * 任务结束
     * @param shardingContexts
     */
    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.err.println("任务结束");
    }

}

Job

@Component
@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="jobTask",shardingTotalCount=2,jobParameter="测试参数",shardingItemParameters="0=Chengdu0,1=Chengdu1")
public class StockSimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        switch (shardingContext.getShardingItem()) {
            case 0:
                System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, " +
                                "当前分片项: %s.当前参数: %s," +
                                "当前任务名称: %s.当前任务参数: %s"
                        ,
                        Thread.currentThread().getId(),
                        shardingContext.getShardingTotalCount(),
                        shardingContext.getShardingItem(),
                        shardingContext.getShardingParameter(),
                        shardingContext.getJobName(),
                        shardingContext.getJobParameter()

                        )
                );
                break;
            case 1:
                System.out.println("啦啦啦");
                break;
            default:
                break;
        }
    }
}

启动项目可以看到

2018-11-07 19:29:08.900  INFO [schedule-center,,,] 21244 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8011 (http) with context path '/api-sc'
2018-11-07 19:29:08.901  INFO [schedule-center,,,] 21244 --- [           main] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 8011
2018-11-07 19:29:08.903  INFO [schedule-center,,,] 21244 --- [           main] c.c.schedule.ScheduleCenterApplication   : Started ScheduleCenterApplication in 16.618 seconds (JVM running for 17.331)
啦啦啦
------Thread ID: 96, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: 
------Thread ID: 100, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: 
啦啦啦
------Thread ID: 102, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: 
啦啦啦

查看平台的作业

我们可以在这里面修改他的配置

经启动多个实例,我们可以看到分片会被多个实例均摊,相同的分片只会在一个进程内执行,多个Job也是一样,不会重复执行。退出一个进程,单个进程就会执行全部分片,实现了高可用。

动态添加Job

//@Component
//@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="testTask",shardingTotalCount=2,jobParameter="测试参数",shardingItemParameters="0=A,1=B")
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("TestJob");
    }
}

我们只写了一个Job,不进行配置

我们可以在Restful中进行动态添加

@RestController
public class TestController {

    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;


    /**
     * 动态添加任务逻辑
     */
    @GetMapping("/test")
    public void test(@RequestParam("cron") String cron) {
        int shardingTotalCount = 2;
        String jobName = UUID.randomUUID().toString() + "-test123";

        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder(jobName, cron, shardingTotalCount)
                .shardingItemParameters("0=A,1=B")
                .build();

        SimpleJobConfiguration simpleJobConfiguration =
                new SimpleJobConfiguration(jobCoreConfiguration, TestJob.class.getCanonicalName());
        JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build());



        try {
            jobScheduler.init();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("定时任务创建失败");
        }
    }
}

通过@RequestParam("cron") String cron参数,我们可以动态给该Job添加可变的时间配置。

在zookeeper中我们可以看到他的注册信息

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2188(CONNECTED) 0] ls /
[zookeeper, elastic-job-lite-springboot, spark, hadoop-ha, dubbo, zoo, pushservers, guanjian]
[zk: localhost:2188(CONNECTED) 1] ls /elastic-job-lite-springboot
[com.guanjian.job.TestJob, e5d89fde-a665-46e2-8cac-7560a48812c9-test123, com.cloud.schedule.jobs.MyJob, com.cloud.schedule.jobs.StockSimpleJob, com.guanjian.job.StockSimpleJob]
[zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.

com.guanjian.job.TestJob                 com.cloud.schedule.jobs.MyJob            com.cloud.schedule.jobs.StockSimpleJob   com.guanjian.job.StockSimpleJob
[zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.

com.cloud.schedule.jobs.MyJob            com.cloud.schedule.jobs.StockSimpleJob
[zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.StockSimpleJob
[leader, servers, config, instances, sharding]
[zk: localhost:2188(CONNECTED) 3] 

© 著作权归作者所有

算法之名
粉丝 38
博文 154
码字总数 217608
作品 0
广州
私信 提问
SpringBoot使用Elastic-Job

本文介绍SpringBoot整合Elastic-Job分布式调度任务(简单任务)。 1.有关Elastic-Job Elastic-Job是当当网开源的分布式任务调度解决方案,是业内使用较多的分布式调度解决方案。 这里主要介绍...

dalaoyang
03/05
0
0
分布式作业 Elastic Job 如何动态调整?

前面分享了两篇分布式作业调度框架 Elastic Job 的介绍及应用实战。 ElasticJob-分布式作业调度神器 分布式作业 Elastic Job 快速上手指南! Elastic Job 提供了简单易用的运维平台,方便用...

Java技术栈
2018/08/24
0
0
分布式定时任务框架Elastic-Job的使用

一、前言 Elastic-Job是一个优秀的分布式作业调度框架。 Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。 Elastic-Job-Lite定位为轻...

冷血狂魔
03/06
0
0
分布式定时任务Elastic-Job框架在SpringBoot工程中的应用实践(一)

摘要:如何构建具备作业分片和弹性扩缩容的定时任务系统是每个大型业务系统在设计时需要考虑的重要问题? 对于构建一般的业务系统来说,使用Quartz或者Spring Task即可基本满足我们的单体服用...

癫狂侠
2018/05/12
0
0
Elastic-Job 与 Sharding-JDBC 官网正式上线

当当先后于 2015 年 9 月和 2016 年 1 月开源的 Elastic-Job 和 Sharding-JDBC,经过这一两年的洗礼,它们已经初步获得技术圈的同仁的认可。 两个项目在码云和 GitHub 上的 star 数加起来已超...

亮_ShardingSphere
2017/08/30
3.2K
12

没有更多内容

加载失败,请刷新页面

加载更多

centos 设置hostname

1、centos 6 hostname配置文件为 /etc/sysconfig/network 内容为 NETWORKING=yesHOSTNAME=test_hostname hostname是Linux系统下的一个内核参数,它保存在 /proc/sys/kernel/hostname 下,但......

燃犀
11分钟前
0
0
openSUSE

opensuse aliyun镜像 sudo zypper addrepo -f https://mirrors.aliyun.com/opensuse/distribution/leap/15.0/repo/oss/ aliyun-Oss sudo zypper addrepo -f https://mirrors.aliyun.com/ope......

李有常
16分钟前
0
0
五月天 知足 [結他chord譜]

詞曲: 阿信 Key:E 4/4 Arranged By Kermit Tam Capo 4 > Play C |C   G  |Am  Em  (怎)麼去(擁)有 (一)道彩(虹)  |F   C |F     G (怎)麼去(擁)抱 (一)夏天的(風) |...

阿锋zxf
22分钟前
1
0
OSChina 周二乱弹 —— 你醒啦?现在你已经是丧尸了

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @小鱼丁 :分享全仁权的单曲《걱정말아요 그대 (你不要担心)》: 《걱정말아요 그대 (你不要担心)》- 全仁权 手机党少年们想听歌,请使劲儿戳(...

小小编辑
24分钟前
364
7
再一次生产 CPU 高负载排查实践

前言 前几日早上打开邮箱收到一封监控报警邮件:某某 ip 服务器 CPU 负载较高,请研发尽快排查解决,发送时间正好是凌晨。 其实早在去年我也处理过类似的问题,并记录下来:《一次生产 CPU ...

crossoverJie
39分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部