一、概述
自SpringCloud2020开始,官方就标记了@StreamListener、@Input等注解为过期类,新增了函数式消息接收机制。到SpringCloud2022,过期类全部被移除,只能使用函数式消息接收机制。本文通过前后版本配置对比进行SpringCloudStream4版本下的消息配置介绍。
二、SpringCloud2020及之前版本
2.1公共配置
Pipeline
public interface TestPipeline {
String INPUT_TEST = "input_test";
String OUTPUT_TEST = "output_test";
@Input(ServiceWorkerPipeline.INPUT_TEST)
SubscribableChannel listen();
@Output(ServiceWorkerPipeline.OUTPUT_TEST)
MessageChannel send();
}
2.2生产者配置
application.properties
spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit
spring.cloud.stream.bindings.output_test.binder=rabbit
spring.cloud.stream.bindings.output_test.destination=test
spring.cloud.stream.bindings.output_test.content-type=application/json
发送消息
@Component
public class AsyncMsgUtil {
private final Logger LOGGER = LoggerFactory.getLogger(AsyncMsgUtil.class);
@Autowired
private TestPipeline testPipeline;
public void sendEmailMsg(String email, String title, TemplateKey4Mail key, Map<String, Object> args) {
try {
this.LOGGER.info("email={}", email);
Message<MailMsgData> message = MessageBuilder.build(new MailMsgData(email, title, key, args), null);
boolean flag = this.testPipeline.send().send(message);
this.LOGGER.info("send status={}", flag);
} catch (Exception e) {
this.LOGGER.error("ERROR", e);
}
}
}
2.3消费者配置
application.properties
spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit
spring.cloud.stream.bindings.output_test.binder=rabbit
spring.cloud.stream.bindings.output_test.destination=test
spring.cloud.stream.bindings.output_test.content-type=application/json
消费消息代码
@StreamListener(target = TestPipeline.INPUT_TEST, condition =
"headers['action']=='test'")
public void test(User user,@Header(value = "caller") Long callerId) {
//do something
}
2.4总结
通过以上示例,可以看到,SpringCloud2020及之前版本,是通过一个Pipeline类声明输入输出通道。声明时主要声明INPUT和OUTPUT,INPUT的值对应生产者配置文件里spring.cloud.stream.bindings.<binding name>.*中的binding name;同理,OUTPUT的值也对应消费者配置文件里的binding name。
另外消费消息时,通过注解@StreamListener指定通道,以及筛选指定header的消息。被注解的方法,不能有返回值,参数会自动注入。
三、SpringCloud2020及之后版本
3.1生产者配置
application.properties
spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit
发送消息
@Component
public class AsyncMsgUtil {
private final Logger LOGGER = LoggerFactory.getLogger(AsyncMsgUtil.class);
@Autowired
private StreamBridge bridge;
public void sendEmailMsg(String email, String title, TemplateKey4Mail key, Map<String, Object> args) {
try {
this.LOGGER.info("email={}", email);
Message<MailMsgData> message = MessageBuilder.build(new MailMsgData(email, title, key, args), null);
boolean flag = this.bridge.send("output_test-in-0", message);
this.LOGGER.info("send status={}", flag);
} catch (Exception e) {
this.LOGGER.error("ERROR", e);
}
}
}
3.2消费者配置
application.properties
spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit
spring.cloud.function.definition=input_test
#此配置消息持久化,若不需要可注释
spring.cloud.stream.bindings.input_test-in-0.group=worker
消费消息代码
@Bean
Consumer<JsonResult<User>> input_test() throws Exception {
return jsonResult -> {
System.out.println("testmq-----" + jsonResult);
};
}
3.3总结
相比于之前,减少了很多配置,但是减少的前提是增大了约定。主要是:
1、bridge.send("output_test-in-0", message)中“output_test-in-0”对应生产者配置文件中spring.cloud.stream.bindings.<binding name>.*的binding name,生产者可只配置rabbit信息,其它通过消费者配置实现最少化配置。
2、消费者取消@StreamListener,使用普通的@Bean注解,所以配置里要指定哪些是rabbit的消费者,使用配置spring.cloud.function.definition指定,配置值是方法名,多个以英文分号进行。
3、消费者配置spring.cloud.stream.bindings.<binding name>.*中的binding name命名有特定规则:<binding name>_in_0,同时<binding name>也是@ Bean注解的方法名。
4、如果生产者0配置,那么消费者配置时需要额外配置下destination属性,比如生产者发送消息的aaa,那么消费者的配置应该为:
spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit
spring.cloud.function.definition=input_test
#此配置消息持久化,若不需要可注释
spring.cloud.stream.bindings.input_test-in-0.destination=aaa
spring.cloud.stream.bindings.input_test-in-0.group=worker
5、函数式无法进行同一个通道按header接收消息,这块和之前不一致,所以生产者发送消息时,最好用枚举,避免随意发送而没地接收的情况。
6、如果生产者在发生消息时按照<binding name>_in_0格式发送,且消息不需要持久化,那么生产者和消费者可以都只配置rabbit信息,不用配置其他任何信息。
有兴趣的可以关注下微信公共号。