文档章节

【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务

 本人秃顶程序员
发布于 01/29 16:50
字数 1258
阅读 757
收藏 9

←←←←←←←←←←←← 快!点关注

让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务。首先,Spring Cloud Stream首先有什么好处?因为Spring AMPQ提供了访问AMPQ工件所需的一切。如果您不熟悉Spring AMPQ,请查看此repo,其中包含许多有用的示例。那么为什么要使用Spring Cloud Stream ......?

Spring Cloud Stream概念

  • Spring Cloud Stream通过Binder概念将使用过的消息代理与Spring Integration消息通道绑定在一起。支持RabbitMQ和Kafka。
  • Spring Cloud Stream将基础架构配置从代码中分离为属性文件。这意味着即使您更改了底层代理,您的Spring Integration代码也将是相同的!

示例中的Spring Cloud Stream概念(RabbitMQ)

让我们有一个名为streamInput的交换,它有两个队列streamInput.cities和streamInput.persons。现在让我们将这两个队列插入两个消息通道citiesChannel和personsChannel来消费来自它的传入消息。使用Spring AMPQ,您需要创建SimpleMessageListenerContainer并在代码中连接基础结构。但这有很多样板代码。使用Spring Cloud Stream,您可以将AMPQ配置分离到属性文件:

spring.cloud.stream.bindings.citiesChannel.destination=streamInput  
spring.cloud.stream.bindings.citiesChannel.group=cities  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities

spring.cloud.stream.bindings.personsChannel.destination=streamInput  
spring.cloud.stream.bindings.personsChannel.group=persons  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons  

配置详细信息

在类路径上使用RabbitMQ Binder,每个目标都映射到TopicExchange。在示例中,我创建了名为streamInput的TopicExchange, 并将其附加到两个消息通道citiesChannel和personsChannel。

spring.cloud.stream.bindings.citiesChannel.destination = streamInput   
spring.cloud.stream.bindings.personsChannel.destination = streamInput  

现在您需要了解RabbitMQ绑定器的灵感来自Kafka,队列的消费者被分组到消费者组中,其中只有一个消费者将获得消息。这是有道理的,因为您可以轻松扩展消费者。

因此,让我们创建两个队列streamInput.persons和streamInput.cities并将它们附加到streamInput TopicExchange和提到的消息通道

# This will create queue "streamInput.cities" connected to message channel citiesChannel where input messages will land.  
spring.cloud.stream.bindings.citiesChannel.group=cities 

# Durable subscription, of course.  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true 

# AMPQ binding to exchange (previous spring.cloud.stream.bindings.<channel name>.destination settings).  
# Only messages with routingKey = 'cities' will land here.  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities 

spring.cloud.stream.bindings.personsChannel.group=persons  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons  

连接属性到Spring Integration

好的,到目前为止我创建了两个队列。StreamInput.cities绑定到citiesChannel。StreamInput.persons绑定到peopleChannel。

<destination>.<group>是Spring Cloud Stream约定的队列命名,现在让我们将它连接到Spring Integration:

package com.example.spring.cloud.configuration;

import org.springframework.cloud.stream.annotation.Input;  
import org.springframework.messaging.SubscribableChannel;

/**  
 \* Created by tomask79 on 30.03.17.  
 */  
public interface SinkRabbitAPI {

    String INPUT_CITIES = "citiesChannel";

    String INPUT_PERSONS = "personsChannel";

    @Input(SinkRabbitAPI.INPUT_CITIES)  
    SubscribableChannel citiesChannel();

    @Input(SinkRabbitAPI.INPUT_PERSONS)  
    SubscribableChannel personsChannel();  
}  

Spring Boot启动时加载这个属性

package com.example.spring.cloud;

import com.example.spring.cloud.configuration.SinkRabbitAPI;  
import com.example.spring.cloud.configuration.SourceRabbitAPI;  
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  
import org.springframework.cloud.stream.annotation.EnableBinding;  
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication  
@EnableBinding({SinkRabbitAPI.class})  
public class StreamingApplication {

    public static void main(String\[\] args) {  
        SpringApplication.run(StreamingApplication.class, args);  
    }  
}  

在此之后,我们可以创建消费者从绑定的消息通道中的队列接收消息:

import com.example.spring.cloud.configuration.SinkRabbitAPI;  
import com.example.spring.cloud.configuration.SourceRabbitAPI;  
import org.springframework.cloud.stream.annotation.StreamListener;  
import org.springframework.integration.support.MessageBuilder;  
import org.springframework.messaging.MessageChannel;  
import org.springframework.messaging.handler.annotation.SendTo;  
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**  
 \* Created by tomask79 on 30.03.17.  
 */  
@Service  
public class ProcessingAMPQEndpoint {

    @StreamListener(SinkRabbitAPI.INPUT_CITIES)  
    public void processCity(final String city) {  
        System.out.println("Trying to process input city: "+city);  
    }

    @StreamListener(SinkRabbitAPI.INPUT_PERSONS)  
    public void processPersons(final String person) {  
        System.out.println("Trying to process input person: "+person);  
    }  
}  

RabbitMQ绑定器和代理配置

Spring Cloud Stream如何知道在哪里寻找消息中间件?如果在类路径中找到RabbitMQ绑定器,则使用默认RabbitMQ主机(localhost)和端口(5672)连接到RabbitMQ服务器。如果您的消息中间件配置在不同端口,则需要配置属性:

spring:  
  cloud:  
    stream:  
      bindings:  
        ...  
      binders:  
          rabbitbinder:  
            type: rabbit  
            environment:  
              spring:  
                rabbitmq:  
                  host: rabbitmq  
                  port: 5672  
                  username: XXX  
                  password: XXX  

测试消息消费

  • 安装并运行RabbitMQ代理
  • rabbitmq.git
  • mvn clean install
  • java -jar target / streaming-0.0.1-SNAPSHOT.jar
  • 现在使用路由键'cities'或'persons'在streamInput Exchange上发布消息...输出应该是:
Started StreamingApplication in 6.513 seconds (JVM running for 6.92)   
Trying to process input city: sdjfjljksdflkjsdflkjsdfsfd  
Trying to process input person: sdjfjljksdflkjsdflkjsdfsfd  

使用Spring Cloud Stream重新传递消息

您通常希望在进入DLX交换之前再次尝试接收消息。首先,让我们配置Spring Cloud Stream尝试重新发送失败消息的次数:

spring.cloud.stream.bindings.personsChannel.consumer.maxAttempts = 6  

这意味着如果从streamInput.persons队列接收的消息出错,那么Spring Cloud Stream将尝试重新发送六次。让我们试试,首先让我们修改接收端点以模拟接收崩溃:

 @StreamListener(SinkRabbitAPI.INPUT_PERSONS)  
    public void processPersons(final String person) {  
        System.out.println("Trying to process input person: "+person);  
        throw new RuntimeException();  
    }  

如果我现在尝试使用人员路由键将某些内容发布到streamInput交换中,那么这应该是输出:

Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
 Retry Policy Exhausted  
        at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover  
(RejectAndDontRequeueRecoverer.java:45) ~\[spring-rabbit-1.7.0.RELEASE.jar! /:na\]  
        at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterc         

建议将Spring Cloud Stream 用于事件驱动的MicroServices,因为它可以节省时间,而且您不需要为Java中的AMPQ基础架构编写样板代码。

写在最后:

秃顶程序员的不易,看到这里,点了关注吧!
点关注,不迷路,持续更新!!!

© 著作权归作者所有

共有 人打赏支持
粉丝 5
博文 10
码字总数 19585
作品 0
长沙
私信 提问
加载中

评论(5)

南望山小飞
南望山小飞
无图无真相
r
ryannj
我就想看看你有秃
银杏卡卡
银杏卡卡
https://gitee.com/zkpursuit/kaka-notice-lib
本人实现的事件领域模型框架,不依赖消息队列,完全以事件驱动。
欢迎点评!
K
KeeperZoo
为我们的发际线和将来的发际线点赞
ZeroneLove
ZeroneLove
作者名字就表明编程资历,哈哈😅
【Spring Cloud Stream】异步任务

一、前言 前两篇博客提高了用线程池和消息队列才实现异步任务。本篇博客谈一谈用SpringCloud Stream来实现异步任务。 Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它...

qq_26545305
2018/05/20
0
0
SpringCloud实战9-Stream消息驱动

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。   应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Clo...

狂小白
2018/05/29
0
0
Config Server——使用Spring Cloud Bus自动刷新配置

前文我们讨论了使用 端点手动刷新配置,但是如果所有微服务节点的配置都需要手动去刷新的话,那必然是一个繁琐的工作,并且随着系统的不断扩张,会变得越来越难以维护。因此,实现配置的自动...

eacdy0000
2017/10/13
0
0
SpringCloud 微服务 (十一) Spring Cloud Stream MQ-API

壹 上节学习了基础的RabbitMQ,本次继续学习MQ相关API---SpringCloud Stream 看到Stream大致会联想到流,input,output等信息,官网解释SpringCloud Stream是给微服务应用构建消息队列驱动的能力...

_大侠__
2018/07/05
0
0
【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream

一、前言 在前面的博客中,基本上已经把springcloud系列的大部分都介绍过了。如果有太明白的小白,还是建议从小编的第一篇博客进行学习。 在这篇博客中,小白向大家介绍一个消息事件驱动框架...

kisscatforever
2018/04/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

精品书籍推荐

JavaScript书籍推荐 1、[JavaScript高级程序设计(第3版)] 2、你不知道的JavaScript(中卷) 3、ES6标准入门(第二版)阮一峰

轻轻的往前走
29分钟前
6
0
JVM(六)为什么新生代有两个Survivor分区?

本文会使用排除法的手段,来讲解新生代的区域划分,从而让读者能够更清晰的理解分代回收器的原理,在开始之前我们先来整体认识一下分代收集器。 分代收集器会把内存空间分为:老生代和新生代...

王磊的博客
34分钟前
20
0
程序员最喜欢的15款文本编辑器推荐

程序员最喜欢的15款文本编辑器推荐 2017年09月18日 17:30:50 kangle_zhu 阅读数:59390 转载地址:http://www.cr173.com/html/50553_1.html 很多时候比如编程查看代码或者打开各种文档下我们...

linjin200
36分钟前
16
0
如何在php后端及时推送消息给客户端

walkor大神,目前需求是这样的: 有一群商家在后台网页处理批量导入产品 -》 服务器接受请求 -》 开始foreach一个一个处理导入请求; 我现在想每成功导入一个就推送到前台显示已经导入成功,...

dragon_tech
55分钟前
19
0
Java利用hanlp完成语句相似度分析的案例详解

分享一篇hanlp分词工具使用的小案例,即利用hanlp分词工具分析两个中文语句的相似度的案例。供大家一起学习参考! 在做考试系统需求时,后台题库系统提供录入题目的功能。在录入题目的时候,...

左手的倒影
今天
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部