Spring Cloud Bus 消息总线介绍 原 荐

Spring Cloud Bus 消息总线介绍
                                                原
                                                    荐

作者 | 洛夜 来源 | 阿里巴巴云原生公众号

在 Spring 生态中玩转 RocketMQ 系列文章:

  • 《如何在 Spring 生态中玩转 RocketMQ?》
  • 《罗美琪和春波特的故事...》
  • 《RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?》
  • 《使用 rocketmq-spricurling溃疡名词解释ng-boot-starter 来配置、发送和消费 RocketMQ 消息》
  • 《Spring Cloud Stream 体系及原理介绍》

本文配套可交互教程已登录阿里云知行动手实curls验室,PC 端登录 start.aliyun.com 在浏览器中立即体验

Spring Cloud Bus 消息总线介绍
                                                原
                                                    荐

Spring Cloud Bus 对自己的定位是 Spring Cloud 体系内的消息总线,使用 message brokspringer 来连接分布式系统的所有节点。Bus 官方的spring面试题 Referespringboot面试题nce 文档比较简单,简单curls到连一张图都没有。

这是最新版的 Spcurlingring Cloud Bucurlss 代码结构(代码量比较少):

Spring Cloud Bus 消息总线介绍
                                                原
                                                    荐

Bus 实例演示

在分析 Bus 的实现之前,我们先来看两个使用 Spring Cloud Bus 的简单例子

1. 所有节点的配置新增

Bus 的例子比较简单,因为 Bus 的 AutoConfiguration 层都有了默认的配置,只需要引入消息中间件对应的 Spring Cloud Stream 以及 Spring Cloud Bus 依赖即可,之后所有启动curly的应用都会使用同一个 Topic 进行消息的接收curling溃疡名词解释和发送。curl

Bus 对应的 Demo 已经放到了 github 上,spring该 Demo 会模拟启动 5 个节点,只需要对其中任意的一个实例新增配置项,所有节点都会新增该配置项。

Demo 地址:https://github.com/fangjian0423/rspringmvc的工作原理ocketmq-binder-demo/tree/master/rocketmq-bus-dcurlyemo

访问任意节点提供的 Controller 提供的获取curls配置的地址(key 为hangzhcurlou):

curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'

所有节点返回的结果都springmvc的工作原理是 unkncurlyown,因为所有节点的配置中没有hangzhou这个 key。

Bus 内部提供了EnvironmentBusEndpoint这个 Endpoint 通过 message broker 用来新增/更新配置。

访问任意节点该 Endpoint对应的url:/actuator/bus-env?name=hangzhou&value=alibaba 进curls行配置项的新增(比如访问 node1 的url):

curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'

然后再次访问所有节点/bus/env获取配置:

$ curl -X GET 'http://localhost:10001/bus/enapache+rocketmqv?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10002springboot常用注解/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://locaspringcloud五大组件lhost:10003/bus/env?key=hangcurlszhou'
unknown%
~ ⌚
$curling溃疡 curl -Xcurl怎么读 GET 'htcurl命令tp:curled//localhost:10004/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhocurledu&value=alibabaspringcloud五大组件' -H 'content-type: application/json'
~ ⌚
$ curl -X GEcurly是什么意思中文T 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'httpspring是什么意思://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhouspring面试题'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'httspring面试题p://localhost:10001/springcloudbus/env?key=curlyhangzhou'
alibaba%

可以看到curly是什么意思中文,所有节点都新增了一个 key 为hanspringboot常用注解gzapache+rocketmqhou的配置,且对应的 value 是alispringboot常用注解baba。这个配置项是通过 Bus 提供的Envirospringcloud五大组件nmentBusEndpoint 完springboot面试题成的。

这里引用 程序猿DD 画curling的一张图片,Spring Cloud Config 配合 Bus 完成所有节点配springboot面试题置的刷新来描述之前的实例(本文实例不是刷新,而是新增配置curly是什么意思中文,但是流程是一样的):

Spring Cloud Bus 消息总线介绍
                                                原
                                                    荐

2. 部分节点的配置修改

比如在springboot nocurlde1 上指定 destination 为 rospringcketmq-bus-node2 ( node2 配置了 spring.clouapache+rocketmqd.bus.id 为rocketmq-bus-node2:10002,可以匹配上) 进行配置的修改:

cuspringcloud五大组件rl -X POST 'http://localhoscurling溃疡名词解释t:10001/aspringboot常用注解ctuator/bus-env/rocketmq-bus-ncurl什么意思ode2?name=hangzhou&value=xihu' -H 'content-type: application/json'

访问/bus/env 获取配置(由于在 node1 上发送消息,Bus 也会对发送方的节点 node1 进行配置修改):

~ ⌚
$ curl -X POST 'htspringboot面试题tp://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
~ ⌚
$ curl -X GET 'htapache+rocketmqtp://localhost:10005/bus/env?kspringcloudey=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'hcurlingttp://locacurling溃疡名词解释lhost:10004/buspringmvc的工作原理s/env?key=hacurlyngzhou'
alibspringaspringcloudba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bspringcloud五大组件us/curling溃疡env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/curlingbus/env?key=hangzhou'
xihu%
~ ⌚
$ curl -X GET 'hspringbootttp://localhost:10001/buspringboot面试题s/env?key=hangzhou'
xihu%

可以看到,只springcloud五大组件有 node1 和 node2 修改了配置,其余的 3 个节点配置未改变。

Bus 的实现

1. Bus 概curling念介绍

1)事件

Bus 中定义了远程事件RemoteApplicationEvent,该事件继承了 Spring 的事件ApplicaspringtionEvent,而且它目前有 4 个具体的实现:

Spring Cloud Bus 消息总线介绍
                                                原
                                                    荐

  • EnvironmentChangeRemoteApplicationEvent:远程环境变更事件。主要用于接收一个 Map<String,String> 类型的数据并更新到 Spring 上下文中Ecurling溃疡名词解释nvironment 中的事件。文中的实例就是使用这个事件并配合EnvironmentBusEndpoint 和EnvironmentChangeListener 完成的。
  • AckRemspring是什么意思otcurledeApspringboot常用注解plicationEvent:远程确认事件。Bus 内部成功接收到远程事件后会发送回AckRemospringboot面试题teApcurly是什么意思中文plicationEvent确认事件进行确认。
  • RefreshRemoteApplicationEvent: 远程配置刷新事件。配合@RefrcurlingeshScope 以及所有的@Cspringmvc的工作原理onfiguracurltionProperties注解修饰的配置类的动态刷新。
  • UnknownRemoteApplicationEvent:远程未知事件。Bus 内部消息体进行转换远程事件的时候如果发生异常会统一包装成该事件springboot

Bus 内部还存在一个非RemoteApplicationEvent事件 -SentApplicationEvent消息发送事件,配合springcloud五大组件 Trace 进行远程消息发送的记录。

这些事件会配合Aspringboot面试题pplicationListener进行操spring_框架作,比如EnvironmentChanspringboot面试题geRemoteApplicationEvent配了EnvironmentChangeListener进行配置的新增/修改:

public class EnvironmentChangeListener
implements ApplicationListener<EnvironmentChangeRemoteApplicatiocurl怎么读nEvent> {
private static Log log = LogFactcurlory.getLog(Environmenspring_框架tChangeListener.class);
@Autowirspringclouded
private Environmespringmvc的工作原理ntManager env;
@Override
public void onApplicationEvent(EnvironmentChancurlinggeRemoteApplicationEvent event) {
Map<String, String> values = event.getValues();
log.info("Received remote espringcloud五大组件nvironment change request. Keys/values tspring面试题o update "
+ values);
for (Map.Entry<String, String> entry : values.entrySet()) {
env.setProperty(entry.getKey()spring面试题, entry.getValue());
}
}
}

收到其它节点发送来EnvironmentChangeRemoteApplicatcurlingionEven事件之后调用EnvcurlyironmentManager#setProperty进行配置的设置,该方法内部针对每一个配置项都会springboot发送一个EnvironmentChangeEvent事件,然后被ConfigurationPropertspringcloudiesRebinder所监听,进行 rebind 操作新增/更新curling配置。

2)Actuator Espringmvc的工作原理ndpoint

Bus 内部暴露了 2 个 Endpoint,分别是EnspringvironmentBusEndpoint和RefreshBusEndpoint,进行配置的新增/修改以及全局配置刷新。curled它们对应的 Endpoint id 即 url 是bus-env和bus-refresh。

3)配置

Bus 对于消息的发送必定涉及到 Topic、Group 之类的信息,这些内容都被封装到了BusProperties中,其默认的配置前缀为spring.cloud.bus,比如:

  • spring.cloud.bus.springboot常用注解refresh.enabled用于开启/关闭全局刷新的 Listener。
  • spring.cloud.bus.env.enabled 用于开启/关闭配置新springcloud增/修改的 Endpoint。
  • spring.cloud.bus.ack.enabled 用于开启开启/关闭AckRemoteApplicationEvent事件的发送。
  • spring.cloud.bus.trace.ecurlednabled 用于springcloud开启/关闭springmvc的工作原理息记录 Trace 的 Listener。

消息发送涉及到的 Topic 默认用的是springCloudBus,可以配置进行修改,Group 可以设置成广播模式或使用 UUID 配合 offset 为 lastapache+rocketmqest 的模式。

每个 Bus 应用都有一个对应的 Bus id,官方取值方式较复杂:

${vcap.application.name:${spring.application.name:application}}:${vcap.appspringlicationcurl.instance_index:${spring.application.index:${local.server.port:$springboot{server.port:0}}}}:${vcap.application.instspringbootance_id:${random.vcurledalue}}

建议手动配置 Bus id,因为 Bus 远程事件中的 destination 会根据 Bus id 进行匹配:

spring.cloud.bus.id=${spring.application.name}-${server.port}

2curl. Bus 底层分析

Bus 的底层分析无非牵扯到这几个方面:

  • 消息是如何发送的
  • 消息是如何接收的
  • destination 是如何匹配的
  • 远程事件收到后如何触发下一个 action

BusAutoConfiguration自动化配置类被@EnableBinding(SpringCloudBusClient.class)所修饰。

@EnableBinding的apache+rocketmq用法在文章《Spring Cloud Stream 体系及原理介绍》中已经说明,且它的 value 为SpringCloudBusClient.class,会在SpringCloudBusClient中基于代理创curling溃疡名词解释建出 input 和 oucurlytput 的DirectChannel:

public interface SpringCloudBusClient {
String INPUT = "springCloudBusInput";
String OUTPUT = "springCloudBusOutput";
@Output(SpringClspringcloud五大组件oudBusClient.OUTPUT)
MessageChannel springCloudBuscurl怎么读Output();
@Input(SpringClcurledoudBusClientcurly.INPUT)
SubscribableChannel springCloudBusInput();
}

spspring面试题ringCloudBusInput 和 springCloudBusOuspringboottput 这curly两个 Binding 的属性curl怎么读可以通过配置文件进行修改(比如修改 topic):

spring.cloud.stream.bindings:
scurl什么意思pringCloudBusInput:
destination: my-bus-topic
springCloudBusOutput:
destination: my-bus-spring_框架topicurlsc

消息的接收和发送:

// BusAutoConfigspring_框架uration
@EventListenecurledr(clacurl什么意思sses = RemoteApplicationEvent.class) // 1
public void acceptLocal(RemoteApplicationEvent event) {
if (this.serviceMatcher.isFromSelf(event)
&& !(event instanceof AckRemoteApplicacurl命令tionEvent)) { // 2
this.cloudBusOutboundChannel.send(McurlsessageBuilder.wicurl什么意思thPayload(event).build()); // 3
}
}
@StreamListener(SpringCloudBusClient.INPUT) // 4
public void acceptRemospringcloud五大组件te(RemoteApplicationEvent event) {
if (event instanceof AckRemoteApplicationEvent) {
if (this.bus.getTspringboot常用注解race().isEnabled() && !this.serspringviceMatcher.isFromSelf(event)
&& this.applicationEventPublisher != null) { // 5
this.spring_框架applicationEventPublisher.publishEvent(evenspringmvc的工作原理t);
}
// If it's an ACK we are finished processing at this point
return;
}
if (this.serviceMatcher.isForSelf(event)
&&curling溃疡; this.applicatcurling溃疡iocurl什么意思nEventPcurl怎么读ublisher != null) { // 6
if (!this.servspringboot常用注解iceMatcher.isFromSelf(event)) { // 7
this.springbootapplicatcurlsionEventPublisher.publishEvent(event);
}springboot常用注解
if (this.bus.getAck().isEnabled()) { // 8
AckRemoteApplicationEvent ack = new AckRemoteApplcurly是什么意思中文icationEvent(this,curl
tspringhis.serviceMatcher.getServiceId(),
this.bus.getAckcurls().getDestinationService(),
event.getDesspringcloud五大组件tinationServicurl命令ce(), event.getId(), event.getClass());
this.cloudBcurlyusOutboundChannel
.send(MessageBuilder.withPayload(ack).build());
this.applicationEventcurl怎么读Publisher.publishEvent(ack);
}
}
if (this.bucurlings.getTrace().isEnabled() &&springboot this.appspringboot面试题licationEventPublisher != null) { // 9
// We are set to register sent events so publish it for local consumption,
// irrspringbootespective of the origspring是什么意思in
this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
evspringboot常用注解ent.getOriginService(), event.getDestinationService(),
event.getId(), event.getClass()));
}
}
  1. 利用 Spring 事件的监听机制监听本地所有的RemspringoteApplicationEvent远程事件(比如bus-env会在本地发送EnvironmentChangeRemoteApplicationEvent事件,bus-refresh会在本地发送RefreshRemoteApplicationEvent事件,这些事件在这里都会被监听到curling)。

  2. 判断本地接收到的事件不是AckRemoteApplicaspring_框架tionEcurl命令vent远程确认事件(不然会死循环,一直接收消息,发送消息springmvc的工作原理...)以及该事件是应用自身发送出去的(事件发送方是应用自身),如果都满足执行步骤 3。

  3. 构造 Message 并将该远程事件作为 payload,然后使用 Spring Cloud Stream 构造的 Binding name 为 springCloudBusOutput 的 MessageChannel 将消息发送到 broker。

4.@StreamListener注解消费 Springspring_框架 Cloud Stream 构造的 Binding name 为 springCloudBusInput 的 MessageChannel,接收的消息为远程消息。

  1. 如果该远程事件是AckRemoteApplicatispringcloud五大组件onEvent远程确认事件并且应用开启了消curling溃疡名词解释息追踪 trace 开关,同时该远程事件不是应用自身发送的(事件发送方不是应用自身spring_框架,表示事件是其curled它应用发送过来的),那么本地发送AckRemoteApplicationEvent远程确认事件表示应用确认收到了其它应用发送过来的远程事件,流程结束。

  2. 如果该远程事件是其它应用发送给应用自身的(事curly件的接收方是应用自身),那么进行步骤 7 和 8,否spring则执行步骤 9。

  3. 该远程事件不是应用自身发送(事件发送方不是应用自身)的话,将该事件以本地的方式发送出去curled。应用自身一开始已经在本地被对应的消息接收方处理了,无需再次发送。

  4. 如果开启了AckRemoteApplicationEvent远程确认事件的开关,构造AckRemoteApplicationEvencurl什么意思t事件并在远程和本地都发送该事件(本地发送是因为步骤 5 没有进行本地AckRemoteApplicationEvent事件的发送,springboot也就是自身应用对自身应用确认; 远程发送是为了告诉其它应用,自身应用收到了消息)。

  5. 如果开启了消息记录 Trace 的开关,本地构造并发送SentApplicationEvent事件。

bus-env触发后所有节点的Envirocurl什么意思nmentChangeListener监听到了配置的变化,控制台都会打印出以下信息:

o.s.c.b.curledevent.Environmespring是什么意思ntChangeListener  : Received remote environment change request. Kespringmvc的工作原理ys/values to update {hangzhou=alibaba}

如果在本地监听远程确认事件 AckRemcurledoteApplicationEvent,都会收到所有节点的信息,比如 node5 节springboot点的控制台监听到的 AckRemotecurlyApplicationEvent事件如下:

ServiceId [rocketmq-bus-node5:10005] listenecurl命令rs on {"type":"AspringckRemoteApplicaspringtioapache+rocketmqnEvent","timestamp":1554124670484,"originScurling溃疡名词解释ervice":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationServcurl命令icespringboot面试题":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"curlingAckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-nospringmvc的工作原理de1:10001","destincurlingationService":"**","id":"91f06cfspringboot面试题1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356springclouda-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.curl怎么读cloud.bus.event.Espringboot常用注解nspringboot面试题vironmentChangeRemoteApplicationEvent"}
Servspringmvc的工作原理iceId [rocketmq-bus-node5:10005] listeners ocurl怎么读n {"tycurlpe":"AckRespringmotcurlyeApplicationspringboot面试题Event","timestamp":1554124670402,curl什么意思"originService":"rspringocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85"spring是什么意思,"accurlingkId":"curls750d033f-356a-4aad-8cf0-3481ace869curls8c","ackDestinationService":"**","event":"org.springframcurl命令ework.cloudcurling.bus.event.Envspring面试题ironmentChspringangeRcurling溃疡emoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"Accurling溃疡kRemoteApplicationEvespring面试题nt","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46ccurls2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id"spring:"1812fd6d-6f98-4e5b-a38a-4b11springboot常用注解aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestcurlinationService":"**","event":"org.springfrcurling溃疡amework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}

那么回到本章节开头提到的 4 个问题,我们分别做一下解答:

  • 消息是如何发送的: 在BusAutoConfiguratspringcloudion#acceptLocal方法中通过 Sprspringing Cloud Stream 发送事件到springCloudBustopic 中。
  • 消息是如何接收的: 在BusAutoConfiguration#acceptRemote方法中通过 Spring Cloud Stream 接收springClcurly是什么意思中文oudBustopic 的消息。
  • destination 是如何curl命令匹配的:在springcloudBusAutoConfiguration#acceptRspringcloudemote方法中接收远程事件方法里对 destinationspring面试题 进行匹配。
  • 远程springboot常用注解事件收到后如何触发下一个 action: Bus 内部通过 Spring 的事件机制接收本地的RemoteApplicationEvent具体的实现事件再做下一步的动作(比如Ecurling溃疡nvironmentChangeListener接收了EnvironmentChangeRemoteApplicationEvent事件,RefreshListener接收了RefreshRemoteApplicationEvenspring是什么意思t事件)。

总结

Spring Cloud Bus 自身内容还是比较少的,不过还是需要提前了解 Sprincurl什么意思g Cloud Stream 体系以及 Spring 自身的事件机制,在此基础上,才能更好地理解 Spring Cloud Bus 对本地事件和远程springboot常用注解事件的处理逻辑spring_框架

目前 Bus 内置的远程事件较少,大多数为配置相关的事件,我们可以springboot面试题继承RemoteApplicationEvent并配合@RemoteApplicatispringboot面试题onEspring是什么意思ventScan注解构建自身的微服务消息体系。

spring者简介

方剑(花名:洛夜),GitHub ID @fangjian04curled23,开源爱好者,阿里巴巴高级开发工程师,阿里云产品 EDAS 开发,Spring Cloud Alibaba 开源项目负责人之一。