文档章节

spring-boot-rabbitmq动态管理

Mr---D
 Mr---D
发布于 2017/12/26 13:27
字数 877
阅读 1520
收藏 15

使用spring boot + rabbitmq的时候,在开发过程中,可能会想要临时停用/启用监听,或修改监听消费者数量。如果每次修改都重启比较浪费时间,所以研究了一下不停机就启用停用监听或修改一些配置

一. 关于rabbitmq监听的配置

  • 配置属性类:RabbitProperties,包含rabbitmq的认证、监听、发送者以及其他的一些配置
  • 自动配置类:RabbitAutoConfiguration,主要配置rabbitmq的连接工厂和发送者等,不包含监听的配置
  • rabbitmq监听的配置是RabbitAnnotationDrivenConfiguration,是通过RabbitAutoConfiguration引入的
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
    ...
}
  • RabbitAnnotationDrivenConfiguration中主要就是监听工厂的配置、监听工厂,但是这里也只是创建bean,并没有真正的初始化
  • 通过配置里的bean类名,分析一下,rabbitmq的监听肯定是由监听工厂创建的,所以找到监听工厂SimpleRabbitListenerContainerFactory
@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
		SimpleRabbitListenerContainerFactoryConfigurer configurer,
		ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}
  • 既然自动配置里面没有初始化监听,那就应该是在其他地方调用的,进入监听工厂类中,发现有initializeContainer(SimpleMessageListenerContainer instance)方法,猜测初始化肯定与这个方法有关,所以查看有哪些地方调用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有创建监听容器和初始化的代码
/**
 * Create and start a new {@link MessageListenerContainer} using the specified factory.
 * @param endpoint the endpoint to create a {@link MessageListenerContainer}.
 * @param factory the {@link RabbitListenerContainerFactory} to use.
 * @return the {@link MessageListenerContainer}.
 */
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
		RabbitListenerContainerFactory<?> factory) {

    MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
    
    if (listenerContainer instanceof InitializingBean) {
    	try {
            ((InitializingBean) listenerContainer).afterPropertiesSet();
    	}
    	catch (Exception ex) {
            throw new BeanInitializationException("Failed to initialize message listener container", ex);
    	}
    }
    
    int containerPhase = listenerContainer.getPhase();
    if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value
    	if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
            throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
            		this.phase + " vs " + containerPhase);
    	}
    	this.phase = listenerContainer.getPhase();
    }
    
    return listenerContainer;
}
  • 继续找调用这个方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之后,发现调用的地方很多了

    调用RabbitListenerEndpointRegistrar.afterPropertiesSet()的地方

  • 看看afterPropertiesSet方法,是InitializingBean接口中的,猜测应该是spring容器创建bean之后都会调用的bean初始化的方法,所以查找找到RabbitListenerEndpointRegistrar是在哪里创建的实例。原来是在RabbitListenerAnnotationBeanPostProcessor中的私有属性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration这个自动配置里面初始化的,所以这就找到rabbitmq初始化监听的源头了

二. 动态管理rabbitmq监听

  • 回到最初的问题,想要动态的启用停用mq的监听,所以先看看初始化配置的类,既然有初始化,那可能会有相关的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,里面有对监听容器进行操作,主要源码如下
/**
 * @return the managed {@link MessageListenerContainer} instance(s).
 */
public Collection<MessageListenerContainer> getListenerContainers() {
    return Collections.unmodifiableCollection(this.listenerContainers.values());
}
	
@Override
public void start() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
    	startIfNecessary(listenerContainer);
    }
}

/**
 * Start the specified {@link MessageListenerContainer} if it should be started
 * on startup or when start is called explicitly after startup.
 * @see MessageListenerContainer#isAutoStartup()
 */
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
    	listenerContainer.start();
    }
}

@Override
public void stop() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
    	listenerContainer.stop();
    }
}
  • 写个controller,注入RabbitListenerEndpointRegistry,使用start()和stop()对监听进行启用停用的操作,并且RabbitListenerEndpointRegistry实例还可以获取监听容器,对监听的一些参数也能进行修改,比如消费者数量。代码如下:
import java.util.Set;

import javax.annotation.Resource;

import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.itopener.framework.ResultMap;

/**
 * Created by fuwei.deng on 2017年7月24日.
 */
@RestController
@RequestMapping("rabbitmq/listener")
public class RabbitMQController {

    @Resource
    private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
    
    @RequestMapping("stop")
    public ResultMap stop(){
    	rabbitListenerEndpointRegistry.stop();
    	return ResultMap.buildSuccess();
    }
    
    @RequestMapping("start")
    public ResultMap start(){
    	rabbitListenerEndpointRegistry.start();
    	return ResultMap.buildSuccess();
    }
    
    @RequestMapping("setup")
    public ResultMap setup(int consumer, int maxConsumer){
    	Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds();
    	SimpleMessageListenerContainer container = null;
    	for(String id : containerIds){
    		container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id);
    		if(container != null){
    			container.setConcurrentConsumers(consumer);
    			container.setMaxConcurrentConsumers(maxConsumer);
    		}
    	}
    	return ResultMap.buildSuccess();
    }
    
}

© 著作权归作者所有

Mr---D
粉丝 108
博文 17
码字总数 37748
作品 0
成都
程序员
私信 提问
RabbitMQ 入门 与 RabbitMQ 在 Spring Boot 中的使用

Message Broker与AMQP简介 Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集...

Anur
2018/09/13
487
0
Spring Boot RabbitMQ 优先级队列

Docker With RabbitMQ 官方 Docker 镜像仓库地址 https://hub.docker.com/_/rabbitmq 本地运行 RabbitMQ 访问可视化面板 地址:http://127.0.0.1:15672/ 默认账号:guest 默认密码:guest S...

Anoyi
02/23
0
0
SpringCloud(第 037 篇)通过bus/refresh半自动刷新ConfigClient配置

SpringCloud(第 037 篇)通过bus/refresh半自动刷新ConfigClient配置 - 一、大致介绍 二、实现步骤 2.1 添加 maven 引用包 2.2 添加应用配置文件(springms-config-client-refresh-bus/src/...

HMILYYLIMH
2017/10/18
125
0
阿里云Kubernetes SpringCloud 实践进行时(2): 分布式配置管理

简介 为了更好地支撑日益增长的庞大业务量,我们常常需要把服务进行整合、拆分,使我们的服务不仅能通过集群部署抵挡流量的冲击,又能根据业务在其上进行灵活的扩展。随着分布式的普及、服务...

osswangxining
2018/05/25
0
0
Spring Boot整合RabbitMQ实例

什么是消息? 消息是一个或者多个实体之间沟通的一种方式并且无处不在。 自从计算机发明以来,计算机以多种多样的方式发送消息,消息定义了软硬件或者应用程序之间的沟通方式。消息总是有一个...

英雄有梦没死就别停
2018/06/27
305
1

没有更多内容

加载失败,请刷新页面

加载更多

Java Enum 底层原理

public enum WeekDay{ Mon("Monday"), Tue("Tuesday"), Wed("Wednesday"), Thu("Thursday"), Fri( "Friday"), Sat("Saturday"), Sun("Sunday"); private final String day; ......

MtrS
23分钟前
2
0
Python入门项目之锁屏单词壁纸,教你节约时间学习英语哦

很多人认为学python或者其他的编程语言,一定需要不错的英语,真的是这样吗? 不知道,但我知道怎么用python实现电脑锁屏单词壁纸,这对于每天都要多次面对锁屏屏幕的人来说,可以帮你节约每...

这人就爱编程
29分钟前
4
0
EasyPoi教程和使用案例

EasyPoi教程和使用案例 先上文档:http://easypoi.mydoc.io/ 基于Apache poi 开发的EasyPoi,比起poi更加简单易用,但是功能没有poi强大,。 特性总结: 优点: 通过简单的注解和模板 语言(熟...

Koro-Tong
29分钟前
3
0
ZetCode 绘图教程

来源:ApacheCN ZetCode 翻译项目 译者:飞龙 协议:CC BY-NC-SA 4.0 贡献指南 本项目需要校对,欢迎大家提交 Pull Request。 请您勇敢地去翻译和改进翻译。虽然我们追求卓越,但我们并不要求...

ApacheCN_飞龙
43分钟前
5
0
==和equals的区别是什么?

== 是关系运算符,equals() 是方法,结果都返回布尔值 Object 的 == 和 equals() 比较的都是地址,作用相同 == 作用: 基本类型,比较值 引用类型,比较内存地址 不能比较没有父子关系的两个...

ConstXiong
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部