文档章节

RabbitMQ进阶使用-延时队列的配置(Spring Boot)

w
 wailouci
发布于 10/10 20:25
字数 772
阅读 453
收藏 8

依赖

  • MAVEN配置pom.xml
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • Gradle配置build.gradle
compile('org.springframework.boot:spring-boot-starter-amqp')

连接配置

得益于spring boot的约定大于配置,只需要在application.yml加入下面配置即可。

spring:
  rabbitmq:
    host: host
    port: port
    username: admin
    password: passwd

简单自定义RabbitTemplate和Queue配置

默认的配置还是略显不足,增加序列化配置如下:

@Configuration
public class QueueConfig {

    /**
     * 自动注入为SimpleRabbitListenerContainerFactory的消息序列化转换器
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
    * 持久化交换机
    */
    @Bean(name = "exchange")
    public FanoutExchange exchange() {
        return new FanoutExchange("exchange1", true, false);
    }

    /**
    * 持久化队列
    */
    @Bean
    public Queue queue() {
        return new Queue("queue", true);
    }

    /**
     * 将队列和exchange绑定
     *
     * @return binding
     */
    @Bean
    Binding bindingSmsExchangeSmsQueue() {
        return BindingBuilder.bind(queue()).to(exchange());
    }

}

特殊延时队列的配置

延时队列的用法这里就不详细说了,参考Spring Boot与RabbitMQ结合实现延迟队列的示例,有些场景如未支付订单30分钟过期等,可通过延时队列实现

    @Bean
    public Queue delayQueue(){
        return QueueBuilder.durable("delayQueue")                      //队列名称
                .withArgument("x-message-ttl",10000)                      //死信时间
                .withArgument("x-dead-letter-exchange", "")            //死信重新投递的交换机
                .withArgument("x-dead-letter-routing-key", "queue")//路由到队列的routingKey
                .build();
    }

启动应用测试一下

启动应用,在rabbit管理web查看所有队列

  • 所有队列
  • 查看delayQueue详情,框框中为延时配置 将"x-message-ttl"参数改成20000重启发现问题,控制队列里面的参数也没有修改成功

修改带参数队列失败的问题

问题分析

根据日志提示,队列已经存在而且参数不一致导致,然后查看源码在RabbitAdmin发现下面代码,在创建队列失败的时候会调用logOrRethrowDeclarationException方法,logOrRethrowDeclarationException方法中发布了一个DeclarationExceptionEvent事件,到这里解决思路有,监听这个事件,然后删除相应的队列

	private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException {
		List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length);
		for (int i = 0; i < queues.length; i++) {
			Queue queue = queues[i];
			if (!queue.getName().startsWith("amq.")) {
				if (this.logger.isDebugEnabled()) {
					this.logger.debug("declaring Queue '" + queue.getName() + "'");
				}
				try {
					try {
						DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
								queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
						declareOks.add(declareOk);
					}
					catch (IllegalArgumentException e) {
						if (this.logger.isDebugEnabled()) {
							this.logger.error("Exception while declaring queue: '" + queue.getName() + "'");
						}
						try {
							if (channel instanceof ChannelProxy) {
								((ChannelProxy) channel).getTargetChannel().close();
							}
						}
						catch (TimeoutException e1) {
						}
						throw new IOException(e);
					}
				}
				catch (IOException e) {
					logOrRethrowDeclarationException(queue, "queue", e);
				}
			}
			else if (this.logger.isDebugEnabled()) {
				this.logger.debug(queue.getName() + ": Queue with name that starts with 'amq.' cannot be declared.");
			}
		}
		return declareOks.toArray(new DeclareOk[declareOks.size()]);
	}

	private <T extends Throwable> void logOrRethrowDeclarationException(Declarable element, String elementType, T t)
			throws T {
		DeclarationExceptionEvent event = new DeclarationExceptionEvent(this, element, t);
		this.lastDeclarationExceptionEvent = event;
		if (this.applicationEventPublisher != null) {
			this.applicationEventPublisher.publishEvent(event);
		}
		if (this.ignoreDeclarationExceptions || (element != null && element.isIgnoreDeclarationExceptions())) {
			if (this.logger.isDebugEnabled()) {
				this.logger.debug("Failed to declare " + elementType
						+ ": " + (element == null ? "broker-generated" : element)
						+ ", continuing...", t);
			}
			else if (this.logger.isWarnEnabled()) {
				Throwable cause = t;
				if (t instanceof IOException && t.getCause() != null) {
					cause = t.getCause();
				}
				this.logger.warn("Failed to declare " + elementType
						+ ": " + (element == null ? "broker-generated" : element)
						+ ", continuing... " + cause);
			}
		}
		else {
			throw t;
		}
	}

解决方法

写一个DeclarationExceptionEvent事件监听,处理创建失败的队列,既删除掉

@Component
public class DeclarationExceptionEventListener {

    @Autowired
    private AmqpAdmin rabbitAdmin;

    @EventListener(classes = DeclarationExceptionEvent.class)
    public void listen(DeclarationExceptionEvent event) {
        final Declarable declarable = event.getDeclarable();
        if (declarable instanceof Queue) {
            final Queue queue = (Queue) declarable;
            rabbitAdmin.deleteQueue(queue.getName());
        }
    }
}

改完重启应用,只有一条异常日志(原来4条),还有一条的原因是第一次创建失败发布事件,我们监听了事件进行处理。查看rabbit控制台,参数修改成功。

© 著作权归作者所有

共有 人打赏支持
w
粉丝 6
博文 4
码字总数 4314
作品 0
浦东
Spring Boot与RabbitMQ结合实现延迟队列的示例

背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。 场景一:在订单系统中,一个用户下单之后通常有...

xiaomin0322
05/11
0
0
RabbitMQ 入门 与 RabbitMQ 在 Spring Boot 中的使用

Message Broker与AMQP简介 Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集...

Anur
09/13
0
0
SpringBoot | 第十二章:RabbitMQ的集成和使用

前言 上节讲了缓存数据库的使用,在实际工作中,一般上在系统或者应用间通信或者进行异步通知(登录后发送短信或者邮件等)时,都会使用消息队列进行解决此业务场景的解耦问题。这章节讲解下消...

oKong
07/25
0
0
消息队列1:RabbitMQ解析并基于Springboot实战

目录 RabbitMQ简介 RabitMQ 概念模型 Exchange 类型 代码实战 RabbitMQ简介 AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此...

养码青年
09/07
0
0
Spring Boot+RabbitMQ学习笔记

RabbitMQ RabbitMQ 是一个由 Erlang 语言开发的 的开源实现。最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 消息队列 先从基础开始,加...

听_风
06/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

自己手写一个 SpringMVC 框架

前端框架很多,但没有一个框架称霸,后端框架现在Spring已经完成大一统.所以学习Spring是Java程序员的必修课. Spring 框架对于 Java 后端程序员来说再熟悉不过了,以前只知道它用的反射实现的,...

别打我会飞
15分钟前
0
0
01-《Apache Tomcat 9》之文件索引

《Apache Tomcat 9》是《看Apache官方文档学英语》的第一个专栏!让我们一起在看文档的过程中学英语,在学英语的过程中夯实技术! Documentation Index - 文件索引 Introduction - 介绍 This...

飞鱼说编程
16分钟前
0
0
最近

20181016最近在熟悉业务 关于money的 要涉及到流程中转同步 这个点感觉 业务大于技术 关于业务性的内容 还是要把自己及时清零的好 我们需要好好的梳理下业务内容 业务作为导向 技术提供解决方...

JAVA码猿
26分钟前
0
0
JDK1.8HashMap源码分析

HashMap和Hashtable的主要区别是: 1. Hashtable是线程安全,而HashMap则非线程安全,Hashtable的实现方法里面大部分都添加了synchronized关键字来确保线程同步,因此相对而言HashMap性能会高...

小小明童鞋
36分钟前
15
0
以Redis为例,详谈分布式系统缓存的细枝末节

前言: 在分布式Web程序设计中,解决高并发以及内部解耦的关键技术离不开缓存和队列,而缓存角色类似计算机硬件中CPU的各级缓存。如今的业务规模稍大的互联网项目,即使在最初beta版的开发上...

Java干货分享
38分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部