文档章节

SpringBoot基础教程3-1-3 Quartz定时任务单点持久化

Mkeeper
 Mkeeper
发布于 2018/10/23 08:59
字数 2159
阅读 61
收藏 8

1 概述

实际项目中,复杂的定时任务都会结合持久化,动态改变定时任务状态,本文将介绍基于Quartz的定时任务单点持久化方式,通过RESTful风格,演示定时任务的CRUD,最后使用Swagger测试。

2 数据库表说明

//Quartz表
qrtz_calendars:以 Blob 类型存储 Quartz 的 Calendar 信息
qrtz_cron_triggers:存储 Cron Trigger,包括 Cron 表达式和时区信息
qrtz_fired_triggers:存储与已触发的 Trigger 相关的状态信息,以及相联 Job 的执行信息
qrtz_paused_trigger_grps:存储已暂停的 Trigger 组的信息
qrtz_scheduler_state:存储少量的有关调度器 (Scheduler) 的状态,和别的 调度器 (Scheduler)实例(假如是用于一个集群中)
qrtz_locks:储程序的非观锁的信息(假如使用了悲观锁)
qrtz_job_details:存储每一个已配置的 Job 的详细信息(jobDetail)
qrtz_job_listeners:存储有关已配置的 Job 监听器 的信息
qrtz_simple_triggers:存储简单的 Trigger,包括重复次数,间隔,以及已触的次数
qrtz_blog_triggers:以 Blob 类型存储的Trigger(用于 Quartz 用户用 JDBC 创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候)
qrtz_trigger_listeners:存储已配置的触发器监听器 ( Trigger Listener ) 的信息
qrtz_triggers:存储已配置的 触发器 (Trigger) 的信息

//新建表
ScheduleJob:自定义定时任务详细状态表,方便管理定时任务
  • 建表脚本:/resourecs/quartz.sql

3 添加依赖

<!--quartz相关依赖-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

<!--数据库相关依赖-->
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<scope>runtime</scope>
</dependency>

<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>druid-spring-boot-starter</artifactId>
	<version>1.1.9</version>
</dependency>

<dependency>
	<groupId>org.mybatis.spring.boot</groupId>
	<artifactId>mybatis-spring-boot-starter</artifactId>
	<version>1.3.2</version>
</dependency>

4 添加配置

#数据库连接池配置
spring:
  datasource:
    name: mysql_test
    type: com.alibaba.druid.pool.DruidDataSource
    #druid相关配置
    druid:
      #监控统计拦截的filters
      filters: stat
      driver-class-name: com.mysql.jdbc.Driver
      #基本属性
      url: jdbc:mysql://127.0.0.1:3306/springboot?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true
      username: root
      password: 123456
      #配置初始化大小/最小/最大
      initial-size: 1
      min-idle: 1
      max-active: 20
      #获取连接等待超时时间
      max-wait: 60000
      #间隔多久进行一次检测,检测需要关闭的空闲连接
      time-between-eviction-runs-millis: 60000
      #一个连接在池中最小生存的时间
      min-evictable-idle-time-millis: 300000
      validation-query: SELECT 'x'
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
      #打开PSCache,并指定每个连接上PSCache的大小。oracle设为true,mysql设为false。分库分表较多推荐设置为false
      pool-prepared-statements: false
      max-pool-prepared-statement-per-connection-size: 20
  #Quartz配置
  quartz:
    jdbc:
      initialize-schema: always
    job-store-type: jdbc

##Mybatis配置
mybatis:
  #Mapper.xml所在的位置
  mapper-locations: classpath:mapping/*.xml
  #entity扫描的包名
  type-aliases-package: com.mkeeper.entity

5 Spring接管Quartz

@Component
public class ScheduleJobFactory extends AdaptableJobFactory {
    // 让不受spring管理的类具有spring自动注入的特性
    @Autowired
    private AutowireCapableBeanFactory autowireCapableBeanFactory;

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Object jobInstance = super.createJobInstance(bundle);
        autowireCapableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}

6 Quartz配置SchedulerFactoryBean初始化

@Configuration
public class ScheduleConfig {
    @Autowired
    private ScheduleJobFactory scheduleJobFactory;

    @Bean
    @Qualifier("scheduleBean")
    public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("dataSource") DataSource dataSource) {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        // 名称
        schedulerFactoryBean.setSchedulerName("TASK_EXECUTOR");
        // 延迟10秒启动Scheduler
        schedulerFactoryBean.setStartupDelay(10);
        // 通过applicationContextSchedulerContextKey属性配置spring上下文
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContextKey");
        // 设置是否任意一个已定义的Job会覆盖现有的Job。默认为false,即已定义的Job不会覆盖现有的Job。
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        // 自动开始
        schedulerFactoryBean.setAutoStartup(true);
        // 数据源
        schedulerFactoryBean.setDataSource(dataSource);
        // 将JobFactory改为自定义的,否则在 Job 中注入 Bean 会失败
        schedulerFactoryBean.setJobFactory(scheduleJobFactory);
        return schedulerFactoryBean;
    }
}

7 自定义任务管理

实体

@Data
public class ScheduleJob implements Serializable {

    private static final Long serialVersionUID = 1435515995276255188L;

    private Long id;

    private String className;

    private String cronExpression;

    private String jobName;

    private String jobGroup;

    private String triggerName;

    private String triggerGroup;

    private Boolean pause;

    private Boolean enable;

    private String description;

    private Date createTime;

    private Date lastUpdateTime;

}

为了节约篇幅,mapping,dao省略,请参考源码

9 创建Quartz任务调度工具类(重点)

@Slf4j
public class ScheduleUtil {

    /**
     * 获取 Trigger Key
     *
     * @param scheduleJob
     * @return
     */
    public static TriggerKey getTriggerKey(ScheduleJob scheduleJob) {
        return TriggerKey.triggerKey(scheduleJob.getTriggerName(), scheduleJob.getTriggerGroup());
    }

    /**
     * 获取 Job Key
     *
     * @param scheduleJob
     * @return
     */
    public static JobKey getJobKey(ScheduleJob scheduleJob) {
        return JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
    }

    /**
     * 获取 Cron Trigger
     *
     * @param scheduler
     * @param scheduleJob
     * @return
     * @throws ServiceException
     */
    public static CronTrigger getCronTrigger(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            return (CronTrigger) scheduler.getTrigger(getTriggerKey(scheduleJob));
        } catch (SchedulerException e) {
            throw new ServiceException("Get Cron trigger failed", e);
        }
    }

    /**
     * 创建任务
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void createScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {

        validateCronExpression(scheduleJob);

        try {
            // 要执行的 Job 的类
            Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(scheduleJob.getClassName()).newInstance().getClass();

            JobDetail jobDetail = JobBuilder.newJob(jobClass)
                    .withIdentity(scheduleJob.getJobName(), scheduleJob.getJobGroup())
                    .withDescription(scheduleJob.getDescription())
                    .build();

            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();

            CronTrigger cronTrigger = TriggerBuilder.newTrigger()
                    .withIdentity(scheduleJob.getTriggerName(), scheduleJob.getTriggerGroup())
                    .withDescription(scheduleJob.getDescription())
                    .withSchedule(scheduleBuilder)
                    .startNow()
                    .build();

            scheduler.scheduleJob(jobDetail, cronTrigger);

            log.info("Create schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());

            if (scheduleJob.getPause()) {
                pauseJob(scheduler, scheduleJob);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("Execute schedule job failed");
            throw new ServiceException("Execute schedule job failed", e);
        }
    }

    /**
     * 更新任务
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void updateScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {

        validateCronExpression(scheduleJob);

        try {

            TriggerKey triggerKey = getTriggerKey(scheduleJob);

            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();

            CronTrigger cronTrigger = getCronTrigger(scheduler, scheduleJob);

            cronTrigger = cronTrigger.getTriggerBuilder()
                    .withIdentity(triggerKey)
                    .withDescription(scheduleJob.getDescription())
                    .withSchedule(cronScheduleBuilder).build();

            scheduler.rescheduleJob(triggerKey, cronTrigger);

            log.info("Update schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());

            if (scheduleJob.getPause()) {
                pauseJob(scheduler, scheduleJob);
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Update schedule job failed");
            throw new ServiceException("Update schedule job failed", e);
        }
    }

    /**
     * 执行任务
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void run(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.triggerJob(getJobKey(scheduleJob));
            log.info("Run schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Run schedule job failed");
            throw new ServiceException("Run schedule job failed", e);
        }
    }

    /**
     * 暂停任务
     *
     * @param scheduler
     * @param scheduleJob
     */
    public static void pauseJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.pauseJob(getJobKey(scheduleJob));
            log.info("Pause schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Pause schedule job failed");
            throw new ServiceException("Pause job failed", e);
        }
    }

    /**
     * 继续执行任务
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void resumeJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.resumeJob(getJobKey(scheduleJob));
            log.info("Resume schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Resume schedule job failed");
            throw new ServiceException("Resume job failed", e);
        }
    }

    /**
     * 删除任务
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void deleteJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.deleteJob(getJobKey(scheduleJob));
            log.info("Delete schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Delete schedule job failed");
            throw new ServiceException("Delete job failed", e);
        }
    }

    /**
     * 校验Cron表达式
     */
    public static void validateCronExpression(ScheduleJob scheduleJob) throws ServiceException {
        if (!CronExpression.isValidExpression(scheduleJob.getCronExpression())) {
            throw new ServiceException(String.format("Job %s expression %s is not correct!", scheduleJob.getClassName(), scheduleJob.getCronExpression()));
        }
    }
}

10 创建定时任务服务类

@Service
public class JobService {

    @Resource
    private JobMapper jobMapper;

    @Resource
    private Scheduler scheduler;

    public List<ScheduleJob> getAllEnableJob() {
        return jobMapper.getAllEnableJob();
    }

    public ScheduleJob select(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = jobMapper.select(jobId);
        if (scheduleJob == null) {
            throw new ServiceException("ScheduleJob:" + jobId + " not found");
        }
        return scheduleJob;
    }

    @Transactional(rollbackFor = DataAccessException.class)
    public ScheduleJob update(Long jobId, ScheduleJob scheduleJob) throws ServiceException {

        if (jobMapper.update(scheduleJob) <= 0) {
            throw new ServiceException("Update product:" + jobId + "failed");
        }

        ScheduleUtil.updateScheduleJob(scheduler, scheduleJob);

        return scheduleJob;
    }

    @Transactional(rollbackFor = DataAccessException.class)
    public boolean add(ScheduleJob scheduleJob) throws ServiceException {
        Integer num = jobMapper.insert(scheduleJob);
        if (num <= 0) {
            throw new ServiceException("Add product failed");
        }

        ScheduleUtil.createScheduleJob(scheduler, scheduleJob);

        return true;
    }

    @Transactional(rollbackFor = DataAccessException.class)
    public boolean delete(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = select(jobId);

        Integer num = jobMapper.delete(jobId);
        if (num <= 0) {
            throw new ServiceException("Delete product:" + jobId + "failed");
        }

        ScheduleUtil.deleteJob(scheduler, scheduleJob);

        return true;
    }

    public List<ScheduleJob> getAllJob() {
        return jobMapper.getAllJob();
    }

    public boolean resume(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, false);
        ScheduleUtil.resumeJob(scheduler, scheduleJob);
        return true;
    }

    public boolean pause(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, true);
        ScheduleUtil.pauseJob(scheduler, scheduleJob);
        return true;
    }

    public boolean run(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, false);
        ScheduleUtil.run(scheduler, scheduleJob);
        return true;
    }

    private ScheduleJob updateScheduleJobStatus(Long jobId, Boolean isPause) throws ServiceException {
        ScheduleJob scheduleJob = select(jobId);
        scheduleJob.setPause(isPause);
        update(scheduleJob.getId(), scheduleJob);
        return scheduleJob;
    }
}

11 创建应用启动监听类

/**
 *     启动应用时运行定时任务
 *
 * @author mkeeper
 * @create 2018/10/19 10:05
 */
@Slf4j
@Component
public class ApplicationListener implements CommandLineRunner {

    @Resource
    private JobService jobService;

    @Resource
    private Scheduler scheduler;

    @Override
    public void run(String... args) {
        List<ScheduleJob> scheduleJobList = jobService.getAllEnableJob();
        for (ScheduleJob scheduleJob : scheduleJobList) {
            try {
                CronTrigger cronTrigger = ScheduleUtil.getCronTrigger(scheduler, scheduleJob);
                if (cronTrigger == null) {
                    ScheduleUtil.createScheduleJob(scheduler, scheduleJob);
                } else {
                    ScheduleUtil.updateScheduleJob(scheduler, scheduleJob);
                }
                log.info("Startup {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
            } catch (ServiceException e) {
                log.error("Job ERROR", e);
            }
        }
    }
}

12 新建任务

@Slf4j
@Component
public class TestJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext){
        // Do what you want here
        log.info("Test job is executing at: " + 		  	 System.currentTimeMillis()/1000);
    }
}

13 Controller

@RestController
@RequestMapping("/job")
public class JobController {

    @Autowired
    private JobService jobService;

    @GetMapping
    public R getAllJob() {
        return R.isOk().data(jobService.getAllJob());
    }

    @GetMapping("/{id}")
    public R getJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.select(jobId));
    }

    @PutMapping("/update/{id}")
    public R updateJob(@PathVariable("id") Long jobId, @RequestBody ScheduleJob newScheduleJob) throws ServiceException {
        return R.isOk().data(jobService.update(jobId, newScheduleJob));
    }

    @DeleteMapping("/delete/{id}")
    public R deleteJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.delete(jobId));
    }

    @PostMapping("/add")
    public R saveJob(@RequestBody ScheduleJob newScheduleJob) throws ServiceException {
        return R.isOk().data(jobService.add(newScheduleJob));
    }


    @GetMapping("/run/{id}")
    public R runJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.run(jobId));
    }


    @GetMapping("/pause/{id}")
    public R pauseJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.pause(jobId));
    }

    @GetMapping("/resume/{id}")
    public R resumeJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.resume(jobId));
    }
}

14 测试结果

考虑到要测试的接口很多,这里推荐Swagger Swagger是一个规范和完整的框架,用于生成、描述、调用和可视化RESTful风格的Web服务 添加依赖

<!--swagger2-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.6.1</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.6.1</version>
        </dependency>

添加配置文件

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Value("${swagger.enable:false}")
    private boolean enable;

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .enable(enable)
                .apiInfo(apiInfo())
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.mkeeper.controller"))
                .paths(PathSelectors.any())
                .build();
    }
    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("Quartz定时任务单点持久化接口文档")
                .description("Quartz定时任务单点持久化")
                .version("1.0")
                .build();
    }

}

application.yml中开启Swagger

swagger:
  enable: true

启动服务,浏览器中访问测试接口,地址:http://localhost:8080/swagger-ui.htm

15 工程目录

16 结束语

说点什么呢,有任何建议,欢迎留言探讨,本文源码


欢迎关注博主公众号,第一时间推送最新文章

欢迎关注博主公众号

© 著作权归作者所有

共有 人打赏支持
Mkeeper
粉丝 19
博文 30
码字总数 22957
作品 0
西城
架构师
私信 提问
恒宇少年/spring-boot-chapter

简书整套文档以及源码解析 专题 专题名称 专题描述 001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件 002 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解 003 Quer...

恒宇少年
2018/04/19
0
0
springboot 整合quartz (一):基础使用

---layout: blogistop: truetitle: "springboot 整合quartz (一):基础使用"date: 2018-10-09category: springboottags: springboot quartz --- 介绍 Quartz是一个开源的框架,它完全由写成。......

开心的哈士奇
2018/10/11
0
0
springboot 整合quartz (三):jobStores

---layout: blogistop: truetitle: "springboot 整合quartz (三):jobStores"date: 2018-10-09category: springboottags: springboot quartz --- JobStore介绍 JobStore是负责跟踪调度器中所有......

开心的哈士奇
2018/10/17
0
0
11、SpringBoot------定时任务

开发工具:STS 代码下载链接:https://github.com/theIndoorTrain/Springboot/tree/52ef6c0c805913db1e66ed18671c322e284233f0 前言: 之前我们有讲过Quartz任务调度。 现在,我们来讲解下S...

零度微笑
2018/07/16
0
0
springboot中如何做一个开关,启用和停用quartz

使用Spring Boot 2.x 做了一个后台管理系统,集成了Quartz功能,但有些工程需要用到定时任务,有些工程不会用到定时任务,有没有办法做一个设置,能够在用不到定时任务的工程中停用quartz,这...

罗树鹏
2018/11/05
536
6

没有更多内容

加载失败,请刷新页面

加载更多

乱入Linux界的我是如何学习的

欢迎来到建哥学Linux,咳!咳!咳!开个玩笑哈,我是一个IT男,IT界的入门选手,正在学习Linux。 在之前,一直想进军IT界,学习IT技术,但是苦于没有人指导,也不知道学什么,最开始我自己在...

linuxprobe16
39分钟前
4
0
OSChina 周日乱弹 —— 没时间 没头发 但有钱

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @开源中国总经办主任 :分享齐一的单曲《这个年纪》 《这个年纪》- 齐一 手机党少年们想听歌,请使劲儿戳(这里) @肿肿卷 :我真的可以睡一天...

小小编辑
今天
51
4
Django进阶 1.1 ORM基础—ORM 1.2.1 增删改查之查询 1.2.2 删改增 (1) 1.2.3 删改增 (2)

ORM基础 ORM是Django操作数据库的API,Django的作者将sql语句封装在里面供我们使用。 我们前面还提到过Django提供一个模拟数据库的工具,sqlite,供我们学习测试使用。 如果我们想使用mysql...

隐匿的蚂蚁
今天
3
0
Windows 上安装 Scala

在安装 Scala 之前需要先安装 Java 环境,具体安装的详细方法就不在这里描述了。 您可以自行搜索我们网站中的内容获得其他网站的帮助来获得如何安装 Java 环境的方法。 接下来,我们可以从 ...

honeymose
今天
3
0
数据库篇多表操作

第1章 多表操作 实际开发中,一个项目通常需要很多张表才能完成。例如:一个商城项目就需要分类表(category)、商品表(products)、订单表(orders)等多张表。且这些表的数据之间存在一定的关系...

stars永恒
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部