文档章节

Spring Boot Quartz 分布式集群任务调度实现

编辑之路
 编辑之路
发布于 06/21 17:50
字数 1434
阅读 137
收藏 7

主要内容

  • Spring Scheduler 框架
  • Quartz 框架,功能强大,配置灵活
  • Quartz 集群
  • mysql 持久化定时任务脚本(tables_mysql.sql)

介绍

在工程中时常会遇到一些需求,例如定时刷新一下配置、隔一段时间检查下网络状态并发送邮件等诸如此类的定时任务。
定时任务本质就是一个异步的线程,线程可以查询或修改并执行一系列的操作。由于本质是线程,在 Java 中可以自行编写一个线程池对定时任务进行控制,但这样效率太低了,且功能有限,属于重复造轮子。

分布式任务调度应用场景

Quartz的集群功能通过故障转移和负载平衡功能为您的调度程序带来高可用性和可扩展性。

调度程序中会有很多定时任务需要执行,一台服务器已经不能满足使用,需要解决定时任务单机单点故障问题。

用Quartz框架,在集群环境下,通过数据库锁机制来实现定时任务的执行;独立的 Quartz 节点并不与另一其的节点或是管理节点通信。

Spring Scheduler 实现定时任务

1.定义 Task 类

/**
 * Spring Scheduled示例
 */
@Component
public class ScheduledTask {

    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private Integer count0 = 1;
    private Integer count1 = 1;
    private Integer count2 = 1;

    @Scheduled(fixedRate = 5000)
    public void reportCurrentTime() throws InterruptedException {
        System.out.println(String.format("reportCurrentTime第%s次执行,当前时间为:%s", count0++, dateFormat.format(new Date())));
    }

    @Scheduled(fixedDelay = 5000)
    public void reportCurrentTimeAfterSleep() throws InterruptedException {
        System.out.println(String.format("reportCurrentTimeAfterSleep第%s次执行,当前时间为:%s", count1++, dateFormat.format(new Date())));
    }

    @Scheduled(cron = "0 0 1 * * *")
    public void reportCurrentTimeCron() throws InterruptedException {
        System.out.println(String.format("reportCurrentTimeCron第%s次执行,当前时间为:%s", count2++, dateFormat.format(new Date())));
    }
}

2.启动定时任务

在Spring Boot的主类中加入@EnableScheduling注解,启用定时任务的配置

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
@EnableScheduling
public class ScheduledTaskTests {
    @Test
    public void test() {
        log.info("启动了ScheduledTask定时作业");
        while (true) {
        }
    }
}

quartz实现分布式定时任务

quartz 是一个开源的分布式调度库,它基于java实现。
> 它有着强大的调度功能,支持丰富多样的调度方式,比如简单调度,基于cron表达式的调度等等。
> 支持调度任务的多种持久化方式。比如支持内存存储,数据库存储,Terracotta server 存储。
> 支持分布式和集群能力。
> 采用JDBCJobStore方式存储时,针对事务的处理方式支持全局事务(和业务服务共享同一个事务)和局部事务(quarzt 单独管理自己的事务)
> 基于plugin机制以及listener机制支持灵活的扩展。

1.pom.xml配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

<!-- mysql -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>

<!-- orm -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

2.spring-quartz.properties集群配置

#============================================================================
# 配置JobStore
#============================================================================
# JobDataMaps是否都为String类型,默认false
org.quartz.jobStore.useProperties=false

# 表的前缀,默认QRTZ_
org.quartz.jobStore.tablePrefix = QRTZ_

# 是否加入集群
org.quartz.jobStore.isClustered = true

# 调度实例失效的检查时间间隔 ms
org.quartz.jobStore.clusterCheckinInterval = 5000

# 当设置为“true”时,此属性告诉Quartz 在非托管JDBC连接上调用setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED)。
org.quartz.jobStore.txIsolationLevelReadCommitted = true

# 数据保存方式为数据库持久化
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

# 数据库代理类,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以满足大部分数据库
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

#============================================================================
# Scheduler 调度器属性配置
#============================================================================
# 调度标识名 集群中每一个实例都必须使用相同的名称
org.quartz.scheduler.instanceName = ClusterQuartz
# ID设置为自动获取 每一个必须不同
org.quartz.scheduler.instanceId= AUTO

#============================================================================
# 配置ThreadPool
#============================================================================
# 线程池的实现类(一般使用SimpleThreadPool即可满足几乎所有用户的需求)
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool

# 指定线程数,一般设置为1-100直接的整数,根据系统资源配置
org.quartz.threadPool.threadCount = 5

# 设置线程的优先级(可以是Thread.MIN_PRIORITY(即1)和Thread.MAX_PRIORITY(这是10)之间的任何int 。默认值为Thread.NORM_PRIORITY(5)。)
org.quartz.threadPool.threadPriority = 5

3.定义两个job

  • QuartzJob.java
//持久化
@PersistJobDataAfterExecution
//禁止并发执行(Quartz不要并发地执行同一个job定义(这里指一个job类的多个实例))
@DisallowConcurrentExecution
@Slf4j
public class QuartzJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        String taskName = context.getJobDetail().getJobDataMap().getString("name");
        log.info("---> Quartz job {}, {} <----", new Date(), taskName);
    }
}
  • QuartzJob2.java
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Slf4j
public class QuartzJob2 extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        String taskName = context.getJobDetail().getJobDataMap().getString("name");
        log.info("---> Quartz job 2 {}, {} <----", new Date(), taskName);
    }
}

4.初始化触发器等信息,这里通过Listener初始化

@Slf4j
public class StartApplicationListener implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    SchedulerConfig schedulerConfig;
    public static AtomicInteger count = new AtomicInteger(0);
    private static String TRIGGER_GROUP_NAME = "test_trriger";
    private static String JOB_GROUP_NAME = "test_job";

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 防止重复执行
        if (event.getApplicationContext().getParent() == null && count.incrementAndGet() <= 1) {
            initMyJob();
        }
    }

    public void initMyJob() {
        Scheduler scheduler = null;
        try {
            scheduler = schedulerConfig.scheduler();

            TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", TRIGGER_GROUP_NAME);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            if (null == trigger) {
                Class clazz = QuartzJob.class;
                JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity("job1", JOB_GROUP_NAME).build();
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
                trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", TRIGGER_GROUP_NAME)
                        .withSchedule(scheduleBuilder).build();
                scheduler.scheduleJob(jobDetail, trigger);
                log.info("Quartz 创建了job:...:{}", jobDetail.getKey());
            } else {
                log.info("job已存在:{}", trigger.getKey());
            }

            TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2", TRIGGER_GROUP_NAME);
            CronTrigger trigger2 = (CronTrigger) scheduler.getTrigger(triggerKey2);
            if (null == trigger2) {
                Class clazz = QuartzJob2.class;
                JobDetail jobDetail2 = JobBuilder.newJob(clazz).withIdentity("job2", JOB_GROUP_NAME).build();
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/15 * * * * ?");
                trigger2 = TriggerBuilder.newTrigger().withIdentity("trigger2", TRIGGER_GROUP_NAME)
                        .withSchedule(scheduleBuilder).build();
                scheduler.scheduleJob(jobDetail2, trigger2);
                log.info("Quartz 创建了job:...:{}", jobDetail2.getKey());
            } else {
                log.info("job已存在:{}", trigger2.getKey());
            }
            scheduler.start();
        } catch (Exception e) {
            log.info(e.getMessage());
        }
    }
}

5.启动定时器

启动两个Application,分别是示例中的DemoQuartzApplication和DemoQuartzApplication2,会发现,两个Job会分别在两个应用执行。

当手动停止一个应用的时候,另一个应用会自动接管所有任务并继续执行,如果任务太多,我们可以再开一台服务即可。实现了调度任务的高可用性和可扩展性

运行效果如图:
执行结果

© 著作权归作者所有

编辑之路
粉丝 51
博文 50
码字总数 109912
作品 0
长沙
私信 提问
Uncode-Schedule 1.0.3 发布,分布式任务调度组件

Uncode-Schedule分布式任务调度组件,非常小巧,无需任何修改就可以使quartz/spring task具备分布式特性,确保所有任务在集群中不重复,不遗漏的执行。 Uncode-Schedule 1.0.3 发布,本次版本...

冶卫军
2017/07/02
838
0
Quartz的简单易懂定时任务实现

作为一个优秀的开源调度框架,Quartz 具有以下特点: 强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求; 灵活的应用方式,例如支持任务和调度的多种组合方式,支持...

begrateful
2018/09/18
0
0
分布式定时任务框架---Uncode Schedule

分布式定时任务框架---Uncode Schedule rabbitGYK 关注 2016.11.27 20:36* 字数 1446 阅读 7141评论 5喜欢 36赞赏 1 博客原文 作为一个支付公司的项目组,经常会有很多对账功能(签约对账、支...

晨猫
2018/11/02
130
0
两个分布式batch调度方案的比较

现在有两个方案,目的都是想提供一个分布式的任务调度执行框架. 背景:若干业务应用,均独立部署,需要做集群. 方案A: 一个调度中心(基于quartz),负责维护和调度job.某个job到点执行时,发送消息...

ice_stream
2015/09/02
3.1K
1
实战分布式任务调度框架Elastic Job课程

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。 Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的...

尹吉欢
2018/01/13
0
0

没有更多内容

加载失败,请刷新页面

加载更多

计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
6
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
昨天
10
0
再见 Spring Boot 1.X,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维...

Java技术剑
昨天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部