文档章节

超简单使用redisson延迟队列做定时任务

王念博客
 王念博客
发布于 02/17 14:10
字数 1305
阅读 9.1K
收藏 11

前言:在工作开发中很多需求都需要用到定时任务,但是市面上多半都是轮询或者固定时间执行的开源工具,我之前写过一次基于quartz的定时任务,前端和分布式还需要完善 https://my.oschina.net/wangnian/blog/758054 ,编程式传入一个时间,到点就会按照事先配置好的执行。今天工作中又遇到了类似的需求,没有完善拿不出手,所以简单的封装一下redisson的API,只需要简单的传入间隔时间就可以了。

这个没有啥理论知识,就是对于redisson的delayedQueue延迟队列的封装,接下来直接粘贴代码,有需要的直接复制过去用,没需要的可以看看我的代码指指毛病。

1.导入redisson包

注意,如果是Springboot的项目强制使用,别单独引入redisson的jar包,再自己配置config

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.10.5</version>
</dependency>

2.用上面的包直接配置SpringBootRedis标准就可以

spring.redis.host=192.168.1.100
spring.redis.port=6379

3.新增一个启动项目时候就开启监听类RedisDelayedQueueInit

package com.test.redis.demo;

import com.alibaba.fastjson.JSON;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * 初始化队列监听
 */
@Component
public class RedisDelayedQueueInit implements ApplicationContextAware {

    private static Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class);

    @Autowired
    RedissonClient redissonClient;

    /**
     * 获取应用上下文并获取相应的接口实现类
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
        for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
            String listenerName = taskEventListenerEntry.getValue().getClass().getName();
            startThread(listenerName, taskEventListenerEntry.getValue());
        }
    }

    /**
     * 启动线程获取队列*
     *
     * @param queueName                 queueName
     * @param redisDelayedQueueListener 任务回调监听
     * @param <T>                       泛型
     * @return
     */
    private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        //由于此线程需要常驻,可以新建线程,不用交给线程池管理
        Thread thread = new Thread(() -> {
            logger.info("启动监听队列线程" + queueName);
            while (true) {
                try {
                    T t = blockingFairQueue.take();
                    logger.info("监听队列线程{},获取到值:{}", queueName, JSON.toJSONString(t));
                    new Thread(() -> {
                        redisDelayedQueueListener.invoke(t);
                    }).start();
                } catch (Exception e) {
                    logger.info("监听队列线程错误,", e);
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException ex) {
                    }
                }
            }
        });
        thread.setName(queueName);
        thread.start();
    }

}

4.新建一个接口RedisDelayedQueueListener

package com.test.redis.demo;

/**
 * 队列事件监听接口,需要实现这个方法
 *
 * @param <T>
 */
public interface RedisDelayedQueueListener<T> {
    /**
     * 执行方法
     *
     * @param t
     */
    void invoke(T t);
}

5.新增任务工具类

package com.test.redis.demo;

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class RedisDelayedQueue {

    @Autowired
    RedissonClient redissonClient;

    private static Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);

    /**
     * 添加队列
     *
     * @param t        DTO传输类
     * @param delay    时间数量
     * @param timeUnit 时间单位
     * @param <T>      泛型
     */
    public <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
        logger.info("添加队列{},delay:{},timeUnit:{}" + queueName, delay, timeUnit);
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(t, delay, timeUnit);
        delayedQueue.destroy();
    }

}

4.然后就可以运行并测试

新建自己的监听类继承上面的接口

package com.test.redis.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * 监听器
 */
@Component
public class TestListener implements RedisDelayedQueueListener<TaskBodyDTO> {

    Logger logger = LoggerFactory.getLogger(TestListener.class);

    @Override
    public void invoke(TaskBodyDTO taskBodyDTO) {
        //这里调用你延迟之后的代码
        logger.info("执行...." + taskBodyDTO.getBody() + "===" + taskBodyDTO.getName());
    }
}

新增DTO

package com.test.redis.demo;

import java.io.Serializable;

public class TaskBodyDTO implements Serializable {
    private String name;
    private String body;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }
}

运行测试

 @Override
    public void run(String... args) throws Exception {
        TaskBodyDTO taskBody = new TaskBodyDTO();
        taskBody.setBody("测试DTO实体类的BODY,3秒之后执行");
        taskBody.setName("测试DTO实体类的姓名,3秒之后执行");
        //添加队列3秒之后执行
        redisDelayedQueue.addQueue(taskBody, 10, TimeUnit.SECONDS, TestListener.class.getName());

        taskBody.setBody("测试DTO实体类的BODY,10秒之后执行");
        taskBody.setName("测试DTO实体类的姓名,10秒之后执行");
        //添加队列10秒之后执行
        redisDelayedQueue.addQueue(taskBody, 20, TimeUnit.SECONDS, TestListener.class.getName());

        taskBody.setBody("测试DTO实体类的BODY,20秒之后执行");
        taskBody.setName("测试DTO实体类的姓名,20秒之后执行");
        //添加队列20秒之后执行
        redisDelayedQueue.addQueue(taskBody, 30, TimeUnit.SECONDS, TestListener.class.getName());
    }

打印的值

2020-02-17 13:45:08.371  INFO 14700 --- [           main] com.test.redis.demo.DemoApplication      : 执行....测试DTO实体类的BODY,3秒之后执行===测试DTO实体类的姓名,3秒之后执行
2020-02-17 13:45:15.467  INFO 14700 --- [           main] com.test.redis.demo.DemoApplication      : 执行....测试DTO实体类的BODY,10秒之后执行===测试DTO实体类的姓名,10秒之后执行
2020-02-17 13:45:25.463  INFO 14700 --- [           main] com.test.redis.demo.DemoApplication      : 执行....测试DTO实体类的BODY,20秒之后执行===测试DTO实体类的姓名,20秒之后执行

 

5.结束抒情环节

哈哈,说了没有啥,太简单以至于我都不好意思说,但是不封装你又不能用,嗯嗯,这样想想我的开年第一篇划水笔记还是挺棒的。对了,我这里是通过DTO的名字来区分队列名的,所以每一个定时任务的DTO都不要相同。

大家对于定时任务有其他好用的方式,或者开源的好项目,欢迎下方留言评论,我将在评论区选出100位中奖用户送出iPhone5.

哎呀,不对 我没有粉丝,等我粉丝到10000了再选中奖用户吧,哈哈

© 著作权归作者所有

王念博客
粉丝 209
博文 122
码字总数 94978
作品 0
浦东
程序员
私信 提问
加载中

评论(20)

java是最好的编程
java是最好的编程
他的这个延迟队列,我用了,感觉还好。但是有一个问题。
我发现先是预发送,然后等待延迟,最后添加消息到队列中。
这样有一个问题,比如我在等待的时候,机器凉了,那么这个准备发出去的消息,就丢失了。

您好, 上面那位朋友反馈的这个问题, 我也有遇到吗? 您有什么解决方案吗?
王念博客
王念博客 博主
我又修改了一次 稍等 等会我就更新实现
王念博客
王念博客 博主
修改好了,你再看看
王念博客
王念博客 博主
不是预发送,addQueue的时候就放在redis里了呀。循环等待只是获取队列而已,我第一版本想简单放在addQueue的触发获取循环等待而已。现在已经切换成启动项目的时候就开启线程等待,这样启动项目就能消费redis中到期的数据了,不然需要业务addQueue一下才能开启线程等待队列监听
MrXionGe
MrXionGe
他的这个延迟队列,我用了,感觉还好。但是有一个问题。
我发现先是预发送,然后等待延迟,最后添加消息到队列中。
这样有一个问题,比如我在等待的时候,机器凉了,那么这个准备发出去的消息,就丢失了。
王念博客
王念博客 博主
发送的时候就addQueue的时候就放在redis里了呀。循环等待只是获取队列而已,我第一版本想简单放在addQueue的触发获取循环等待而已。现在已经切换成启动项目的时候就开启线程等待,这样启动项目就能消费redis中到期的数据了,不然需要业务addQueue一下才能开启线程等待队列监听
MrXionGe
MrXionGe
可能是我没有说明白。 您的代码没问题,我只是说Redisson的“delayedQueue.offer()”是延迟执行的。 通常我们希望的延迟队列是延迟消费,因为这样我们不必担心等待期内的消息丢失,而Redisson是在调用offer之后,过一段时间,才会把你的数据放到Queue中,所以我认为这样会有消息丢失的风险。
王念博客
王念博客 博主
为啥Redisson是在调用offer之后,过一段时间,才会把数据存到redis?调用offer()同步就把数据存到redis里了呀,Redisson这都是依赖于redis自己的过期时间轮询做的呀。
MrXionGe
MrXionGe
这个事情我还不是100%确认。但我们可以做一个实验。延迟10秒的例子。 我们开启Redisson debug级别日志,之后再去看redis的DB。 1.创建完client和queue之后,日志静默,直到10s之后才有redis的通信日志。 2.如果我们在中途kill掉例子,redis的队列中就不会再出现数据,消息丢失了。 所以我分析,它是延迟发送数据的。 且offer方法的注释说“Inserts element into this queue with specified transfer delay to destination queue.” 当然我这只是自己的简单分析。 我们可以深入探讨一下。
王念博客
王念博客 博主
对的 这个注释我也看到了,指定到目标队列的传输延迟,他的描述不够清楚或者翻译不对,中文的解释是 到你的目标队列监听,并不是redis。你这样延迟2分钟,发送之后直接关闭项目看redisDB就好了,然后再次启动项目消费
MrXionGe
MrXionGe
回复 @王念博客 : 我又重新跑了一个demo,确实如此。多谢指点,还是我实验没有做到位。👍
王念博客
王念博客 博主
回复 @MrXionGe : 哈哈 你就不应该往这个方向去想,任务要是在本地计时要redis干吗?也不会有这样的设计。。。我刚开始没有回答你,就是没有明白你说的意思😄
MrXionGe
MrXionGe
回复 @王念博客 : 我当时读了下他的doc,就有点怪,也没敢在生产使用。仅仅是简单做了下demo。
MrXionGe
MrXionGe
多机部署怎么样?我印象中它是多消费
MrXionGe
MrXionGe
多机没问题,毕竟是队列,取走了就没了
王念博客
王念博客 博主
哈哈 我都不用回答,自问自答
汉时的月
。。。
王念博客
王念博客 博主
你咋也无语了?😄
JAVA从入门到放弃到删库到跑路
王念博客
王念博客 博主
我又封装了一下 你看这下还无语吗😦
灵感来袭,基于Redis的分布式延迟队列

延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费。比如1分钟之后发送短信,发送邮件,检测数据状态等。 Redisson Delayed Queue 如果你项目中使用了redisso...

胡峻峥
03/28
0
0
SpringBoot | 第二十二章:定时任务的使用

前言 上两章节,我们简单的讲解了关于异步调用和异步请求相关知识点。这一章节,我们来讲讲开发过程也是经常会碰见的定时任务。比如每天定时清理无效数据、定时发送短信、定时发送邮件、支付...

oKong
2018/08/19
2.3K
3
基于kafka的定时消息/任务服务

定时任务,在很多业务场景中都会存在.一般,我们简单解决的话,就是使用数据库来存储数据供服务端周期获取执行.显然,对于数据库处理,如果多线程或者多机器处理,就会存在扩展的问题.比如:现在一个...

xiaomin0322
2018/04/18
1.5K
0
聊聊redisson的DelayedQueue

序 本文主要研究一下redisson的DelayedQueue maven 实例 这里使用了两个queue,对delayedQueue的offer操作是直接进入delayedQueue,但是delay是作用在目标队列上,这里就是RBlockingQueue 源...

go4it
2018/09/22
332
0
storm定时器timer源码分析-timer.clj

storm定时器与java.util.Timer定时器比较相似。java.util.Timer定时器实际上是个线程,定时调度所拥有的TimerTasks;storm定时器也有一个线程负责调度所拥有的"定时任务"。storm定时器的"定时...

Adel
2016/04/13
122
0

没有更多内容

加载失败,请刷新页面

加载更多

egg学习笔记第六天:使用中间件屏蔽可疑用户

站点有时候想屏蔽一些特定频繁抓取服务器数据的用户,可以放在中间件中去做,用户在指定Ip数组内,则屏蔽,如果不在,则匹配路由规则执行controller。 中间件的概念: 匹配路由前,匹配路由完...

一生懸命吧
23分钟前
34
0
005-其他技巧

css精灵图(css雪碧)sprites 减少服务器接收和发送请求的次数,提高页面加载速度 原理:将网页中的一些小背景图整合到一张大图中 使用background-position移动背景图位置-x/y坐标 字体图标ico...

沉默的懒猫
28分钟前
15
0
YouTube视频下载:Airy for mac

想在YouTube下载视频?借助适用于Mac的AIry YouTube下载程序,您可以获得一个简单而高效的下载程序,可以在瞬间处理来自YouTube的任何曲目或播放列表。只需找到您要下载的视频,选择格式和分...

MacW软件分享
37分钟前
34
0
guava中EvictingQueue使用与改进

一、简介 因为业务有一些服务器在国外,网络非常不稳定,执行http请求的时候波动很大。所以我们希望在网络变慢的时候通过http代理切换到其他服务器发送http请求。 如果界定变慢呢? 如果,最...

trayvon
今天
16
0
Python类继承对象 - Python class inherits object

问题: Is there any reason for a class declaration to inherit from object ? 类声明从object继承有什么理由吗? I just found some code that does this and I can't find a good reason......

javail
今天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部