Spring Cloud Bus 消息总线

原创
04/11 17:39
阅读数 172

什么是消息总线?

在微服务架构的系统中, 我们通常会使用轻量级的消息代理来构建 个共用的消息主 题让系统中所有微服务实例都连接上来, 由于该主题中产生的消息会被所有实例监听和消 费, 所以我们称它为消息总线。 在总线上的各个实例都可以方便地广播 些需要让其他连 接在该主题上的实例都知道的消息, 例如配置信息的变更或者其他 些管理操作等。 由于消息总线在微服务架构系统中被广泛使用, 所以它同配置中心 样, 几乎是微服 务架构中的必备组件。 Spring Cloud 作为微服务架构综合性的解决方案,对此自然也有自己 的实现, 这就是本章我们将要具体介绍的 Spring Cloud Bus 。 通过使用 Spring Cloud Bus,
可以非常容易地搭建起消息总线, 同时实现了 些消息总线中的常用功能, 比如, 配合 Spring Cloud Config 实现微服务应用配置信息的动态更新等。

消息代理

消息代理 (Message Broker) 种消息验证、 传输、 路由的架构模式。 它在应用程序 之间起到通信调度并最小化应用之间的依赖的作用, 使得应用程序可以高效地解耦通信过 程。 消息代理是 个中间件产品, 它的核心是 个消息的路由程序, 用来实现接收和分发 消息, 并根据设定好的消息处理流来转发给正确的应用。 它包括独立的通信和消息传递协 议, 能够实现组织内部和组织间的网络通信。
 
• 将消息路由到 个或多个目的地。
• 消息转化为其他的表现方式。
• 执行消息的聚集、 消息的分解, 并将结果发送到它们的目的地, 然后重新组合响应
返回给消息用户。
• 调用Web服务来检索数据。
• 响应事件或错误。
• 使用发布-订阅模式来提供内容或基千主题的消息路由。
目前已经有非常多的开源产品可以供大家使用, 比如:
• ActiveMQ
• Kafka
RabbitMQ
RocketMQ
当前版本的Spring  Cloud Bus仅支待两款中间件产品: RabbitMQ和Kafka。

RabbitMQ整合Spring Cloud Bus

因为SpringCloud基千Spring Boot, 在上 节中我们已经体验了Spring Boot与 RabbitMQ的整 合, 所以在SpringCloud Bus中使用RabbitMQ也是非常容易配置的。
下面我们来具体动手尝试整个配置过程。
• 准备工作: 这里我们不创建新的应用, 但需要用到上 章中已经实现的 关于Spring Cloud Config的几个工程, 若读者对其还不了解, 建议先阅读第8章的内容。
• con巨g-repo: 定义在Git仓库中的 个目录,其中存储了应用名为中dispace 的多环境配置文件, 配置文件中有 个 from参数。
• config-server-eureka: 配置了Git仓库, 并注册到了Eureka的服务端。
• con巨g-c巨ent-eureka: 通过 Eureka发现ConfigServer的客户端, 应用名 为 中也space, 用来访问配置服务器以获取配置信息。 该应用中提供了 /from接口, 它会获取 con巨g-repo/di尘space-dev.properties中的 from属性并返回。
306 第9章 · 消息总线: Spring Cloud Bus
• 扩展con巨g-c巨ent-eureka应用 。
修改pom.xml, 增加spring-cloud-s迳江er-bus-amqp模块(注意spring- boot-starter-actuator模块也是必需的, 用 来提供刷新端点)。
<dependency>
<groupid>org.springframework.cloud</groupid>
<artifactid>spring-cloud-starter-bus-amqp</artifactid>
</dependency>
• 在配置文件中增加关千RabbitMQ的连接和用户信息。
spring.rabbitrnq.host=localhost
spring.rabbitrnq.port=5672
spring.rabbitrnq.usernarne = springcloud
spring.rabbi mq.password = l23456
• 启动 config-server-eureka, 再启动 两个con丘g-c巨ent-eureka C分别在
不同的端口上, 比如7002、 7003)。 我们可以在 config-c巨ent-eureka中的控 制台中看到如下内容, 在启动时, 客户端程序多了 个/bus/refresh请求。
o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/bus/refresh],methods=[POST]}" onto public void  org.springfrarnework.cloud.bus.endpoint.RefreshBusEndpoint.refresh(java.lang.String)
• 先访问两个con巨g-c止en七-eureka的/from 请求, 会返回当前config repo/d沁ispace-dev.proper巨es中的from属性。
• 接着, 修改con巨g-repo/di中space-dev .properties中的from属性值, 并发送POST请求到其中的 个/bus/refresh。
• 最后, 再分别访问启动的两个con巨g-c归ent-eureka的/from请求, 此时这 两个请求都会返回最新的 con丘g-repo/di中space-dev.proper巨es 中的 from属性。 到 这里, 我们已经能够通过SpringCloud Bus来实时更新 总线上的属性配置了 。

原理分析

当我们将系统启动起来之后, 图中"Service A"的三个实例会请求ConfigServer以 获 此时, 若我们需要修改"Service A"的属性。

首先, 通过Git管理工具去仓库中修改 对应的属性值,但是这个修改并不会触发"Service A"实例的属性更新。我们向"Service A" 的实例3发送POST请求, 访问/bus/refresh接口。 此时,"Service A"的实例3就会 将刷新请求发送到消息总线中, 该消息事件会被"Service A"的实例l和实例2从总线中 获取到, 并重新从 ConfigServer中获取它们的配置信息, 从而实现配置信息的动态更新。

而从Git仓库中配置的修改到发 起 /bus/refresh的POST请求这一步可以通过Git 所以在 Web Hook中就不需要维护所有节点内容来进行更新, 从而解决了上一章中仅通过 Web Hook来逐个进行刷新的问题。

指定刷新范围

在上面的例子中,我们通过向服务实例 请求SpringCloud Bus的/bus/refresh接口, 从而触发 总线上其他服务实例的/refresh。 但是在 些特殊场景下, 我们希望可以刷新 微服务中某个具体实例的配置。
Spring Cloud Bus 对这种场景也有很好的支待, /bus/refresh 接口提供了 destination参数, 用来定位具体要 刷新的应用程序。 比如, 我们可以请求 /bus/refresh?destination = customers:9000, 此时总线上的各应用实例会根据 destination属性的值来判断是否为自己的实例名, 若符合才进行配置刷新, 若不符合 就忽略该消息。
destination参数除了可以定位具体的实例之外, 还可以用来定位具体的服务。 定 位服 务 的原理是通过使用 Spring的PathMatcher (路径正则匹配)来实现的 , 比如 /bus/refresh?destination = customers:**, 该请求会触发 customers服务的所 有实例进行刷新。

架构优化

既然SpringCloud Bus的 /bus/refresh接口提供了针对服务和实例进行配置更新的 参数, 那么我们的架构也可以相应做出 些调整。 在之前的架构中, 服务的配置更新需要 通过向具体服务中的某个实例发送请求, 再触发对整个服务集群的配置更新。 虽然能实现 功能, 但是这样的结果是, 我们指定的应用实例会不同千集群中的其他应用实例, 这样会 增加集群内部的复杂度, 不利于将来的运维工作。 比如, 需要对服务实例进行迁移, 那么 我们不得不修改Web Hook中的配置等。 所以要尽可能地让服务集群中的各个节点是对等 的。
因此, 我们将之前的架构做了 些调整, 如下图所示。

我们主要做了以下这些改动:
1.在ConfigServer中也引入SpringCloud Bus, 将配置服务端也加入到消息总线中来。
2. /bus/refresh请求不再发送到具体服务实例上, 而是发送给Config Server, 并 通过destination参数来指定需要更新配置的服务或实例。
通过上面的改动,我们的服务实例不需要再承担触发配置更新的职责。 同时, 对于Git 的触发等配置都只需要针对ConfigServer即可, 从而简化了集群上的 些维护工作。

Kafka整合Spring Cloud Bus

在介绍Kafka之前,我们已经通过引入spring-cloud-starter-bus-amqp模块, 完成了使用RabbitMQ来实现消息总线。若我们要使用Kafka来实现消息总线 时, 只需把 spring-cloud-starter-bus-amqp替换成spring-cloud-starter-bus-kafka 模块,在pom.xml的dependency节点中进行修改, 具体如下:
<dependency> 
    <groupId>org.springframework.cloud</groupId> 
    <artifactId>spring-cloud-starter-bus-kafka</artifactId> 
</dependency>
如果在启动 Kafka 时均采用了默认配置, 那么我们不需要再做任何其他配置就能在本 地实现从 RabbitMQ Kafka 的切换。可以尝试把刚刚搭建的 ZooKeeper Kafka 启动起来, 并将修改为 spring-cloud-starter-bus-kafka 模块的 config-server config-cotent 启动起来。
config-server 启动时, 我们可以在控制台中看到如下输出:
2016-09-28 22:11:29.627 INFO 15144 --- [ main] 
o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: 
springCloudBus
2016-09-28 22:11:29.642 INFO 15144 --- [-localhost:2181] 
org.IOitec.zkclien.ZkEventThread : Starting ZkClient event thread. 
016-09-28 22:11:30.290 INFO 15144 --- [ main] 
o.s.i.kafka.support.ProducerFactoryBean : Using producer properties =>
{bootstrap.servers=localhost:9092, linger.ms=O, acks=l, compression.type=none,
batch.size=l6384)
2016-09-28 22:11:30.298 INFO 15144 --- [ main] 
o.a.k.clients.producer.ProducerConfig : ProducerConfig values: 
2016-09-28 22: 11: 30. 322 INFO 15144 --- [ main) 
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding
{message-handler:ou七bound.springCloudBus} as a subscriber to the'springCloudBusOutput'
channel
2016-09-28 22: 11: 30. 322 INFO 15144 --- [ main) 
o.s.integration.channel.DirectChannel : Channel 
'config-server:7001.springCloudBusOutput' has 1 subscriber(s). 
2016-09-28 22:11:30.322 INFO 15144 --- [ main) 
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus
2016-09-28 22: 11: 31. 465 INFO 15144 --- [ main] 
s.i.k.i.KafkaMessageDrivenChannelAdapter : started
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@4178
cb34
2016-09-28 22: 11: 31. 467 INFO 15144 --- [ main] 
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : Adding
{message-handler:inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282aOd5a
30b) as a subscriber to the 'bridge.springCloudBus' channel
2016-09-28 22:11:31.467 INFO 15144 --- [ main] 
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : started
inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b
从控制台的输出内容我们可以看到, config-server 连接到了 Kafka 中, 并使用了
名为 springCloudBus Topico 此时, 我们可以使用 kafka-topics --list --zookeeper localhost:2181 命令来查看当前 Kafka 中的 Topic 。 若已成功启动了 config-server 并配置正确, 可以 Kafka 中看到已经多了 个名为 springCloudBus Topico
 
我们再启动配置了 spring-cloud-starter-bus- kafka 模块的 config-client, 可以看到控制台中输出了如下内容:
2016-09-28 22:43:55.067 INFO 6136 --- [ main] 
o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: 
springCloudBus 
2016-09-28 22:43:55.078 INFO 6136 --- [-localhost:2181] 
org.IOitec.zkclient.ZkEventThread : Starting ZkClient event thread. 
2016-09-28 22:50:38.584 INFO 828 --- [ main] 
o.s.i.kafka.support.ProducerFactoryBean : Using producer proper七ies => 
{bootstrap.servers=localhost:9092, manger.ms=O, acks=l, compression.type=none, 
batch.size=l6384} 
2016-09-28 22:50:38.592 INFO 828 --- [ main] 
o.a.k.clients.producer.ProducerConfig : ProducerConfig values: 
2016-09-28 22:50:38.615 INFO 828 --- [ main] 
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding 
{message-handler: outbound. springCloudBus} as a subscriber to the 'springCloudBusOutput' 
channel 
2016-09-28 22:50:38.616 INFO 828 --- [ main] 
o.s.integration.channel.DirectChannel : Channel 
'di立space:7002.springCloudBusOutput' has 1 subscriber(s). 
2016-09-28 22:50:38.616 INFO 828 --- [ main] 
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus 
2016-09-28 22:50:39.162 INFO 828 --- [ main) 
s.i.k.i.KafkaMessageDrivenChannelAdapter : started
org.springframework.in七egration.kafka.inbound.KafkaMessageDrivenChannelAdapter@60cf
855e 
2016-09-28 22:50:39.162 INFO 828 --- [ main) 
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : Adding 
{message-handler:inbound.springClouc!Bus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee 
216) as a subscriber to the 'bridge.springClouc!Bus' channel 
2016-09-28 22:50:39.163 INFO 828 --- [ main) 
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : started 
inbound.springClouc!Bus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216
可以看到, config-content 启 动时输出了类似的内容, 它们都订阅了名为 springCloudBus Topic 。 从这里我们也可以知道, 在消息总线上的节点, 从结构上来 说, 不论是 config-server 还是 config-content, 它们都是对等的。 在启动了 config-server config-c让ent 之后,为了更明显地观察消息总线刷 新配置 的效果, 我们可以在本地启动多个不同端口的 config-content 。 此时, 我们的 config-server 以及多个 config-content 都已经连接到了由 Kafka 实现的消息总线 上。 我们可以先访问各个 con巨g-c让ent 上 的 /from 请求, 查看它获取到的配置内容。 然后, 修改 Git 中对应的参数内容, 再访问各个 con丘g-c巨ent 上的 /from 请求, 可以 看到配置内容并没有改变。 最后, 我们向 con巨g-server 发 送 POST 请 求: /bus/ refresh, 此时再去访间各个config-content 上的 /from请求, 就能获得最新 的配置信息,各客户端上的配置都已经加载为最新的Git 配置内容。
从config-client的控制台中, 我们可以看到如下内容:
2016-09-29 08:20:34.361 INFO 21256 --- [ kafka-binder-1] 
o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed [from) 
RefreshListener监听类记录了收到远程刷新请求, 并刷新了from属性的日志。

深入理解

启动消费者控制台之后,我们向con丘g-server发送POST请求: /bus/refresh, 此时在控制台中可以看到类似如下的内容:
content Type "application/json"
{
"type": "RefreshRemoteApplicationEvent", 
"timestamp": 1475073160814, 
"originService": "config-server: 7001", 
"destinationService": "*: * *", 
"id": "bbfbf495-39d8-4ff9-93d6-174873ff7299"
}

contentType "application/json"
{
“type": "AckRemoteApplicationEvent", 
"timestamp": 1475073160821, 
"originService": "config-server: 7001", 
"destinationService": "*: **", 
"id": "lf794774-10d6-4140-a80d-470983c6c0ff", 
"ackid": "bbfbf495-39d8-4ff9-93d6-174873ff7299", 
"ackDestinationService": "*: **", 
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}

contentType "application/json"
{
"type": "AckRemoteApplicationEvent", 
"timestamp": 1475075467554, 
"originService": "didispace: 7002", 
"destinationService": "*: **", 
"id": "7 56015le-f60c-4 9cd-8167-b69le84 6ad08", 
"ackid": "21502725-28f5-4dl 9-a98a-f8114fa4 fldc", 
"ackDestinationService": "*:**", 
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}
• type: 消息的事件类型。在上面的例子中,包含了RefreshRemoteApplicationEvent 和AckRemoteAppticationEvent。其中,RefreshRemoteApplicationEvent 事件就是我们用来刷新配置的事件,而AckRemoteApp巨ca巨onEvent是响应消 息已经正确接收的告知消息 事件。
• timestamp: 消息的时间戳。
• originService: 消息的来源服务实例。
• destinationService: 消息的目标服务实例。 上面示例中的 ** 代表了总线上 的所有服务实例。 如果想要指定服务或是实例, 在之前介绍RabbitMQ实现消息总 线时已经提过, 只需要通过使用des巨nation参数来定位具体要刷新的应用实例 即可, 比如发起/bus/refresh?des巨nation = d沁ispace请求, 就可以得到 如下的刷新事件消息,其中destinationService为didispace:**, 表示总 线上所有didispace服务的实例。
content Type "application/json"
{
    "type": "RefreshRemoteApplicationEvent", 
    "timestamp": 1475131215007, 
    "originService": "config-server: 7001", 
    "destinationService": "didispace:**", 
    "id": "667fe948-e9b2-447f-be22-3c8acf647ead"
}
• id: 消息的唯 标识。
上面的消息内容是RefreshRemoteApplicationEvent和AckRemoteApplicationEvent 类型共有的, 下面几个属性是AckRemoteApplicationEvent所特有的,分别表示如下 含义。
• ackid:Ack消息对应的消息来源。我们可以看到第 条AckRemoteApplication Event的ackid对应了 RefreshRemoteApplicationEvent的id, 说明这条 Ack是告知该 RefreshRemoteApplicationEvent事件的消息已经被收到。
• ackDestinationService: Ack 消息的目标服务实例。 可以看到这里使用的是 *   ** , 所以消息总线上所有的实例都会收到该Ack消息。
• event: Ack 消息的来源事件。 可以看到上例中的两个Ack均来源于刷新配置的 RefreshRemoteApplicationEvent事件, 我们在测试的时候由于启动了两个 config-client, 所以有两个实例接收到了配置刷新事件, 同时它们都会返回 Ack消息 。 由于ackDestinationService为 ** , 所以两个 config-client 都会收到对RefreshRemoteApplicationEvent事件的Ack消息。
 
 

 

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部