重写Spring的Scheduled来实现定时任务的开关功能

原创
2019/01/01 21:16
阅读数 1.4K

1 概述

项目中一般都会有定时任务的相关案例,那么我们一般会通过java提供的ScheduledExecutorService 或Spring提供的TaskScheduler(也可使用@Scheduled注解)来实现,因为其不需要依赖外部模块,简单实用,但是Spring提供的@Scheduled注解的定时任务并不支持任务的动态开启与关闭,以及排期的动态修改,本案例通过重写Spring的定时任务注解@Scheduled 来使其可以支持应用服务不停止的情况下在线动态修改任务的排期以及启停功能。

案例代码地址:https://github.com/AIFEINIK/Spring-Scheduled

2 重写Scheduled

2.1 自定义JobScheduled注解

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface JobScheduled {
    /**
     * cron 表达式
     * @return
     */
    String cron() default "";

    /**
     * 时区
     * @return
     */
    String zone() default "";

    /**
     * 是否开启任务,默认开启
     * @return
     */
    String enable() default "true";

    /**
     * 下次任务执行时间与上次执行任务之间固定间隔毫秒数
     * @return
     */
    long fixedDelay() default -1;

    String fixedDelayString() default "";

    /**
     * 以固定速率执行任务
     * @return
     */
    long fixedRate() default -1;

    String fixedRateString() default "";

    /**
     * 初始延迟时间,毫秒
     * @return
     */
    long initialDelay() default -1;

    String initialDelayString() default "";
}

2.2 自定义注解JobScheduled的解析器类

@Component
public class JobScheduledAnnotationBeanPostProcessor
        implements BeanPostProcessor, Ordered, EmbeddedValueResolverAware, BeanNameAware,
        BeanFactoryAware, ApplicationContextAware, SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private DataConfig config;

    @Nullable
    private Object scheduler;

    @Nullable
    private StringValueResolver embeddedValueResolver;

    @Nullable
    private BeanFactory beanFactory;

    @Nullable
    private String beanName;

    private boolean init = true;

    @Nullable
    private ApplicationContext applicationContext;

    public Map<String, Object> beanMap = new HashMap<>();

    private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();

    private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

    @Override
    public void setEmbeddedValueResolver(StringValueResolver resolver) {
        this.embeddedValueResolver = resolver;
    }

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    @Override
    public void setBeanName(String name) {
        this.beanName = name;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
        if (this.beanFactory == null) {
            this.beanFactory = applicationContext;
        }
    }

    @Nullable
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return getObject(bean);
    }

    public Object getObject(Object bean) {
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass)) {
            final Set<Method> annotatedMethods = new LinkedHashSet<>(1);
            ReflectionUtils.doWithMethods(targetClass, method -> {
                Set<JobScheduled> schedules = AnnotationUtils.getRepeatableAnnotations(method, JobScheduled.class, Schedules.class);
                for (JobScheduled jobScheduled : schedules) {
                    processScheduled(jobScheduled, method, bean);
                    annotatedMethods.add(method);
                }
            });

            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @JobScheduled annotations found on bean class: " + bean.getClass());
                }
            }
        }
        return bean;
    }

    protected void processScheduled(JobScheduled scheduled, Method method, Object bean) {
        try {
            Assert.isTrue(method.getParameterCount() == 0,
                    "Only no-arg methods may be annotated with @JobScheduled");
            if (init) {
                //将bean先保存到内存,等重新注册任务时使用
                beanMap.put(bean.getClass().getName(), bean);
            }
            Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
            Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
            boolean processedSchedule = false;
            String errorMessage =
                    "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

            // Determine initial delay
            long initialDelay = scheduled.initialDelay();
            String initialDelayString = config.getString(scheduled.initialDelayString());
            if (StringUtils.hasText(initialDelayString)) {
                Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                if (this.embeddedValueResolver != null) {
                    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                }
                if (StringUtils.hasLength(initialDelayString)) {
                    try {
                        initialDelay = Long.valueOf(initialDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                    }
                }
            }

            // Check cron expression
            String cron = config.getString(scheduled.cron());
            if (StringUtils.isEmpty(cron)) {
                cron = scheduled.cron();
            }

            // 检查是否开启任务
            String enableStr = config.getString(scheduled.enable());
            boolean enable;
            if (StringUtils.isEmpty(enableStr)) {
                enable = BooleanUtils.toBoolean(scheduled.enable());
            } else {
                enable = BooleanUtils.toBoolean(enableStr);
            }
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }

                    registerScheduledTask(method, bean, new CronTask(runnable, new CronTrigger(cron, timeZone)), enable);
                }
            }

            // At this point we don't need to differentiate between initial delay set or not anymore
            if (initialDelay < 0) {
                initialDelay = 0;
            }

            // Check fixed delay
            long fixedDelay = scheduled.fixedDelay();
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                registerScheduledTask(method, bean, new FixedDelayTask(runnable, fixedDelay, initialDelay), enable);
            }
            String fixedDelayString = config.getString(scheduled.fixedDelayString());
            if (StringUtils.hasText(fixedDelayString)) {
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                if (StringUtils.hasLength(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedDelay = Long.valueOf(fixedDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                    }
                    registerScheduledTask(method, bean, new FixedDelayTask(runnable, fixedDelay, initialDelay), enable);
                }
            }

            // Check fixed rate
            long fixedRate = scheduled.fixedRate();
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                registerScheduledTask(method, bean, new FixedRateTask(runnable, fixedRate, initialDelay), enable);
            }
            String fixedRateString = config.getString(scheduled.fixedRateString());
            if (StringUtils.hasText(fixedRateString)) {
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                if (StringUtils.hasLength(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedRate = Long.valueOf(fixedRateString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                    }
                    registerScheduledTask(method, bean, new FixedRateTask(runnable, fixedRate, initialDelay), enable);

                }
            }

            // Check whether we had any attribute set
            Assert.isTrue(processedSchedule, errorMessage);
        }
        catch (IllegalArgumentException ex) {
            throw new IllegalStateException(
                    "Encountered invalid @JobScheduled method '" + method.getName() + "': " + ex.getMessage());
        }
    }

    private void registerScheduledTask(Method method, Object bean, Task task, boolean enable) {
        if (enable) {
            try {
                if (task instanceof CronTask) {
                    this.registrar.scheduleCronTask((CronTask) task);
                } else if (task instanceof FixedDelayTask) {
                    this.registrar.scheduleFixedDelayTask((FixedDelayTask) task);
                } else if (task instanceof FixedRateTask) {
                    this.registrar.scheduleFixedRateTask((FixedRateTask) task);
                }
                logger.info("------>类:" + bean.getClass().getName() + ",方法:" + method.getName() + "的调度任务已注册完成");
            } catch (Exception e) {
                logger.error("------>类:" + bean.getClass().getName() + ",方法:" + method.getName() + "的调度任务注册失败");
            }
        } else {
            logger.warn("------>类:" + bean.getClass().getName() + ",方法:" + method.getName() + "的调度任务未开启");
        }
    }

    @Override
    public void afterSingletonsInstantiated() {
        // Remove resolved singleton classes from cache
        this.nonAnnotatedClasses.clear();

        if (this.applicationContext == null) {
            // Not running in an ApplicationContext -> register tasks early...
            finishRegistration();
        }
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext() == this.applicationContext) {
            // Running in an ApplicationContext -> register tasks this late...
            // giving other ContextRefreshedEvent listeners a chance to perform
            // their work at the same time (e.g. Spring Batch's job registration).
            finishRegistration();
        }
    }

    private void finishRegistration() {
        if (this.scheduler != null) {
            this.registrar.setScheduler(this.scheduler);
        }

        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, SchedulingConfigurer> beans =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
            List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (SchedulingConfigurer configurer : configurers) {
                configurer.configureTasks(this.registrar);
            }
        }

        if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
            try {
                // 使用自定义的ThreadPoolTaskScheduler
                this.registrar.setScheduler(this.beanFactory.getBean("customerThreadPoolTaskScheduler"));
            }
            catch (NoUniqueBeanDefinitionException ex) {
                logger.debug("Could not find unique TaskScheduler bean", ex);
                try {
                    this.registrar.setScheduler(this.beanFactory.getBean(TaskScheduler.class));
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskScheduler bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                logger.debug("Could not find default TaskScheduler bean", ex);
                // Search for ScheduledExecutorService bean next...
                try {
                    this.registrar.setScheduler(this.beanFactory.getBean(ScheduledExecutorService.class));
                }
                catch (NoUniqueBeanDefinitionException ex2) {
                    logger.debug("Could not find unique ScheduledExecutorService bean", ex2);
                    try {
                        this.registrar.setScheduler(this.beanFactory.getBean(ScheduledExecutorService.class));
                    }
                    catch (NoSuchBeanDefinitionException ex3) {
                        if (logger.isInfoEnabled()) {
                            logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                    ex2.getBeanNamesFound());
                        }
                    }
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    logger.debug("Could not find default ScheduledExecutorService bean", ex2);
                    // Giving up -> falling back to default scheduler within the registrar...
                    logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
                }
            }
        }

        this.registrar.afterPropertiesSet();
    }

    @Override
    public void destroy() throws Exception {
        this.registrar.destroy();
    }

    /**
     * 重新注册任务
     */
    public void reRegister(){
        try {
            //注销之前注册的任务
            this.registrar.destroy();
            this.registrar.setCronTasksList(new ArrayList<>());
            this.init = false;

            for (Object o : this.beanMap.values()) {
                this.getObject(o);
            }

            //设置自定义任务线程池
            this.registrar.setScheduler(this.beanFactory.getBean("customerThreadPoolTaskScheduler"));
            this.registrar.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注:在其他类中可以直接引用JobScheduledAnnotationBeanPostProcessor,如果任务的配置属性值修改了,那么通过调用该类的reRegister来重新注册任务来完成任务的动态更新

2.3 使用被自定义注解@JobScheduled标注的任务执行

@Component
public class CustomerTask {

    @JobScheduled(cron = DataConfig.JOB1_CRON, enable = DataConfig.JOB1_ENABLE)
    public void job1() {
        System.out.println("调度任务1执行");
        try {
            Thread.sleep(60000);
            System.out.println("调度任务1执行完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @JobScheduled(cron = DataConfig.JOB2_CRON, enable = DataConfig.JOB2_ENABLE)
    public void job2() {
        System.out.println("调度任务2执行");
    }
}

注:需在Spring的java配置类中开启@EnableScheduling

2.4 @JobScheduled属性值动态获取类DataConfig

该类通过从数据库中类获取配置信息来模拟JobScheduled属性值被修改后,任务实时变化

@Component
public class DataConfig {

    @Autowired
    private JdbcTemplate template;
    @Autowired
    private JobScheduledAnnotationBeanPostProcessor processor;

    private Map<String, Object> dataMap = new ConcurrentHashMap<>();

    public static final String JOB1_CRON = "job1_cron";
    public static final String JOB1_ENABLE = "job1_enable";

    public static final String JOB2_CRON = "job2_cron";
    public static final String JOB2_ENABLE = "job2_enable";
    private boolean init = false;
    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5);

    public void init() {
        List<DataKeyValue> dataKeyValues = getDataKeyValues();
        for (DataKeyValue dataKeyValue : dataKeyValues) {
            dataMap.put(dataKeyValue.getConfigKey(), dataKeyValue.getConfigValue());
        }

        //每10秒检查一个次配置数据是否有变化
        executorService.scheduleAtFixedRate(new DataChecker(), 0, 10, TimeUnit.SECONDS);
    }

    private List<DataKeyValue> getDataKeyValues() {
        String sql = "select config_key configKey,config_value configValue from data_config";
        return template.query(sql, new RowMapper<DataKeyValue>(){
                @Nullable
                @Override
                public DataKeyValue mapRow(ResultSet rs, int i) throws SQLException {
                    DataKeyValue data = new DataKeyValue();
                    data.setConfigKey(rs.getString("configKey"));
                    data.setConfigValue(rs.getString("configValue"));
                    return data;
                }
            });
    }

    @Setter
    @Getter
    static class DataKeyValue {
        private String configKey;
        private String configValue;
    }


    public String getString(String key) {
        if (!init) {
            this.init();
            init = true;
        }
        return (String) dataMap.get(key);
    }

    /**
     * 检查数据是否变化,如果变化了则重新注册调度任务
     */
    class DataChecker implements Runnable {
        @Override
        public void run() {
            List<DataKeyValue> dataKeyValues = getDataKeyValues();
            for (DataKeyValue config : dataKeyValues) {
                String oldConfigValue = (String) dataMap.get(config.getConfigKey());
                String newConfigValue = config.getConfigValue();
                if (StringUtils.isNotEmpty(oldConfigValue) &&
                        StringUtils.isNotEmpty(newConfigValue) &&
                        !newConfigValue.equals(oldConfigValue)) {
                    //更新map中的键值
                    dataMap.put(config.getConfigKey(), config.getConfigValue());

                    //重新注册调度任务,正在执行的任务线程会被中断
                    processor.reRegister();
                    break;
                }
            }
        }
    }
}

注:示例中配置的DataConfig数据是从Mysql数据库获取,也可以指定为其他数据库或配置中心来配置,该类中开启了一个定时任务每10秒从数据库检查一次数据是否变化,如有变化那么重新注册任务来达到不停止应用的情况下动态控制任务的排期以及启停状态。

需要注意的是:该案例中演示的修改其中一个任务的JobScheduled属性值,首先会中断所有的任务,然后在重新注册所有任务来达到动态修改的效果,也可以重写代码来对应到具体的一个任务上,而不影响其他任务,这个其实也简单,通过重写ScheduledTaskRegistrar可以实现,具体大家可以自己动手试试!

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部