SpringCloudStream4-rabbit配置

原创
2023/04/19 15:03
阅读数 60

一、概述

    自SpringCloud2020开始,官方就标记了@StreamListener、@Input等注解为过期类,新增了函数式消息接收机制。到SpringCloud2022,过期类全部被移除,只能使用函数式消息接收机制。本文通过前后版本配置对比进行SpringCloudStream4版本下的消息配置介绍。

二、SpringCloud2020及之前版本

2.1公共配置

Pipeline

public interface TestPipeline {

	String INPUT_TEST = "input_test";
	String OUTPUT_TEST = "output_test";

	@Input(ServiceWorkerPipeline.INPUT_TEST)
	SubscribableChannel listen();

	
	@Output(ServiceWorkerPipeline.OUTPUT_TEST)
	MessageChannel send();
}

2.2生产者配置

application.properties

spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit

spring.cloud.stream.bindings.output_test.binder=rabbit
spring.cloud.stream.bindings.output_test.destination=test
spring.cloud.stream.bindings.output_test.content-type=application/json

发送消息

@Component
public class AsyncMsgUtil {
	private final Logger LOGGER = LoggerFactory.getLogger(AsyncMsgUtil.class);
	@Autowired
	private TestPipeline testPipeline;

	public void sendEmailMsg(String email, String title, TemplateKey4Mail key, Map<String, Object> args) {
		try {
			this.LOGGER.info("email={}", email);
			Message<MailMsgData> message = MessageBuilder.build(new MailMsgData(email, title, key, args), null);
			boolean flag = this.testPipeline.send().send(message);
			this.LOGGER.info("send status={}", flag);
		} catch (Exception e) {
			this.LOGGER.error("ERROR", e);
		}
	}
}

2.3消费者配置

application.properties

spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit

spring.cloud.stream.bindings.output_test.binder=rabbit
spring.cloud.stream.bindings.output_test.destination=test
spring.cloud.stream.bindings.output_test.content-type=application/json

消费消息代码

@StreamListener(target = TestPipeline.INPUT_TEST, condition =
	 "headers['action']=='test'")
public void test(User user,@Header(value = "caller") Long callerId) {
      //do something
}

2.4总结

    通过以上示例,可以看到,SpringCloud2020及之前版本,是通过一个Pipeline类声明输入输出通道。声明时主要声明INPUT和OUTPUT,INPUT的值对应生产者配置文件里spring.cloud.stream.bindings.<binding name>.*中的binding name;同理,OUTPUT的值也对应消费者配置文件里的binding name。

    另外消费消息时,通过注解@StreamListener指定通道,以及筛选指定header的消息。被注解的方法,不能有返回值,参数会自动注入。

三、SpringCloud2020及之后版本

3.1生产者配置

application.properties

spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit

发送消息

@Component
public class AsyncMsgUtil {
	private final Logger LOGGER = LoggerFactory.getLogger(AsyncMsgUtil.class);
	@Autowired
	private StreamBridge bridge;

	public void sendEmailMsg(String email, String title, TemplateKey4Mail key, Map<String, Object> args) {
		try {
			this.LOGGER.info("email={}", email);
			Message<MailMsgData> message = MessageBuilder.build(new MailMsgData(email, title, key, args), null);
			boolean flag = this.bridge.send("output_test-in-0", message);
			this.LOGGER.info("send status={}", flag);
		} catch (Exception e) {
			this.LOGGER.error("ERROR", e);
		}
	}
}

3.2消费者配置

application.properties

spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit

spring.cloud.function.definition=input_test
#此配置消息持久化,若不需要可注释
spring.cloud.stream.bindings.input_test-in-0.group=worker

消费消息代码

@Bean
	Consumer<JsonResult<User>> input_test() throws Exception {
		return jsonResult -> {
			System.out.println("testmq-----" + jsonResult);
		};
	}

3.3总结

    相比于之前,减少了很多配置,但是减少的前提是增大了约定。主要是:

        1、bridge.send("output_test-in-0", message)中“output_test-in-0”对应生产者配置文件中spring.cloud.stream.bindings.<binding name>.*的binding name,生产者可只配置rabbit信息,其它通过消费者配置实现最少化配置。

        2、消费者取消@StreamListener,使用普通的@Bean注解,所以配置里要指定哪些是rabbit的消费者,使用配置spring.cloud.function.definition指定,配置值是方法名,多个以英文分号进行。

        3、消费者配置spring.cloud.stream.bindings.<binding name>.*中的binding name命名有特定规则:<binding name>_in_0,同时<binding name>也是@    Bean注解的方法名。

        4、如果生产者0配置,那么消费者配置时需要额外配置下destination属性,比如生产者发送消息的aaa,那么消费者的配置应该为:

spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit

spring.cloud.function.definition=input_test
#此配置消息持久化,若不需要可注释
spring.cloud.stream.bindings.input_test-in-0.destination=aaa
spring.cloud.stream.bindings.input_test-in-0.group=worker

        5、函数式无法进行同一个通道按header接收消息,这块和之前不一致,所以生产者发送消息时,最好用枚举,避免随意发送而没地接收的情况。

        6、如果生产者在发生消息时按照<binding name>_in_0格式发送,且消息不需要持久化,那么生产者和消费者可以都只配置rabbit信息,不用配置其他任何信息。

有兴趣的可以关注下微信公共号。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部