文档章节

SpringBoot基础教程3-1-3 Quartz定时任务单点持久化

Mkeeper
 Mkeeper
发布于 10/23 08:59
字数 2159
阅读 29
收藏 8

1 概述

实际项目中,复杂的定时任务都会结合持久化,动态改变定时任务状态,本文将介绍基于Quartz的定时任务单点持久化方式,通过RESTful风格,演示定时任务的CRUD,最后使用Swagger测试。

2 数据库表说明

//Quartz表
qrtz_calendars:以 Blob 类型存储 Quartz 的 Calendar 信息
qrtz_cron_triggers:存储 Cron Trigger,包括 Cron 表达式和时区信息
qrtz_fired_triggers:存储与已触发的 Trigger 相关的状态信息,以及相联 Job 的执行信息
qrtz_paused_trigger_grps:存储已暂停的 Trigger 组的信息
qrtz_scheduler_state:存储少量的有关调度器 (Scheduler) 的状态,和别的 调度器 (Scheduler)实例(假如是用于一个集群中)
qrtz_locks:储程序的非观锁的信息(假如使用了悲观锁)
qrtz_job_details:存储每一个已配置的 Job 的详细信息(jobDetail)
qrtz_job_listeners:存储有关已配置的 Job 监听器 的信息
qrtz_simple_triggers:存储简单的 Trigger,包括重复次数,间隔,以及已触的次数
qrtz_blog_triggers:以 Blob 类型存储的Trigger(用于 Quartz 用户用 JDBC 创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候)
qrtz_trigger_listeners:存储已配置的触发器监听器 ( Trigger Listener ) 的信息
qrtz_triggers:存储已配置的 触发器 (Trigger) 的信息

//新建表
ScheduleJob:自定义定时任务详细状态表,方便管理定时任务
  • 建表脚本:/resourecs/quartz.sql

3 添加依赖

<!--quartz相关依赖-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

<!--数据库相关依赖-->
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<scope>runtime</scope>
</dependency>

<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>druid-spring-boot-starter</artifactId>
	<version>1.1.9</version>
</dependency>

<dependency>
	<groupId>org.mybatis.spring.boot</groupId>
	<artifactId>mybatis-spring-boot-starter</artifactId>
	<version>1.3.2</version>
</dependency>

4 添加配置

#数据库连接池配置
spring:
  datasource:
    name: mysql_test
    type: com.alibaba.druid.pool.DruidDataSource
    #druid相关配置
    druid:
      #监控统计拦截的filters
      filters: stat
      driver-class-name: com.mysql.jdbc.Driver
      #基本属性
      url: jdbc:mysql://127.0.0.1:3306/springboot?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true
      username: root
      password: 123456
      #配置初始化大小/最小/最大
      initial-size: 1
      min-idle: 1
      max-active: 20
      #获取连接等待超时时间
      max-wait: 60000
      #间隔多久进行一次检测,检测需要关闭的空闲连接
      time-between-eviction-runs-millis: 60000
      #一个连接在池中最小生存的时间
      min-evictable-idle-time-millis: 300000
      validation-query: SELECT 'x'
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
      #打开PSCache,并指定每个连接上PSCache的大小。oracle设为true,mysql设为false。分库分表较多推荐设置为false
      pool-prepared-statements: false
      max-pool-prepared-statement-per-connection-size: 20
  #Quartz配置
  quartz:
    jdbc:
      initialize-schema: always
    job-store-type: jdbc

##Mybatis配置
mybatis:
  #Mapper.xml所在的位置
  mapper-locations: classpath:mapping/*.xml
  #entity扫描的包名
  type-aliases-package: com.mkeeper.entity

5 Spring接管Quartz

@Component
public class ScheduleJobFactory extends AdaptableJobFactory {
    // 让不受spring管理的类具有spring自动注入的特性
    @Autowired
    private AutowireCapableBeanFactory autowireCapableBeanFactory;

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Object jobInstance = super.createJobInstance(bundle);
        autowireCapableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}

6 Quartz配置SchedulerFactoryBean初始化

@Configuration
public class ScheduleConfig {
    @Autowired
    private ScheduleJobFactory scheduleJobFactory;

    @Bean
    @Qualifier("scheduleBean")
    public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("dataSource") DataSource dataSource) {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        // 名称
        schedulerFactoryBean.setSchedulerName("TASK_EXECUTOR");
        // 延迟10秒启动Scheduler
        schedulerFactoryBean.setStartupDelay(10);
        // 通过applicationContextSchedulerContextKey属性配置spring上下文
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContextKey");
        // 设置是否任意一个已定义的Job会覆盖现有的Job。默认为false,即已定义的Job不会覆盖现有的Job。
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        // 自动开始
        schedulerFactoryBean.setAutoStartup(true);
        // 数据源
        schedulerFactoryBean.setDataSource(dataSource);
        // 将JobFactory改为自定义的,否则在 Job 中注入 Bean 会失败
        schedulerFactoryBean.setJobFactory(scheduleJobFactory);
        return schedulerFactoryBean;
    }
}

7 自定义任务管理

实体

@Data
public class ScheduleJob implements Serializable {

    private static final Long serialVersionUID = 1435515995276255188L;

    private Long id;

    private String className;

    private String cronExpression;

    private String jobName;

    private String jobGroup;

    private String triggerName;

    private String triggerGroup;

    private Boolean pause;

    private Boolean enable;

    private String description;

    private Date createTime;

    private Date lastUpdateTime;

}

为了节约篇幅,mapping,dao省略,请参考源码

9 创建Quartz任务调度工具类(重点)

@Slf4j
public class ScheduleUtil {

    /**
     * 获取 Trigger Key
     *
     * @param scheduleJob
     * @return
     */
    public static TriggerKey getTriggerKey(ScheduleJob scheduleJob) {
        return TriggerKey.triggerKey(scheduleJob.getTriggerName(), scheduleJob.getTriggerGroup());
    }

    /**
     * 获取 Job Key
     *
     * @param scheduleJob
     * @return
     */
    public static JobKey getJobKey(ScheduleJob scheduleJob) {
        return JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
    }

    /**
     * 获取 Cron Trigger
     *
     * @param scheduler
     * @param scheduleJob
     * @return
     * @throws ServiceException
     */
    public static CronTrigger getCronTrigger(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            return (CronTrigger) scheduler.getTrigger(getTriggerKey(scheduleJob));
        } catch (SchedulerException e) {
            throw new ServiceException("Get Cron trigger failed", e);
        }
    }

    /**
     * 创建任务
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void createScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {

        validateCronExpression(scheduleJob);

        try {
            // 要执行的 Job 的类
            Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(scheduleJob.getClassName()).newInstance().getClass();

            JobDetail jobDetail = JobBuilder.newJob(jobClass)
                    .withIdentity(scheduleJob.getJobName(), scheduleJob.getJobGroup())
                    .withDescription(scheduleJob.getDescription())
                    .build();

            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();

            CronTrigger cronTrigger = TriggerBuilder.newTrigger()
                    .withIdentity(scheduleJob.getTriggerName(), scheduleJob.getTriggerGroup())
                    .withDescription(scheduleJob.getDescription())
                    .withSchedule(scheduleBuilder)
                    .startNow()
                    .build();

            scheduler.scheduleJob(jobDetail, cronTrigger);

            log.info("Create schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());

            if (scheduleJob.getPause()) {
                pauseJob(scheduler, scheduleJob);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("Execute schedule job failed");
            throw new ServiceException("Execute schedule job failed", e);
        }
    }

    /**
     * 更新任务
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void updateScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {

        validateCronExpression(scheduleJob);

        try {

            TriggerKey triggerKey = getTriggerKey(scheduleJob);

            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();

            CronTrigger cronTrigger = getCronTrigger(scheduler, scheduleJob);

            cronTrigger = cronTrigger.getTriggerBuilder()
                    .withIdentity(triggerKey)
                    .withDescription(scheduleJob.getDescription())
                    .withSchedule(cronScheduleBuilder).build();

            scheduler.rescheduleJob(triggerKey, cronTrigger);

            log.info("Update schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());

            if (scheduleJob.getPause()) {
                pauseJob(scheduler, scheduleJob);
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Update schedule job failed");
            throw new ServiceException("Update schedule job failed", e);
        }
    }

    /**
     * 执行任务
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void run(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.triggerJob(getJobKey(scheduleJob));
            log.info("Run schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Run schedule job failed");
            throw new ServiceException("Run schedule job failed", e);
        }
    }

    /**
     * 暂停任务
     *
     * @param scheduler
     * @param scheduleJob
     */
    public static void pauseJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.pauseJob(getJobKey(scheduleJob));
            log.info("Pause schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Pause schedule job failed");
            throw new ServiceException("Pause job failed", e);
        }
    }

    /**
     * 继续执行任务
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void resumeJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.resumeJob(getJobKey(scheduleJob));
            log.info("Resume schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Resume schedule job failed");
            throw new ServiceException("Resume job failed", e);
        }
    }

    /**
     * 删除任务
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void deleteJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.deleteJob(getJobKey(scheduleJob));
            log.info("Delete schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Delete schedule job failed");
            throw new ServiceException("Delete job failed", e);
        }
    }

    /**
     * 校验Cron表达式
     */
    public static void validateCronExpression(ScheduleJob scheduleJob) throws ServiceException {
        if (!CronExpression.isValidExpression(scheduleJob.getCronExpression())) {
            throw new ServiceException(String.format("Job %s expression %s is not correct!", scheduleJob.getClassName(), scheduleJob.getCronExpression()));
        }
    }
}

10 创建定时任务服务类

@Service
public class JobService {

    @Resource
    private JobMapper jobMapper;

    @Resource
    private Scheduler scheduler;

    public List<ScheduleJob> getAllEnableJob() {
        return jobMapper.getAllEnableJob();
    }

    public ScheduleJob select(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = jobMapper.select(jobId);
        if (scheduleJob == null) {
            throw new ServiceException("ScheduleJob:" + jobId + " not found");
        }
        return scheduleJob;
    }

    @Transactional(rollbackFor = DataAccessException.class)
    public ScheduleJob update(Long jobId, ScheduleJob scheduleJob) throws ServiceException {

        if (jobMapper.update(scheduleJob) <= 0) {
            throw new ServiceException("Update product:" + jobId + "failed");
        }

        ScheduleUtil.updateScheduleJob(scheduler, scheduleJob);

        return scheduleJob;
    }

    @Transactional(rollbackFor = DataAccessException.class)
    public boolean add(ScheduleJob scheduleJob) throws ServiceException {
        Integer num = jobMapper.insert(scheduleJob);
        if (num <= 0) {
            throw new ServiceException("Add product failed");
        }

        ScheduleUtil.createScheduleJob(scheduler, scheduleJob);

        return true;
    }

    @Transactional(rollbackFor = DataAccessException.class)
    public boolean delete(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = select(jobId);

        Integer num = jobMapper.delete(jobId);
        if (num <= 0) {
            throw new ServiceException("Delete product:" + jobId + "failed");
        }

        ScheduleUtil.deleteJob(scheduler, scheduleJob);

        return true;
    }

    public List<ScheduleJob> getAllJob() {
        return jobMapper.getAllJob();
    }

    public boolean resume(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, false);
        ScheduleUtil.resumeJob(scheduler, scheduleJob);
        return true;
    }

    public boolean pause(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, true);
        ScheduleUtil.pauseJob(scheduler, scheduleJob);
        return true;
    }

    public boolean run(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, false);
        ScheduleUtil.run(scheduler, scheduleJob);
        return true;
    }

    private ScheduleJob updateScheduleJobStatus(Long jobId, Boolean isPause) throws ServiceException {
        ScheduleJob scheduleJob = select(jobId);
        scheduleJob.setPause(isPause);
        update(scheduleJob.getId(), scheduleJob);
        return scheduleJob;
    }
}

11 创建应用启动监听类

/**
 *     启动应用时运行定时任务
 *
 * @author mkeeper
 * @create 2018/10/19 10:05
 */
@Slf4j
@Component
public class ApplicationListener implements CommandLineRunner {

    @Resource
    private JobService jobService;

    @Resource
    private Scheduler scheduler;

    @Override
    public void run(String... args) {
        List<ScheduleJob> scheduleJobList = jobService.getAllEnableJob();
        for (ScheduleJob scheduleJob : scheduleJobList) {
            try {
                CronTrigger cronTrigger = ScheduleUtil.getCronTrigger(scheduler, scheduleJob);
                if (cronTrigger == null) {
                    ScheduleUtil.createScheduleJob(scheduler, scheduleJob);
                } else {
                    ScheduleUtil.updateScheduleJob(scheduler, scheduleJob);
                }
                log.info("Startup {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
            } catch (ServiceException e) {
                log.error("Job ERROR", e);
            }
        }
    }
}

12 新建任务

@Slf4j
@Component
public class TestJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext){
        // Do what you want here
        log.info("Test job is executing at: " + 		  	 System.currentTimeMillis()/1000);
    }
}

13 Controller

@RestController
@RequestMapping("/job")
public class JobController {

    @Autowired
    private JobService jobService;

    @GetMapping
    public R getAllJob() {
        return R.isOk().data(jobService.getAllJob());
    }

    @GetMapping("/{id}")
    public R getJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.select(jobId));
    }

    @PutMapping("/update/{id}")
    public R updateJob(@PathVariable("id") Long jobId, @RequestBody ScheduleJob newScheduleJob) throws ServiceException {
        return R.isOk().data(jobService.update(jobId, newScheduleJob));
    }

    @DeleteMapping("/delete/{id}")
    public R deleteJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.delete(jobId));
    }

    @PostMapping("/add")
    public R saveJob(@RequestBody ScheduleJob newScheduleJob) throws ServiceException {
        return R.isOk().data(jobService.add(newScheduleJob));
    }


    @GetMapping("/run/{id}")
    public R runJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.run(jobId));
    }


    @GetMapping("/pause/{id}")
    public R pauseJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.pause(jobId));
    }

    @GetMapping("/resume/{id}")
    public R resumeJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.resume(jobId));
    }
}

14 测试结果

考虑到要测试的接口很多,这里推荐Swagger Swagger是一个规范和完整的框架,用于生成、描述、调用和可视化RESTful风格的Web服务 添加依赖

<!--swagger2-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.6.1</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.6.1</version>
        </dependency>

添加配置文件

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Value("${swagger.enable:false}")
    private boolean enable;

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .enable(enable)
                .apiInfo(apiInfo())
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.mkeeper.controller"))
                .paths(PathSelectors.any())
                .build();
    }
    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("Quartz定时任务单点持久化接口文档")
                .description("Quartz定时任务单点持久化")
                .version("1.0")
                .build();
    }

}

application.yml中开启Swagger

swagger:
  enable: true

启动服务,浏览器中访问测试接口,地址:http://localhost:8080/swagger-ui.htm

15 工程目录

16 结束语

说点什么呢,有任何建议,欢迎留言探讨,本文源码


欢迎关注博主公众号,第一时间推送最新文章

欢迎关注博主公众号

© 著作权归作者所有

共有 人打赏支持
Mkeeper
粉丝 17
博文 27
码字总数 19929
作品 0
西城
架构师
私信 提问
恒宇少年/spring-boot-chapter

简书整套文档以及源码解析 专题 专题名称 专题描述 001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件 002 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解 003 Quer...

恒宇少年
04/19
0
0
springboot 整合quartz (一):基础使用

---layout: blogistop: truetitle: "springboot 整合quartz (一):基础使用"date: 2018-10-09category: springboottags: springboot quartz --- 介绍 Quartz是一个开源的框架,它完全由写成。......

开心的哈士奇
10/11
0
0
springboot 整合quartz (三):jobStores

---layout: blogistop: truetitle: "springboot 整合quartz (三):jobStores"date: 2018-10-09category: springboottags: springboot quartz --- JobStore介绍 JobStore是负责跟踪调度器中所有......

开心的哈士奇
10/17
0
0
11、SpringBoot------定时任务

开发工具:STS 代码下载链接:https://github.com/theIndoorTrain/Springboot/tree/52ef6c0c805913db1e66ed18671c322e284233f0 前言: 之前我们有讲过Quartz任务调度。 现在,我们来讲解下S...

零度微笑
07/16
0
0
springboot(二)进阶篇

继之前的springboot(一)入门篇 ,本次主要讲springboot的一些其他特性,springboot内容很多,后续会持续更新. 多数据源 log4j日志 aop面向切面编程 缓存 定时任务 异步任务 自定义参数 多环境的...

zhuyuansj
2017/10/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

php获取客户端IP

php获取客户端IP 首先先阅读关于IP真实性安全的文章:如何正確的取得使用者 IP? 「任何從客戶端取得的資料都是不可信任的!」 HTTP_CLIENT_IP头是有的,但未成标准,不一定服务器都实现。 ...

DrChenXX
18分钟前
0
0
. The valid characters are defined in RFC 7230 and RFC 问题

通过这里的回答,我们可以知道: Tomcat在 7.0.73, 8.0.39, 8.5.7 版本后,添加了对于http头的验证。 具体来说,就是添加了些规则去限制HTTP头的规范性 参考这里 具体来说: org.apache.tom...

west_coast
37分钟前
1
0
刷leetcode第704题-二分查找

今天双十一买的算法书到货了,路上刷到有人说的这个题,借(chao)鉴(xi)一下别人的思路,这个是C++标准库里面的经典方法,思路精巧,优雅好品味 int search(int* nums, int numsSize, in...

锟斤拷烫烫烫
今天
1
0
【分享实录】BANCOR算法详解及代码实现

1 活动基本信息 1)主题:【区块链技术工坊22期】BANCOR算法详解及代码实现 2)议题: BANCOR算法的特点和优劣势 BANCOR算法和举例 如何加入BANCOR.NETWORK交易所 如何开发自己的BANCOR去中心...

HiBlock
今天
2
0
微信小程序(2)

开始看微信小程序的教程了。刚刚看完官方教程的视图层部分。这里摘录一些自己认为的部分关键点。 1.直接修改数值无法重新渲染,需要使用setData()方法; 2.列表渲染中:wx:key用于保持项目在...

MKjy
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部