文档章节

springboot rabbitmq 之死信队列(延迟消费消息)

码农小胖哥
 码农小胖哥
发布于 2018/02/28 16:08
字数 1154
阅读 2.4W
收藏 8

之前探讨了springboot 集成 rabbitmq  以及开启ack模式   

传送门:https://my.oschina.net/u/2948566/blog/1624963

接着该篇 搞一下 死信队列

  • 概念

死信队列 听上去像 消息“死”了     其实也有点这个意思,死信队列  是 当消息在一个队列 因为下列原因:

  1. 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
  2. 消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration()) 
  3. 队列超载

变成了 “死信” 后    被重新投递(publish)到另一个Exchange   该Exchange 就是DLX     然后该Exchange 根据绑定规则 转发到对应的 队列上  监听该队列  就可以重新消费     说白了 就是  没有被消费的消息  换个地方重新被消费

              生产者   -->  消息 --> 交换机  --> 队列  --> 变成死信  --> DLX交换机 -->队列 --> 消费者

  • springboot rabbitmq 死信队列实践

下面我们模拟一个死信队列的应用场景   消息延时处理

还是以这个项目为基础: https://gitee.com/felord/springboot-message

 项目中 RabbitConfig  死信相关片段:

   /**
     * 死信队列跟交换机类型没有关系 不一定为directExchange  不影响该类型交换机的特性.
     *
     * @return the exchange
     */
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }

    /**
     * 声明一个死信队列.
     * x-dead-letter-exchange   对应  死信交换机
     * x-dead-letter-routing-key  对应 死信队列
     *
     * @return the queue
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    声明  死信交换机
        args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE");
//       x-dead-letter-routing-key    声明 死信路由键
        args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R");
        return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
    }

    /**
     * 定义死信队列转发队列.
     *
     * @return the queue
     */
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable("REDIRECT_QUEUE").build();
    }

    /**
     * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
     *
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);

    }

    /**
     * 死信路由通过 KEY_R 绑定键绑定到死信队列上.
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
    }

说明: 

deadLetterExchange()声明了一个Direct 类型的Exchange (死信队列跟交换机没有关系

deadLetterQueue() 声明了一个队列   这个队列 跟前面我们声明的队列不一样    注入了 Map<String,Object> 参数    下面的概念非常重要   

x-dead-letter-exchange 来标识一个交换机  x-dead-letter-routing-key  来标识一个绑定键(RoutingKey)  这个绑定键 是分配给 标识的交换机的   如果没有特殊指定 声明队列的原routingkey , 如果有队列通过此绑定键 绑定到交换机    那么死信会被该交换机转发到 该队列上  通过监听 可对消息进行消费  

可以打个比方  这个是为主力队员 设置了一个替补   如果主力 “死”了   他的活 替补接手  这样更好理解

deadLetterBinding() 对这个带参队列 进行了 和交换机的规则绑定   等下 消费者 先把消息通过交换机投递到该队列中去   然后制造条件发生“死信” 

redirectBinding() 我们需要给标识的交换机  以及对其指定的routingkey 来绑定一个所谓的“替补”队列 用来监听

 

流程具体是  消息投递到  DL_QUEUE  10秒后消息过期 生成死信    然后转发到 REDIRECT_QUEUE  通过对其的监听  来消费消息     

SendController 增加消费发送接口 

    /**
     * 测试死信队列.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/dead")
    public ResponseEntity deadLetter(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//        声明消息处理器  这个对消息进行处理  可以设置一些参数   对消息进行一些定制化处理   我们这里  来设置消息的编码  以及消息的过期时间  因为在.net 以及其他版本过期时间不一致   这里的时间毫秒值 为字符串
        MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
//            设置编码
            messageProperties.setContentEncoding("utf-8");
//            设置过期时间10*1000毫秒
            messageProperties.setExpiration("10000");
            return message;
        };
//         向DL_QUEUE 发送消息  10*1000毫秒后过期 形成死信
        rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", p, messagePostProcessor, correlationData);
        return ResponseEntity.ok();
    }

 

监听 REDIRECT_QUEUE 

    /**
     * 监听替补队列 来验证死信.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  这里异常需要处理
     */
    @RabbitListener(queues = {"REDIRECT_QUEUE"})
    public void redirect(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.debug("dead message  10s 后 消费消息 {}",new String (message.getBody()));
    }

 

测试死信队列接口

不出意外  消息会在发出10秒后 才被消费     一下信息证实了这一猜测

 

相关源码:https://gitee.com/felord/springboot-message      希望大家点个赞

© 著作权归作者所有

码农小胖哥

码农小胖哥

粉丝 131
博文 128
码字总数 153947
作品 1
郑州
程序员
私信 提问
加载中

评论(3)

Jeffrey-JFei
Jeffrey-JFei
博主,写的很清晰,不过hashmap大小设置错了
码农小胖哥
码农小胖哥 博主

引用来自“a151605”的评论

就想问一下返回页面的那个json格式怎么弄的 谷歌插件还是jar包依赖

@a151605 插件
a
a151605
就想问一下返回页面的那个json格式怎么弄的 谷歌插件还是jar包依赖
Spring Boot(十四)RabbitMQ延迟队列

一、前言 延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。 实现...

王磊的博客
2018/11/16
403
0
springboot 集成rabbitmq 并采用ack模式 以及封装队列定义

rabbitmq简介 rabbitmq 是spring所在公司Pivotal自己的产品 是基于AMQP高级队列协议的消息中间件 采用erlang开发 因此安装需要erlang环境 具体安装根据自己的环境 因为跟spring有共同的血缘关...

码农小胖哥
2018/02/27
1.2W
14
Spring Boot与RabbitMQ结合实现延迟队列的示例

背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。 场景一:在订单系统中,一个用户下单之后通常有...

xiaomin0322
2018/05/11
756
0
SpringBoot | 第三十八章:基于RabbitMQ实现消息延迟队列方案

前言 >前段时间在编写通用的消息通知服务时,由于需要实现类似通知失败时,需要延后几分钟再次进行发送,进行多次尝试后,进入定时发送机制。此机制,在原先对接银联支付时,银联的异步通知也...

oKong
2019/07/21
202
2
SpringBoot | 第十二章:RabbitMQ的集成和使用

前言 上节讲了缓存数据库的使用,在实际工作中,一般上在系统或者应用间通信或者进行异步通知(登录后发送短信或者邮件等)时,都会使用消息队列进行解决此业务场景的解耦问题。这章节讲解下消...

oKong
2018/07/25
949
0

没有更多内容

加载失败,请刷新页面

加载更多

1核2G云服务哪家便宜?用来学习和搭建博客

前言: 又到一年续费时,我们来盘点哪些云厂商新手活动给力?有人说我又不是新手,有啥用?你要知道你作为家里唯一一位程序员,有强大的家庭后盾,比如爸爸妈妈爷爷奶奶叔叔阿姨......... 不过...

王念博客
23分钟前
302
1
JavaScript 箭头函数:适用与不适用场景

JavaScript 箭头函数:适用与不适用场景 现代 JavaScript 中最引人注目的功能之一是引入了箭头函数,用 => 来标识。 这种函数有两大优点 – 非常简洁的语法,和更直观的作用域和 this的绑定。...

王囧草
32分钟前
56
0
Docker快速入门

1 几个概念 Docker可以把开发的软件代码以及软件所依赖的所有运行时环境、依赖类库都打包成一个容器镜像,因此使用docker打包软件可以让程序员开发的程序运行在各种不同的计算机硬件环境中。...

即将秃头的Java程序员
33分钟前
68
0
Zookeeper-03-权限管理

Zookeeper-03-权限管理 用的不多,暂时先不整理了

moon888
35分钟前
36
0
渲染学习笔记——GPU应用阶段

1.GPU流水线 注:绿色可编程,橙色可控不可编程,红色完全不可控 2.顶点着色器 顶点着色器计算速度快于片元着色器,所以很多中间数据在顶点着色器计算。 3.裁剪 4.屏幕映射 5.三角形 6.片元着...

myctrd
41分钟前
61
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部