文档章节

重写Spring的Scheduled来实现定时任务的开关功能

FEINIK
 FEINIK
发布于 01/01 21:16
字数 2053
阅读 162
收藏 0

1 概述

项目中一般都会有定时任务的相关案例,那么我们一般会通过java提供的ScheduledExecutorService 或Spring提供的TaskScheduler(也可使用@Scheduled注解)来实现,因为其不需要依赖外部模块,简单实用,但是Spring提供的@Scheduled注解的定时任务并不支持任务的动态开启与关闭,以及排期的动态修改,本案例通过重写Spring的定时任务注解@Scheduled 来使其可以支持应用服务不停止的情况下在线动态修改任务的排期以及启停功能。

案例代码地址:https://github.com/AIFEINIK/Spring-Scheduled

2 重写Scheduled

2.1 自定义JobScheduled注解

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface JobScheduled {
    /**
     * cron 表达式
     * @return
     */
    String cron() default "";

    /**
     * 时区
     * @return
     */
    String zone() default "";

    /**
     * 是否开启任务,默认开启
     * @return
     */
    String enable() default "true";

    /**
     * 下次任务执行时间与上次执行任务之间固定间隔毫秒数
     * @return
     */
    long fixedDelay() default -1;

    String fixedDelayString() default "";

    /**
     * 以固定速率执行任务
     * @return
     */
    long fixedRate() default -1;

    String fixedRateString() default "";

    /**
     * 初始延迟时间,毫秒
     * @return
     */
    long initialDelay() default -1;

    String initialDelayString() default "";
}

2.2 自定义注解JobScheduled的解析器类

@Component
public class JobScheduledAnnotationBeanPostProcessor
        implements BeanPostProcessor, Ordered, EmbeddedValueResolverAware, BeanNameAware,
        BeanFactoryAware, ApplicationContextAware, SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private DataConfig config;

    @Nullable
    private Object scheduler;

    @Nullable
    private StringValueResolver embeddedValueResolver;

    @Nullable
    private BeanFactory beanFactory;

    @Nullable
    private String beanName;

    private boolean init = true;

    @Nullable
    private ApplicationContext applicationContext;

    public Map<String, Object> beanMap = new HashMap<>();

    private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();

    private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

    @Override
    public void setEmbeddedValueResolver(StringValueResolver resolver) {
        this.embeddedValueResolver = resolver;
    }

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    @Override
    public void setBeanName(String name) {
        this.beanName = name;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
        if (this.beanFactory == null) {
            this.beanFactory = applicationContext;
        }
    }

    @Nullable
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return getObject(bean);
    }

    public Object getObject(Object bean) {
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass)) {
            final Set<Method> annotatedMethods = new LinkedHashSet<>(1);
            ReflectionUtils.doWithMethods(targetClass, method -> {
                Set<JobScheduled> schedules = AnnotationUtils.getRepeatableAnnotations(method, JobScheduled.class, Schedules.class);
                for (JobScheduled jobScheduled : schedules) {
                    processScheduled(jobScheduled, method, bean);
                    annotatedMethods.add(method);
                }
            });

            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @JobScheduled annotations found on bean class: " + bean.getClass());
                }
            }
        }
        return bean;
    }

    protected void processScheduled(JobScheduled scheduled, Method method, Object bean) {
        try {
            Assert.isTrue(method.getParameterCount() == 0,
                    "Only no-arg methods may be annotated with @JobScheduled");
            if (init) {
                //将bean先保存到内存,等重新注册任务时使用
                beanMap.put(bean.getClass().getName(), bean);
            }
            Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
            Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
            boolean processedSchedule = false;
            String errorMessage =
                    "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

            // Determine initial delay
            long initialDelay = scheduled.initialDelay();
            String initialDelayString = config.getString(scheduled.initialDelayString());
            if (StringUtils.hasText(initialDelayString)) {
                Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                if (this.embeddedValueResolver != null) {
                    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                }
                if (StringUtils.hasLength(initialDelayString)) {
                    try {
                        initialDelay = Long.valueOf(initialDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                    }
                }
            }

            // Check cron expression
            String cron = config.getString(scheduled.cron());
            if (StringUtils.isEmpty(cron)) {
                cron = scheduled.cron();
            }

            // 检查是否开启任务
            String enableStr = config.getString(scheduled.enable());
            boolean enable;
            if (StringUtils.isEmpty(enableStr)) {
                enable = BooleanUtils.toBoolean(scheduled.enable());
            } else {
                enable = BooleanUtils.toBoolean(enableStr);
            }
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }

                    registerScheduledTask(method, bean, new CronTask(runnable, new CronTrigger(cron, timeZone)), enable);
                }
            }

            // At this point we don't need to differentiate between initial delay set or not anymore
            if (initialDelay < 0) {
                initialDelay = 0;
            }

            // Check fixed delay
            long fixedDelay = scheduled.fixedDelay();
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                registerScheduledTask(method, bean, new FixedDelayTask(runnable, fixedDelay, initialDelay), enable);
            }
            String fixedDelayString = config.getString(scheduled.fixedDelayString());
            if (StringUtils.hasText(fixedDelayString)) {
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                if (StringUtils.hasLength(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedDelay = Long.valueOf(fixedDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                    }
                    registerScheduledTask(method, bean, new FixedDelayTask(runnable, fixedDelay, initialDelay), enable);
                }
            }

            // Check fixed rate
            long fixedRate = scheduled.fixedRate();
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                registerScheduledTask(method, bean, new FixedRateTask(runnable, fixedRate, initialDelay), enable);
            }
            String fixedRateString = config.getString(scheduled.fixedRateString());
            if (StringUtils.hasText(fixedRateString)) {
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                if (StringUtils.hasLength(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedRate = Long.valueOf(fixedRateString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                    }
                    registerScheduledTask(method, bean, new FixedRateTask(runnable, fixedRate, initialDelay), enable);

                }
            }

            // Check whether we had any attribute set
            Assert.isTrue(processedSchedule, errorMessage);
        }
        catch (IllegalArgumentException ex) {
            throw new IllegalStateException(
                    "Encountered invalid @JobScheduled method '" + method.getName() + "': " + ex.getMessage());
        }
    }

    private void registerScheduledTask(Method method, Object bean, Task task, boolean enable) {
        if (enable) {
            try {
                if (task instanceof CronTask) {
                    this.registrar.scheduleCronTask((CronTask) task);
                } else if (task instanceof FixedDelayTask) {
                    this.registrar.scheduleFixedDelayTask((FixedDelayTask) task);
                } else if (task instanceof FixedRateTask) {
                    this.registrar.scheduleFixedRateTask((FixedRateTask) task);
                }
                logger.info("------>类:" + bean.getClass().getName() + ",方法:" + method.getName() + "的调度任务已注册完成");
            } catch (Exception e) {
                logger.error("------>类:" + bean.getClass().getName() + ",方法:" + method.getName() + "的调度任务注册失败");
            }
        } else {
            logger.warn("------>类:" + bean.getClass().getName() + ",方法:" + method.getName() + "的调度任务未开启");
        }
    }

    @Override
    public void afterSingletonsInstantiated() {
        // Remove resolved singleton classes from cache
        this.nonAnnotatedClasses.clear();

        if (this.applicationContext == null) {
            // Not running in an ApplicationContext -> register tasks early...
            finishRegistration();
        }
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext() == this.applicationContext) {
            // Running in an ApplicationContext -> register tasks this late...
            // giving other ContextRefreshedEvent listeners a chance to perform
            // their work at the same time (e.g. Spring Batch's job registration).
            finishRegistration();
        }
    }

    private void finishRegistration() {
        if (this.scheduler != null) {
            this.registrar.setScheduler(this.scheduler);
        }

        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, SchedulingConfigurer> beans =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
            List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (SchedulingConfigurer configurer : configurers) {
                configurer.configureTasks(this.registrar);
            }
        }

        if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
            try {
                // 使用自定义的ThreadPoolTaskScheduler
                this.registrar.setScheduler(this.beanFactory.getBean("customerThreadPoolTaskScheduler"));
            }
            catch (NoUniqueBeanDefinitionException ex) {
                logger.debug("Could not find unique TaskScheduler bean", ex);
                try {
                    this.registrar.setScheduler(this.beanFactory.getBean(TaskScheduler.class));
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskScheduler bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                logger.debug("Could not find default TaskScheduler bean", ex);
                // Search for ScheduledExecutorService bean next...
                try {
                    this.registrar.setScheduler(this.beanFactory.getBean(ScheduledExecutorService.class));
                }
                catch (NoUniqueBeanDefinitionException ex2) {
                    logger.debug("Could not find unique ScheduledExecutorService bean", ex2);
                    try {
                        this.registrar.setScheduler(this.beanFactory.getBean(ScheduledExecutorService.class));
                    }
                    catch (NoSuchBeanDefinitionException ex3) {
                        if (logger.isInfoEnabled()) {
                            logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                    ex2.getBeanNamesFound());
                        }
                    }
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    logger.debug("Could not find default ScheduledExecutorService bean", ex2);
                    // Giving up -> falling back to default scheduler within the registrar...
                    logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
                }
            }
        }

        this.registrar.afterPropertiesSet();
    }

    @Override
    public void destroy() throws Exception {
        this.registrar.destroy();
    }

    /**
     * 重新注册任务
     */
    public void reRegister(){
        try {
            //注销之前注册的任务
            this.registrar.destroy();
            this.registrar.setCronTasksList(new ArrayList<>());
            this.init = false;

            for (Object o : this.beanMap.values()) {
                this.getObject(o);
            }

            //设置自定义任务线程池
            this.registrar.setScheduler(this.beanFactory.getBean("customerThreadPoolTaskScheduler"));
            this.registrar.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注:在其他类中可以直接引用JobScheduledAnnotationBeanPostProcessor,如果任务的配置属性值修改了,那么通过调用该类的reRegister来重新注册任务来完成任务的动态更新

2.3 使用被自定义注解@JobScheduled标注的任务执行

@Component
public class CustomerTask {

    @JobScheduled(cron = DataConfig.JOB1_CRON, enable = DataConfig.JOB1_ENABLE)
    public void job1() {
        System.out.println("调度任务1执行");
        try {
            Thread.sleep(60000);
            System.out.println("调度任务1执行完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @JobScheduled(cron = DataConfig.JOB2_CRON, enable = DataConfig.JOB2_ENABLE)
    public void job2() {
        System.out.println("调度任务2执行");
    }
}

注:需在Spring的java配置类中开启@EnableScheduling

2.4 @JobScheduled属性值动态获取类DataConfig

该类通过从数据库中类获取配置信息来模拟JobScheduled属性值被修改后,任务实时变化

@Component
public class DataConfig {

    @Autowired
    private JdbcTemplate template;
    @Autowired
    private JobScheduledAnnotationBeanPostProcessor processor;

    private Map<String, Object> dataMap = new ConcurrentHashMap<>();

    public static final String JOB1_CRON = "job1_cron";
    public static final String JOB1_ENABLE = "job1_enable";

    public static final String JOB2_CRON = "job2_cron";
    public static final String JOB2_ENABLE = "job2_enable";
    private boolean init = false;
    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5);

    public void init() {
        List<DataKeyValue> dataKeyValues = getDataKeyValues();
        for (DataKeyValue dataKeyValue : dataKeyValues) {
            dataMap.put(dataKeyValue.getConfigKey(), dataKeyValue.getConfigValue());
        }

        //每10秒检查一个次配置数据是否有变化
        executorService.scheduleAtFixedRate(new DataChecker(), 0, 10, TimeUnit.SECONDS);
    }

    private List<DataKeyValue> getDataKeyValues() {
        String sql = "select config_key configKey,config_value configValue from data_config";
        return template.query(sql, new RowMapper<DataKeyValue>(){
                @Nullable
                @Override
                public DataKeyValue mapRow(ResultSet rs, int i) throws SQLException {
                    DataKeyValue data = new DataKeyValue();
                    data.setConfigKey(rs.getString("configKey"));
                    data.setConfigValue(rs.getString("configValue"));
                    return data;
                }
            });
    }

    @Setter
    @Getter
    static class DataKeyValue {
        private String configKey;
        private String configValue;
    }


    public String getString(String key) {
        if (!init) {
            this.init();
            init = true;
        }
        return (String) dataMap.get(key);
    }

    /**
     * 检查数据是否变化,如果变化了则重新注册调度任务
     */
    class DataChecker implements Runnable {
        @Override
        public void run() {
            List<DataKeyValue> dataKeyValues = getDataKeyValues();
            for (DataKeyValue config : dataKeyValues) {
                String oldConfigValue = (String) dataMap.get(config.getConfigKey());
                String newConfigValue = config.getConfigValue();
                if (StringUtils.isNotEmpty(oldConfigValue) &&
                        StringUtils.isNotEmpty(newConfigValue) &&
                        !newConfigValue.equals(oldConfigValue)) {
                    //更新map中的键值
                    dataMap.put(config.getConfigKey(), config.getConfigValue());

                    //重新注册调度任务,正在执行的任务线程会被中断
                    processor.reRegister();
                    break;
                }
            }
        }
    }
}

注:示例中配置的DataConfig数据是从Mysql数据库获取,也可以指定为其他数据库或配置中心来配置,该类中开启了一个定时任务每10秒从数据库检查一次数据是否变化,如有变化那么重新注册任务来达到不停止应用的情况下动态控制任务的排期以及启停状态。

需要注意的是:该案例中演示的修改其中一个任务的JobScheduled属性值,首先会中断所有的任务,然后在重新注册所有任务来达到动态修改的效果,也可以重写代码来对应到具体的一个任务上,而不影响其他任务,这个其实也简单,通过重写ScheduledTaskRegistrar可以实现,具体大家可以自己动手试试!

© 著作权归作者所有

FEINIK
粉丝 227
博文 61
码字总数 61705
作品 0
广州
高级程序员
私信 提问
Spring Boot实践--定时任务两种(Schedule与Quartz整合)

Spring Boot实践--定时任务两种(Schedule与Quartz整合) 最近在项目中使用到定时任务,之前一直都是使用Quartz 来实现,最近看Spring 基础发现其实Spring 提供 Spring Schedule 可以帮助我们实...

spinachgit
2018/02/11
357
0
Spring Boot:在Spring Boot中使用定时任务

本文主要介绍如何在Spring Boot中使用定时任务,假设你已经建好了一个基础的Spring Boot项目。首先,我们在项目中建立一个定时任务。 1.创建定时任务 package hello;import java.text.Simpl...

Element0506
2015/11/10
452
0
Spring Boot 定时任务单线程和多线程

最近在写springboot项目中一个数据转移的组件,本来是用java中的timer和Executor实现 可以有个springboot测试时关闭单例工厂的现象。现在试一试spring自己的线程管理是否可是不包上面的错误 ...

th778899
2018/07/13
281
0
spring boot实现动态增删启停定时任务

在spring boot项目中,可以通过@EnableScheduling注解和@Scheduled注解实现定时任务,也可以通过SchedulingConfigurer接口来实现定时任务。但是这两种方式不能动态添加、删除、启动、停止任务...

jessehua
04/22
0
0
Spring Cloud 2.x系列之springboot集成quartz

java设计模式之观察者模式 一大波视频分享 Mysql代理中间件Atlas安装和配置 在做项目时有时候会有定时器任务的功能,比如某某时间应该做什么,多少秒应该怎么样之类的。定时任务常用的业务场...

技术小能手
2018/10/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

最简单的获取相机拍照的图片

  import android.content.Intent;import android.graphics.Bitmap;import android.os.Bundle;import android.os.Environment;import android.provider.MediaStore;import andr......

MrLins
27分钟前
4
0
说好不哭!数据可视化深度干货,前端开发下一个涨薪点在这里~

随着互联网在各行各业的影响不断深入,数据规模越来越大,各企业也越来越重视数据的价值。作为一家专业的数据智能公司,个推从消息推送服务起家,经过多年的持续耕耘,积累沉淀了海量数据,在...

个推
28分钟前
7
0
第三方支付-返回与回调注意事项

不管是支付宝,微信,还是其它第三方支付,第四方支付,支付机构服务商只要涉及到钱的交易都要进行如下校验,全部成功了才视为成功订单 1.http请求是否成功 2.校验商户号 3.校验订单号及状态...

Shingfi
31分钟前
4
0
简述Java内存分配和回收策略以及Minor GC 和 Major GC(Full GC)

内存分配: 1. 栈区:栈可分为Java虚拟机和本地方法栈 2. 堆区:堆被所有线程共享,在虚拟机启动时创建,是唯一的目的是存放对象实例,是gc的主要区域。通常可分为两个区块年轻代和年老代。更...

DustinChan
37分钟前
6
0
Excel插入批注:可在批注插入文字、形状、图片

1.批注一直显示:审阅选项卡-------->勾选显示批注选项: 2.插入批注快捷键:Shift+F2 组合键 3.在批注中插入图片:鼠标右键点击批注框的小圆点【重点不可以在批注文本框内点击】----->调出批...

东方墨天
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部