文档章节

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

程序猿DD
 程序猿DD
发布于 2018/12/27 22:11
字数 749
阅读 245
收藏 7

应用场景

有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

@StreamListener(value = TestTopic.INPUT)
public void receiveV1(String payload, @Header("version") String version) {
    if("1.0".equals(version)) {
        // Version 1.0
    }
    if("2.0".equals(version)) {
        // Version 2.0
    }
}

那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。

动手试试

下面通过编写一个简单的例子来具体体会一下这个属性的用法:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生产接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());
            return "ok";
        }

    }

    /**
     * 消息消费逻辑
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
        public void receiveV1(String payload, @Header("version") String version) {
            log.info("Received v1 : " + payload + ", " + version);
        }

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
        public void receiveV2(String payload, @Header("version") String version) {
            log.info("Received v2 : " + payload + ", " + version);
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}

内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-content-route
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2018-12-24 15:50:33.361  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v1 : hello, 1.0
2018-12-24 15:50:33.363  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v2 : hello, 2.0

从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。

本文首发:http://blog.didispace.com/spring-cloud-starter-finchley-7-6/

© 著作权归作者所有

共有 人打赏支持
程序猿DD
粉丝 416
博文 80
码字总数 107057
作品 4
闵行
私信 提问
Spring Cloud (十五)Stream 入门、主要概念与自定义消息发送与接收

前言 不写随笔的日子仿佛就是什么都没有产出一般……上节说到要学Spring Cloud Bus,这里发现按照官方文档的顺序反而会更好些,因为不必去后边的章节去为当前章节去打基础,所以我们先学习S...

东北小狐狸
2018/07/31
0
0
Spring Cloud Stream如何消费自己生产的消息?

在上一篇《Spring Cloud Stream如何处理消息重复消费》中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继续说说在另外一个被经常问到的问题:如果...

程序猿DD
2018/11/28
0
0
SpringBoot与RocketMQ客户端集成原理解读与示例

本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何...

技术小能手
2018/11/13
0
0
【Spring Cloud Stream】异步任务

一、前言 前两篇博客提高了用线程池和消息队列才实现异步任务。本篇博客谈一谈用SpringCloud Stream来实现异步任务。 Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它...

qq_26545305
2018/05/20
0
0
Spring Cloud Stream如何处理消息重复消费?

最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实,在之前的博文以及《Spr...

程序猿DD
2018/11/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

MQ学习-基本概念区分

消息队列 Kafka 涉及的专有名词和术语进行定义和解释,方便您更好地理解相关概念并使用该产品。 Broker: 消息队列 Kafka 集群包含一个或多个消息处理服务器,该服务器被称为 Broker。 Topi...

os1cheng
14分钟前
0
0
腾讯怒怼:靠红包骗用户下载怎么可以叫产品

近日,社交圈出现了大动荡,三款新推出的社交软件全部被微信封杀,对此,腾讯公关总监在回应外界对于1月15日三款社交新产品撼动微信的消息,他呼吁媒体在批评的同时应当尊重事实,“我们尊重...

linux-tao
今天
1
0
面试必考-数据优化

sql语句优化 性能不理想的系统中除了一部分是因为应用程序的负载确实超过了服务器的实际处理能力外,更多的是因为系统存在大量的SQL语句需要优化。 为了获得稳定的执行性能,SQL语句越简单越好...

瑞查德-Jack
今天
2
0
spring aop学习

1.aop和spring aop的关系? aop是一种思想,spring aop是aop的一种实现。 2.Spring aop和AspectJ的关系? 这两种都是实现aop的,spring借助AspectJ的语法实现aop的功能 3.@AspectJ support和...

llsydn
今天
4
0
OSChina 周六乱弹 —— 世界的源代码

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @小鱼丁 :#今日歌曲推荐# 分享Jason Mraz的单曲《Prettiest Friend (Demo)》: 《Prettiest Friend (Demo)》- Jason Mraz 手机党少年们想听歌...

小小编辑
今天
475
12

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部