The Spring Cloud Bus positions itself as a messaging bus within the Spring Cloud system, using a message broker to connect all nodes of a distributed system.

The official Reference documentation for the Bus is relatively simple, so simple that it doesn’t even have a diagram.

This is the most 2.1.0 version of the Spring Cloud Bus code structure (less code)

Bus Example Demonstration

Before we analyze the implementation of the Bus, let’s look at two simple examples of using Spring Cloud Bus.

Configuration of all nodes added

The Bus example is relatively simple because the AutoConfiguration layer of the Bus is configured by default. All you need to do is introduce the Spring Cloud Stream and Spring Cloud Bus dependencies for the messaging middleware. After that, all launched applications will use the same Topic to receive and send messages.

The demo for the Bus is already on github: https://github.com/fangjian0423/rocketmq-binder-demo/tree/master/rocketmq-bus-demo. The demo simulates the launch of 5 nodes, and if you add a configuration item to any one of the instances, it will be added to all nodes.

Access the address provided by the Controller of any node to get the configuration (key is hangzhou).

1
curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'

All nodes return unknown because the key of hangzhou is missing from the configuration of all nodes.

Bus internally provides EnvironmentBusEndpoint which is an Endpoint used to add/update configuration via message broker.

Visit the url /actuator/bus-env?name=hangzhou&value=alibaba of any node to add a configuration item (e.g. visit the url of node1):

1
curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'

Then visit all nodes again /bus/env to get the configuration.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'

~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
alibaba%

You can see that all nodes have a new configuration with key of hangzhou and the corresponding value is alibaba. This configuration item is done through the EnvironmentBusEndpoint provided by Bus.

Spring Cloud Config with Bus completes the refresh of all node configurations to describe the previous example (the example in this article is not a refresh, but a new configuration, but the process is the same).

Configuration modification of some nodes

For example, if you specify destination as rocketmq-bus-node2 on node1 (node2 is configured with spring.cloud.bus.id as rocketmq-bus-node2:10002, which matches), make the following configuration changes.

1
curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'

Access /bus/env to get the configuration (since the message is sent on node1, Bus also makes configuration changes to the sender’s node node1).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'

~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
xihu%
~ ⌚
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
xihu%

As you can see, only node1 and node2 have changed their configurations, while the remaining 3 nodes remain unchanged.

In-depth knowledge of bus

Bus Concept Introduction

Events

The remote event RemoteApplicationEvent is defined in Bus, which inherits from the Spring event ApplicationEvent and which currently has 4 specific implementations.

  • EnvironmentChangeRemoteApplicationEvent: Remote environment change event. This event is mainly used to receive a data of type Map<String, String> and update it to the Environment in Spring context. The examples in this article use this event in conjunction with EnvironmentBusEndpoint and EnvironmentChangeListener.
  • AckRemoteApplicationEvent: The remote acknowledgement event, which is sent back to the AckRemoteApplicationEvent acknowledgement event after the remote event is successfully received inside Bus.
  • RefreshRemoteApplicationEvent: Remote configuration refresh event. Works with @RefreshScope and all configuration classes modified by @ConfigurationProperties annotation for dynamic refreshing
  • UnknownRemoteApplicationEvent: remote unknown event, which is wrapped in the Bus internal message body if an exception occurs when converting a remote event

There is also a non-RemoteApplicationEvent event inside Bus - the SentApplicationEvent message sending event. Logging of remote message sending with Trace

These events work with ApplicationListener, for example EnvironmentChangeRemoteApplicationEvent with EnvironmentChangeListener for adding/modifying configuration.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public class EnvironmentChangeListener
		implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {

	private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);

	@Autowired
	private EnvironmentManager env;

	@Override
	public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
		Map<String, String> values = event.getValues();
		log.info("Received remote environment change request. Keys/values to update "
				+ values);
		for (Map.Entry<String, String> entry : values.entrySet()) {
			env.setProperty(entry.getKey(), entry.getValue());
		}
	}
}

After receiving the EnvironmentChangeRemoteApplicationEvent event from other nodes, we call EnvironmentManager#setProperty to set the configuration, which internally sends an EnvironmentChangeEvent event for each configuration item. EnvironmentChangeEvent event for each configuration item, which is then listened to by ConfigurationPropertiesRebinder for rebind operation to add/update the configuration.

Actuator Endpoint

Bus exposes 2 Endpoints internally, EnvironmentBusEndpoint and RefreshBusEndpoint, for adding/modifying configuration and global configuration refreshing. Their corresponding Endpoint id i.e. url is bus-env and bus-refresh.

Configuration

Bus for message delivery must involve Topic, Group and so on. These are encapsulated in BusProperties, whose default configuration prefix is spring.cloud.bus.

For example.

  • spring.cloud.bus.refresh.enabled is used to enable/disable the Listener for global refresh.
  • spring.cloud.bus.env.enabled Enable/disable Endpoint for configuration addition/modification.
  • spring.cloud.bus.ack.enabled Enable/disable sending of - AckRemoteApplicationEvent events.
  • spring.cloud.bus.trace.enabled is used to enable/disable the Listener for message logging Trace.

The default Topic used for sending is springCloudBus, which can be modified by configuration, and the Group can be set to broadcast mode or use the UUID with offset of lastest.

Each Bus application has a corresponding Bus id, which is officially taken in the following complex way.

1
${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}

It is recommended to configure the Bus id manually, as the destination in the Bus remote event is matched against the Bus id.

1
spring.cloud.bus.id=${spring.application.name}-${server.port}

Bus Underlay Analysis

The underlying analysis of Bus involves no more than a few aspects.

  1. How messages are sent
  2. How the message is received
  3. How the destination is matched
  4. How the next action is triggered after the remote event is received

The BusAutoConfiguration automation configuration class is modified by @EnableBinding(SpringCloudBusClient.class).

The usage of @EnableBinding is explained in the dry run|Introduction to the Spring Cloud Stream system and principles, and its value is SpringCloudBusClient.class, which creates a DirectChannel for input and output based on the proxy in SpringCloudBusClient. creates a DirectChannel for input and output based on the proxy.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public interface SpringCloudBusClient {

	String INPUT = "springCloudBusInput";

	String OUTPUT = "springCloudBusOutput";

	@Output(SpringCloudBusClient.OUTPUT)
	MessageChannel springCloudBusOutput();

	@Input(SpringCloudBusClient.INPUT)
	SubscribableChannel springCloudBusInput();
}

The properties of the springCloudBusInput and springCloudBusOutput Binding can be modified through the configuration file (e.g. by modifying the topic):

1
2
3
4
5
spring.cloud.stream.bindings:
  springCloudBusInput:
    destination: my-bus-topic
  springCloudBusOutput:
    destination: my-bus-topic

Sending of incoming messages.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// BusAutoConfiguration

@EventListener(classes = RemoteApplicationEvent.class) // 1
public void acceptLocal(RemoteApplicationEvent event) {
	if (this.serviceMatcher.isFromSelf(event)
			&& !(event instanceof AckRemoteApplicationEvent)) { // 2
		this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3
	}
}

@StreamListener(SpringCloudBusClient.INPUT) // 4
public void acceptRemote(RemoteApplicationEvent event) {
	if (event instanceof AckRemoteApplicationEvent) {
		if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
				&& this.applicationEventPublisher != null) { // 5
			this.applicationEventPublisher.publishEvent(event);
		}
		// If it's an ACK we are finished processing at this point
		return;
	}
	if (this.serviceMatcher.isForSelf(event)
			&& this.applicationEventPublisher != null) { // 6
		if (!this.serviceMatcher.isFromSelf(event)) { // 7
			this.applicationEventPublisher.publishEvent(event);
		}
		if (this.bus.getAck().isEnabled()) { // 8
			AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
					this.serviceMatcher.getServiceId(),
					this.bus.getAck().getDestinationService(),
					event.getDestinationService(), event.getId(), event.getClass());
			this.cloudBusOutboundChannel
					.send(MessageBuilder.withPayload(ack).build());
			this.applicationEventPublisher.publishEvent(ack);
		}
	}
	if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9
		// We are set to register sent events so publish it for local consumption,
		// irrespective of the origin
		this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
				event.getOriginService(), event.getDestinationService(),
				event.getId(), event.getClass()));
	}
}
  1. Use Spring’s event listening mechanism to listen for all local RemoteApplicationEvent remote events (e.g. bus-env sends EnvironmentChangeRemoteApplicationEvent events locally, bus-refresh sends EnvironmentChangeRemoteApplicationEvent events locally). bus-refresh sends the RefreshRemoteApplicationEvent event locally. These events will be listened to here)
  2. Determine if the event received locally is not an AckRemoteApplicationEvent remote acknowledgement event (otherwise it will be a dead loop, receiving messages and sending messages…) and if the event is sent by the application itself (the sender of the event is the application itself), if both are satisfied, execute step 3
  3. Construct the Message and use the remote event as a payload, then send the message to the broker using a MessageChannel with the Binding name springCloudBusOutput constructed by Spring Cloud Stream.
  4. The @StreamListener annotation consumes a MessageChannel with the Binding name springCloudBusInput constructed by Spring Cloud Stream, and the received message is a remote message.
  5. If the remote event is an AckRemoteApplicationEvent remote acknowledgement event and the application has enabled the message trace switch, and the remote event is not sent by the application itself (the event sender is not the application itself, which means that the event was sent by another application), then the locally sent AckRemoteApplicationEvent The remote acknowledgement event indicates that the application acknowledges the receipt of the remote event sent by another application. End of process
  6. If the remote event is sent from another application to the application itself (the recipient of the event is the application itself), then proceed to steps 7 and 8, otherwise perform step 9
  7. If the remote event is not sent by the application itself (the sender of the event is not the application itself), the event is sent out locally. The application itself has already been handled locally by the corresponding message recipient in the first place, so there is no need to send it again
  8. If the AckRemoteApplicationEvent remote acknowledgement event switch is enabled, construct the AckRemoteApplicationEvent event and send it both remotely and locally (locally because step 5 does not perform local AckRemoteApplicationEvent event is sent locally because step 5 did not perform local AckRemoteApplicationEvent event sending, that is, your own application confirms to your own application; it is sent remotely to tell other applications that your application received the message)
  9. If the message logging trace switch is turned on, the SentApplicationEvent event is constructed and sent locally

The EnvironmentChangeListener of all nodes that listen for configuration changes after the bus-env trigger will print the following message on the console

1
o.s.c.b.event.EnvironmentChangeListener  : Received remote environment change request. Keys/values to update {hangzhou=alibaba}

If the remote acknowledgement event AckRemoteApplicationEvent is listened to locally, all nodes will receive the information, for example, the console of the node5 node listens to the AckRemoteApplicationEvent event as follows.

1
2
3
4
5
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670402,"originService":"rocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46c2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id":"1812fd6d-6f98-4e5b-a38a-4b11aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}

To answer the four questions mentioned at the beginning of this section.

  1. How the message is sent: Send an event to the springCloudBus topic via Spring Cloud Stream in the BusAutoConfiguration#acceptLocal method
  2. How the messages are received: The springCloudBus topic is received in the BusAutoConfiguration#acceptRemote method via Spring Cloud Stream
  3. How destination is matched: The destination is matched in the BusAutoConfiguration#acceptRemote method in the ReceiveRemote event method
  4. How to trigger the next action after the remote event is received: The Bus internally receives the local RemoteApplicationEvent specific implementation event through Spring’s event mechanism and then does the next action (e.g. EnvironmentChangeListener receives the EnvironmentChangeRemoteApplicationEvent event, RefreshListener receives the RefreshRemoteApplicationEvent event)

Summary

The spring Cloud Bus itself is still relatively small, but you need to understand the Spring Cloud Stream system and Spring’s own event mechanism in advance, and then understand the logic of Spring Cloud Bus for handling local and remote events on this basis.

We can inherit RemoteApplicationEvent and build our own microservice messaging system with @RemoteApplicationEventScan annotation.


Reference https://fangjian0423.github.io/2019/04/09/spring-cloud-bus-intro/