文档章节

Spring Cloud构建微服务架构:消息驱动的微服务(消费组)【Dalston版】

程序猿DD
 程序猿DD
发布于 03/06 07:26
字数 737
阅读 378
收藏 3

通过之前的《消息驱动的微服务(入门)》一文,相信很多朋友已经对Spring Cloud Stream有了一个初步的认识。但是,对于《消息驱动的微服务(核心概念)》一文中提到的一些核心概念可能还有些迷糊,下面我们将详细的来学习一下这些概念。本文我们就来学习和使用一下“消费组”这一概念。

使用消费组实现消息消费的负载均衡

通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。

默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能,实现的方式非常简单,我们只需要在服务消费者端设置spring.cloud.stream.bindings.input.group属性即可,比如我们可以这样实现:

  • 先创建一个消费者应用SinkReceiver,实现了greetings主题上的输入通道绑定,它的实现如下:
@EnableBinding(value = {Sink.class})
public class SinkReceiver {

    private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(User user) {
        logger.info("Received: " + user);
    }
}

  • 为了将SinkReceiver的输入通道目标设置为greetings主题,以及将该服务的实例设置为同一个消费组,做如下设置:
spring.cloud.stream.bindings.input.group=Service-A
spring.cloud.stream.bindings.input.destination=greetings

通过spring.cloud.stream.bindings.input.group属性指定了该应用实例都属于Service-A消费组,而spring.cloud.stream.bindings.input.destination属性则指定了输入通道对应的主题名。

  • 完成了消息消费者之后,我们再来实现一个消息生产者应用SinkSender,具体如下:
@EnableBinding(value = {Source.class})
public class SinkSender {

    private static Logger logger = LoggerFactory.getLogger(SinkSender.class);

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000"))
    public MessageSource<String> timerMessageSource() {
        return () -> new GenericMessage<>("{\"name\":\"didi\", \"age\":30}");
    }

}
  • 为消息生产者SinkSender做一些设置,让它的输出通道绑定目标也指向greetings主题,具体如下:
spring.cloud.stream.bindings.output.destination=greetings

到这里,对于消费分组的示例就已经完成了。分别运行上面实现的生产者与消费者,其中消费者我们启动多个实例。通过控制台,我们可以发现每个生产者发出的消息,会被启动的消费者以轮询的方式进行接收和输出。

以下专题教程也许您会有兴趣

本文内容部分节选自我的《Spring Cloud微服务实战》,稍作改变和内容升级

© 著作权归作者所有

共有 人打赏支持
程序猿DD
粉丝 391
博文 66
码字总数 87270
作品 4
闵行
私信 提问
加载中

评论(1)

黑暗骑士C
黑暗骑士C
Java主流技术分享:SpringBoot、SpringCloud、Docker、dubbo、redis、多线程高并发、数据库性能调优等等。https://charllee.github.io/2018/02/27/Java%E6%9C%80%E6%96%B0%E5%AD%A6%E4%B9%A0%E8%B5%84%E6%96%99-2018/
程序猿DD/SpringCloud-Learning

Spring Cloud教程 本项目内容为Spring Cloud教程的程序样例。如您觉得该项目对您有用,欢迎点击右上方的Star按钮,给予支持!! 我的博客:http://blog.didispace.com 我的小密圈(深度交流与...

程序猿DD
2016/11/03
0
0
SpringCloud_学习_00_资源帖

一、官方资料 1.springcloud中文网 2.Spring Cloud 官方中文文档(dalston) 3.spring cloud Quick Start 二、github 精选 1.翟永超-《Spring Cloud构建微服务架构》系列博文Dalston版 2.方志...

rayner
04/13
0
0
Spring Cloud构建微服务架构:消息驱动的微服务(入门)【Dalston版】

之前在写Spring Boot基础教程的时候写过一篇《Spring Boot中使用RabbitMQ》。在该文中,我们通过简单的配置和注解就能实现向RabbitMQ中生产和消费消息。实际上我们使用的对RabbitMQ的starter...

翟永超
2017/12/18
0
0
微服务选择Spring Cloud还是Dubbo?

点击关注 异步图书,置顶公众号 每天与你分享 IT好书 技术干货 职场知识 参与文末话题讨论,每日赠送异步图书。 ——异步小编 在阿里巴巴的生态中,微服务逐渐成为主要的服务形态,伴随着容器...

异步社区
04/27
0
0
Spring Cloud Stream如何处理消息重复消费?

最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实,在之前的博文以及《Spr...

程序猿DD
11/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

使用Autowired和Qualifier解决多个相同类型的bean如何共存的问题

注意: 实现类UserServiceImpl,MyUserServiceImpl 需要区分:@Service("userServicel") @Service("myUserService") https://blog.csdn.net/russle/article/details/80287763......

qimh
17分钟前
1
0
SQL 语句使用to_char函数时,检索结果有空格

小疯在使用Oracle过程中,使用to_char函数检索表数据时发现检索结果前面会有一个空格,对后续开发有影响。问题很好解决,比较直接对可以做一下trim处理。但是小疯很疑惑为什么会有空格呢,于...

野小疯
19分钟前
1
0
对接比特币钱包的PHP开发包

BtcTool是一个基于第三方服务和离线裸交易实现的PHP比特币应用开发包,适合不希望部署本地 节点旳PHP开发者,开发包主要包含以下特性: 利用第三方服务获取指定地址的utxo集合 离线生成消费裸...

汇智网教程
37分钟前
1
0
【自用】 VHD to VHDX

VHDX: 在VHD 2TB 的基础上提供 64TB的容量。 支持逻辑扇区大小为 4KB,和每块的大小为 256MB,来优化虚拟磁盘性能。 比VHD提供更高的安全性、可靠性和性能。 convert-VHD –path d:\Hyper-v...

Tensor丨思悟
49分钟前
3
0
30 岁转行做Python开发晚吗?而且是零基础

最近有小伙伴问小编,30 岁转行做Python开发晚吗? 小编想说,其实无论男女,只要想学,有这个动力,就直接去行动。无论年龄,无论性别,只要你想一直勇往直前,那么想做的就去做吧~这里有一...

糖宝lsh
今天
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部