Spring Cloud Stream is used within the Spring Cloud architecture to build highly scalable, event-driven microservices.

Spring Cloud Stream covers a lot of ground and has many external dependencies. To get familiar with it, you need some knowledge of the following:

  • Spring Framework (Spring Messaging, Spring Environment)
  • Spring Boot Actuator
  • Spring Boot Externalized Configuration
  • Spring Retry
  • Spring Integration
  • Spring Cloud Stream

The purpose of this article is to introduce Spring Cloud Stream. With so much to cover, we will try to walk you through Spring Cloud Stream in the simplest way possible (from here on, Spring Cloud Stream is abbreviated as SCS).

Before we can understand SCS, we must understand the Spring Messaging and Spring Integration projects, so let’s look at these two projects first.

Spring Messaging

Spring Messaging is a module of the Spring Framework whose role is to provide a unified programming model for messages.

For example, a message corresponds to the Message model, which consists of a message body (the payload) and message headers:

package org.springframework.messaging;
public interface Message<T> {
	T getPayload();
	MessageHeaders getHeaders();
}

The message channel MessageChannel receives messages; call its send method to send a message to the channel:

@FunctionalInterface
public interface MessageChannel {
	long INDEFINITE_TIMEOUT = -1;
	default boolean send(Message<?> message) {
		return send(message, INDEFINITE_TIMEOUT);
	}
	boolean send(Message<?> message, long timeout);
}

How are the messages in a message channel consumed? This is done through SubscribableChannel, a sub-interface of MessageChannel, to which MessageHandler message processors subscribe:

public interface SubscribableChannel extends MessageChannel {
	boolean subscribe(MessageHandler handler);
	boolean unsubscribe(MessageHandler handler);
}

The MessageHandler is what actually consumes and processes messages:

@FunctionalInterface
public interface MessageHandler {
	void handleMessage(Message<?> message) throws MessagingException;
}
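
To see how these interfaces cooperate without pulling in Spring, here is a minimal sketch built from simplified stand-in types. SimpleMessage, SimpleHandler, and SimpleChannel are hypothetical names invented for this illustration; they are not Spring classes, and the real contracts are the interfaces quoted above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Message<T>: a payload plus headers.
final class SimpleMessage<T> {
    private final T payload;
    private final Map<String, Object> headers;

    SimpleMessage(T payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }

    T getPayload() { return payload; }

    Map<String, Object> getHeaders() { return headers; }
}

// Hypothetical stand-in for MessageHandler: consumes one message.
@FunctionalInterface
interface SimpleHandler {
    void handleMessage(SimpleMessage<?> message);
}

// Hypothetical stand-in for SubscribableChannel.
// For simplicity it delivers every message to every subscriber.
class SimpleChannel {
    private final List<SimpleHandler> handlers = new ArrayList<>();

    boolean subscribe(SimpleHandler handler) { return handlers.add(handler); }

    boolean unsubscribe(SimpleHandler handler) { return handlers.remove(handler); }

    // Returns true if at least one handler received the message.
    boolean send(SimpleMessage<?> message) {
        for (SimpleHandler handler : handlers) {
            handler.handleMessage(message);
        }
        return !handlers.isEmpty();
    }
}

class MessagingModelDemo {
    // Sends one message through a channel and returns what the handler saw.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        SimpleChannel channel = new SimpleChannel();
        channel.subscribe(msg -> received.add(msg.getPayload() + " " + msg.getHeaders()));
        Map<String, Object> headers = Collections.<String, Object>singletonMap("index", 1);
        channel.send(new SimpleMessage<String>("hello", headers));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [hello {index=1}]
    }
}
```

The point of the split is that the sender only knows the channel and the consumer only knows the handler contract, so either side can be swapped independently.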

Spring Messaging builds further features on top of this messaging model, such as:

  • Handling of method parameters and return values for message reception: the parameter handler HandlerMethodArgumentResolver works with annotations such as @Header and @Payload, and the return value handler HandlerMethodReturnValueHandler works with the @SendTo annotation
  • Message body content converter MessageConverter
  • Unified abstract message sending template AbstractMessageSendingTemplate
  • Message channel interceptor ChannelInterceptor
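
As an illustration of the interceptor idea in the last bullet, here is a minimal sketch. In Spring, ChannelInterceptor has a preSend hook that may return a modified message or null to stop the send; the sketch below mimics only that behavior with plain String payloads. PreSendInterceptor and InterceptedChannel are hypothetical names, not Spring types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an interceptor's preSend hook:
// return the (possibly modified) payload, or null to drop the message.
@FunctionalInterface
interface PreSendInterceptor {
    String preSend(String payload);
}

// Hypothetical channel that runs every interceptor before delivering.
class InterceptedChannel {
    private final List<PreSendInterceptor> interceptors = new ArrayList<>();
    private final List<Consumer<String>> handlers = new ArrayList<>();

    void addInterceptor(PreSendInterceptor interceptor) { interceptors.add(interceptor); }

    void subscribe(Consumer<String> handler) { handlers.add(handler); }

    // Returns false when an interceptor vetoed the send.
    boolean send(String payload) {
        String current = payload;
        for (PreSendInterceptor interceptor : interceptors) {
            current = interceptor.preSend(current);
            if (current == null) {
                return false;
            }
        }
        for (Consumer<String> handler : handlers) {
            handler.accept(current);
        }
        return true;
    }
}

class InterceptorDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        InterceptedChannel channel = new InterceptedChannel();
        channel.addInterceptor(p -> p.isEmpty() ? null : p); // drop empty payloads
        channel.addInterceptor(p -> p.toUpperCase());        // transform the rest
        channel.subscribe(received::add);
        channel.send("hello");
        channel.send("");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [HELLO]
    }
}
```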

Spring Integration

Spring Integration provides an extension to the Spring programming model to support Enterprise Integration Patterns.

Spring Integration is an extension to Spring Messaging. It introduces a number of new concepts, including the message router MessageRouter, message dispatcher MessageDispatcher, message filter Filter, message transformer Transformer, message aggregator Aggregator, message splitter Splitter, and so on. It also provides MessageChannel implementations such as DirectChannel, ExecutorChannel, and PublishSubscribeChannel, as well as MessageHandler implementations such as MessageFilter, ServiceActivatingHandler, and MethodInvokingSplitter.

Several ways of processing messages:

  • Splitting of messages
  • Aggregation of messages
  • Filtering of messages
  • Distribution of messages
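
To make these processing styles concrete, here is a minimal sketch that chains a splitter, a filter, a transformer, and an aggregator as plain Java functions. It only illustrates the patterns; it does not use Spring Integration endpoints, and the class name EipPipelineDemo and the comma-separated payload format are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class EipPipelineDemo {

    // Splitter: one incoming payload becomes several parts.
    static List<String> split(String payload) {
        return Arrays.asList(payload.split(","));
    }

    // Filter: decides whether a part continues through the pipeline.
    static boolean accept(String part) {
        return !part.trim().isEmpty();
    }

    // Transformer: converts a part into another representation.
    static String transform(String part) {
        return part.trim().toUpperCase();
    }

    // Aggregator: combines several parts back into one payload.
    static String aggregate(List<String> parts) {
        return String.join("|", parts);
    }

    static String process(String payload) {
        List<String> kept = split(payload).stream()
                .filter(EipPipelineDemo::accept)
                .map(EipPipelineDemo::transform)
                .collect(Collectors.toList());
        return aggregate(kept);
    }

    public static void main(String[] args) {
        System.out.println(process("a, b, ,c")); // A|B|C
    }
}
```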

Let’s try Spring Integration with the simplest example:

SubscribableChannel messageChannel = new DirectChannel(); // 1

messageChannel.subscribe(msg -> { // 2
  System.out.println("receive: " + msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); // 3

The code does the following:

  1. Construct a subscribable message channel, messageChannel.
  2. Subscribe a MessageHandler to consume the messages in this channel.
  3. Send a message to the channel; it is consumed by the MessageHandler subscribed to the channel.

Finally the console prints: receive: msg from alibaba

DirectChannel has an internal message dispatcher of type UnicastingDispatcher, which dispatches each message to a MessageHandler. As the name suggests, UnicastingDispatcher is a unicast dispatcher: it selects exactly one MessageHandler for each message. So how does it choose? It relies on an internal LoadBalancingStrategy; only a round-robin implementation is provided by default, and it can be extended with your own strategy.

Let’s modify the above code a bit and use multiple MessageHandlers to handle messages.

SubscribableChannel messageChannel = new DirectChannel();

messageChannel.subscribe(msg -> {
  System.out.println("receive1: " + msg.getPayload());
});

messageChannel.subscribe(msg -> {
  System.out.println("receive2: " + msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

Because the dispatcher inside DirectChannel is a unicast UnicastingDispatcher with a round-robin load balancing strategy, the two messages are consumed by the two MessageHandlers in turn. The console prints:

receive1: msg from alibaba
receive2: msg from alibaba
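
The round-robin selection can be pictured with a small sketch. This is not the UnicastingDispatcher source; RoundRobinDispatcher is a hypothetical class that only reproduces the "one handler per message, taken in rotation" behavior described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical unicast dispatcher: each message goes to exactly one handler,
// chosen in round-robin order.
class RoundRobinDispatcher {
    private final List<Consumer<String>> handlers = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    void addHandler(Consumer<String> handler) { handlers.add(handler); }

    // Returns false when there is no handler to deliver to.
    boolean dispatch(String payload) {
        if (handlers.isEmpty()) {
            return false;
        }
        // floorMod keeps the index valid even if the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), handlers.size());
        handlers.get(index).accept(payload);
        return true;
    }
}

class RoundRobinDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        RoundRobinDispatcher dispatcher = new RoundRobinDispatcher();
        dispatcher.addHandler(p -> log.add("receive1: " + p));
        dispatcher.addHandler(p -> log.add("receive2: " + p));
        dispatcher.dispatch("m1");
        dispatcher.dispatch("m2");
        dispatcher.dispatch("m3");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [receive1: m1, receive2: m2, receive1: m3]
    }
}
```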

Since there is a unicast dispatcher, UnicastingDispatcher, there is also a broadcast dispatcher, BroadcastingDispatcher, which is used by the PublishSubscribeChannel message channel. The broadcast dispatcher delivers each message to all MessageHandlers:

SubscribableChannel messageChannel = new PublishSubscribeChannel();

messageChannel.subscribe(msg -> {
  System.out.println("receive1: " + msg.getPayload());
});

messageChannel.subscribe(msg -> {
  System.out.println("receive2: " + msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

Two messages are sent, and both are consumed by every MessageHandler. The console prints:

receive1: msg from alibaba
receive2: msg from alibaba
receive1: msg from alibaba
receive2: msg from alibaba
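
For contrast with unicast dispatch, broadcast dispatch can be sketched in a few lines. BroadcastDispatcher below is a hypothetical class written for this illustration, not the BroadcastingDispatcher implementation; it only shows that every handler sees every message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical broadcast dispatcher: every handler receives every message.
class BroadcastDispatcher {
    private final List<Consumer<String>> handlers = new ArrayList<>();

    void addHandler(Consumer<String> handler) { handlers.add(handler); }

    // Returns the number of handlers the payload was delivered to.
    int dispatch(String payload) {
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
        return handlers.size();
    }
}

class BroadcastDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        BroadcastDispatcher dispatcher = new BroadcastDispatcher();
        dispatcher.addHandler(p -> log.add("receive1: " + p));
        dispatcher.addHandler(p -> log.add("receive2: " + p));
        dispatcher.dispatch("m1");
        dispatcher.dispatch("m2");
        return log;
    }

    public static void main(String[] args) {
        // [receive1: m1, receive2: m1, receive1: m2, receive2: m2]
        System.out.println(run());
    }
}
```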

Spring Cloud Stream

SCS builds on Spring Integration and introduces concepts such as Binder, Binding, @EnableBinding, and @StreamListener. It integrates with Spring Boot Actuator, providing the /bindings and /channels endpoints; it integrates with Spring Boot Externalized Configuration, providing configuration classes such as BindingProperties and BinderProperties; and it enhances message sending and consumption, including the handling of failures.

SCS is an enhancement to Spring Integration, is integrated with the Spring Boot ecosystem, and is the foundation of Spring Cloud Bus. It hides the implementation details of the underlying messaging middleware behind a unified API for sending and consuming messages; those details are handled by the Binder of each messaging middleware.

The Binder is the component that integrates with external messaging middleware and constructs Bindings. It provides two methods, bindConsumer and bindProducer, for constructing consumers and producers respectively. The current official implementations are the Rabbit Binder and the Kafka Binder, and Spring Cloud Alibaba provides a RocketMQ Binder.

As you can see from the diagram, the Binding is the bridge between the application and the messaging middleware, and is used to consume and produce messages.
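
The role of a Binder can be sketched with an in-memory stand-in for the middleware. Everything in this example (LocalChannel, InMemoryBroker, InMemoryBinder) is hypothetical and heavily simplified; the real bindProducer and bindConsumer methods take additional arguments such as properties objects and return a Binding. The sketch only shows the idea that the binder is the single place that knows about the broker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-process channel: delivers each payload to all subscribers.
class LocalChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) { subscribers.add(subscriber); }

    void send(String payload) { subscribers.forEach(s -> s.accept(payload)); }
}

// Hypothetical stand-in for a messaging middleware: topic name -> listeners.
class InMemoryBroker {
    private final Map<String, List<Consumer<String>>> topics = new HashMap<>();

    void publish(String topic, String body) {
        List<Consumer<String>> listeners = topics.get(topic);
        if (listeners != null) {
            listeners.forEach(l -> l.accept(body));
        }
    }

    void listen(String topic, Consumer<String> listener) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
    }
}

// Hypothetical binder: the only class that talks to the broker.
class InMemoryBinder {
    private final InMemoryBroker broker;

    InMemoryBinder(InMemoryBroker broker) { this.broker = broker; }

    // Producer binding: whatever is sent to the output channel is published.
    void bindProducer(String destination, LocalChannel output) {
        output.subscribe(payload -> broker.publish(destination, payload));
    }

    // Consumer binding: whatever arrives at the destination is forwarded
    // to the input channel.
    void bindConsumer(String destination, LocalChannel input) {
        broker.listen(destination, input::send);
    }
}

class BinderDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        InMemoryBinder binder = new InMemoryBinder(new InMemoryBroker());
        LocalChannel output = new LocalChannel();
        LocalChannel input = new LocalChannel();
        binder.bindProducer("test-topic", output);
        binder.bindConsumer("test-topic", input);
        input.subscribe(received::add); // application-side consumer
        output.send("msg-1");           // application-side producer
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [msg-1]
    }
}
```

Because the application code touches only the two channels, replacing InMemoryBinder with a binder for another broker would not change BinderDemo's send and subscribe calls, which is the decoupling the Binder abstraction aims for.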

Let’s take a look at the simplest example using RocketMQ Binder and then analyze the underlying processing principles.

Startup class and sending of messages:

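import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding({ Source.class, Sink.class }) // 1
public class SendAndReceiveApplication {

	public static void main(String[] args) {
		SpringApplication.run(SendAndReceiveApplication.class, args);
	}

	@Bean // 2
	public CustomRunner customRunner() {
		return new CustomRunner();
	}

	public static class CustomRunner implements CommandLineRunner {

		@Autowired
		private Source source;

		@Override
		public void run(String... args) throws Exception {
			int count = 5;
			for (int index = 1; index <= count; index++) {
				source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3
			}
		}
	}
}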

Receiving messages:

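import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Service
public class StreamListenerReceiveService {

	@StreamListener(Sink.INPUT) // 4
	public void receiveByStreamListener1(String receiveMsg) {
		System.out.println("receiveByStreamListener: " + receiveMsg);
	}

}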

The code is very simple and contains no RocketMQ-specific code; messages are sent and received entirely through SCS. To switch to RabbitMQ or Kafka, you only need to change the configuration (and the binder dependency); no code changes are needed.
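
As a rough illustration of such a switch, the configuration below sketches what the same bindings might look like with the Kafka binder. It assumes the Kafka binder dependency replaces the RocketMQ one on the classpath, and the broker address is a placeholder.

```properties
# Assumption: spring-cloud-stream-binder-kafka is on the classpath
# instead of the RocketMQ binder.
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain

spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1

# Only needed when more than one binder is present on the classpath.
spring.cloud.stream.default-binder=kafka

# Kafka-specific setting; the address is a placeholder.
spring.cloud.stream.kafka.binder.brokers=localhost:9092
```

The binding names output and input stay the same, so the Source and Sink code is untouched; only the binder-specific keys change.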

Let’s analyze how this code works:

  1. The two interfaces passed to @EnableBinding, Source and Sink, are provided by SCS. SCS constructs a BindableProxyFactory for each of them, and their output and input methods return a MessageChannel that is a DirectChannel. The value of the @Output and @Input annotations on those methods is the name of the binding in the configuration file.

    public interface Source {
        String OUTPUT = "output";
        @Output(Source.OUTPUT)
        MessageChannel output();
    }
    public interface Sink {
        String INPUT = "input";
        @Input(Sink.INPUT)
        SubscribableChannel input();
    }
    

    The names of the bindings in the configuration file are output and input, corresponding to the values in the annotations on the methods of the Source and Sink interfaces.

    spring.cloud.stream.bindings.output.destination=test-topic
    spring.cloud.stream.bindings.output.content-type=text/plain
    spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
    
    spring.cloud.stream.bindings.input.destination=test-topic
    spring.cloud.stream.bindings.input.content-type=text/plain
    spring.cloud.stream.bindings.input.group=test-group1
    
  2. Construct the CommandLineRunner and execute the run method of the CustomRunner when the application starts.

  3. Call the output method of the Source interface to get the DirectChannel and send the message to this channel. This is the same code as in the earlier Spring Integration section.

    • After a message is sent to the DirectChannel returned by Source’s output method, it is processed by the MessageHandler AbstractMessageChannelBinder#SendingHandler, which then delegates to the MessageHandler created by AbstractMessageChannelBinder#createProducerMessageHandler (this method is implemented by each messaging middleware’s Binder).
    • The MessageHandler returned by each middleware’s AbstractMessageChannelBinder#createProducerMessageHandler implementation converts the Spring Message into the message model of that middleware and sends it to the middleware’s broker.
  4. Use @StreamListener to subscribe to messages. Note that the value of Sink.INPUT in the annotation is “input”, so the subscription is configured from the binding named input in the configuration file.

    • The AbstractMessageChannelBinder#createConsumerEndpoint method implemented by each messaging middleware’s Binder uses the middleware’s consumer to subscribe to messages, and converts the middleware’s message model into a Spring Message once a message arrives.
    • After conversion, the Spring Message is sent to the message channel named input.
    • The StreamListenerMessageHandler that corresponds to @StreamListener subscribes to the message channel named input and consumes the message.
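
The model conversion mentioned in steps 3 and 4 can be sketched as a pair of functions. AppMessage stands in for the Spring Message and NativeRecord for a middleware's own message type; both are hypothetical classes for this illustration, and real binders also deal with serialization formats, content types, and reserved headers that are omitted here.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical framework-side message: a String payload plus headers.
final class AppMessage {
    final String payload;
    final Map<String, String> headers;

    AppMessage(String payload, Map<String, String> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

// Hypothetical middleware-side model: a topic, raw bytes, and properties.
final class NativeRecord {
    final String topic;
    final byte[] body;
    final Map<String, String> properties;

    NativeRecord(String topic, byte[] body, Map<String, String> properties) {
        this.topic = topic;
        this.body = body;
        this.properties = properties;
    }
}

class ModelConverter {
    // Producer side: what a producer handler does before calling the client.
    static NativeRecord toNative(String topic, AppMessage message) {
        byte[] body = message.payload.getBytes(StandardCharsets.UTF_8);
        return new NativeRecord(topic, body, new HashMap<>(message.headers));
    }

    // Consumer side: what a consumer endpoint does before sending the
    // message to the input channel.
    static AppMessage fromNative(NativeRecord record) {
        String payload = new String(record.body, StandardCharsets.UTF_8);
        return new AppMessage(payload, new HashMap<>(record.properties));
    }
}

class ConversionDemo {
    static String roundTrip(String payload) {
        Map<String, String> headers = new HashMap<>();
        headers.put("index", "1");
        NativeRecord record = ModelConverter.toNative("test-topic", new AppMessage(payload, headers));
        AppMessage back = ModelConverter.fromNative(record);
        return back.payload + " " + back.headers;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("msg-1")); // msg-1 {index=1}
    }
}
```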

This process is fairly involved, so it is summarized in a diagram (the yellow part covers the Binder implementation of each messaging middleware and the basic publish/subscribe functions of the MQ).

At the end of the SCS chapter, let’s look at a piece of code from SCS about the way messages are handled.

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'")
public void receiveByHeader(Message msg) {
  System.out.println("receive by headers['index']=='1': " + msg);
}

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'")
public void receivePerson(@Payload Person person) {
  System.out.println("receive Person: " + person);
}

@StreamListener(value = Sink.INPUT)
public void receiveAllMsg(String msg) {
  System.out.println("receive allMsg by StreamListener. content: " + msg);
}

@StreamListener(value = Sink.INPUT)
public void receiveHeaderAndMsg(@Header("index") String index, Message msg) {
  System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
}
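
The condition attribute can be thought of as a guard in front of each listener method. The sketch below imitates that with plain Java predicates over the headers. SCS itself evaluates the condition as a SpEL expression, which is not reproduced here; ConditionalDispatchDemo and Route are hypothetical names, and the route names simply mirror the listener methods above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class ConditionalDispatchDemo {

    // A named handler guarded by a condition on the message headers.
    static final class Route {
        final String name;
        final Predicate<Map<String, String>> condition;

        Route(String name, Predicate<Map<String, String>> condition) {
            this.name = name;
            this.condition = condition;
        }
    }

    static final List<Route> ROUTES = new ArrayList<>();

    static {
        ROUTES.add(new Route("receiveByHeader", h -> "1".equals(h.get("index"))));
        ROUTES.add(new Route("receivePerson", h -> "9999".equals(h.get("index"))));
        ROUTES.add(new Route("receiveAllMsg", h -> true)); // no condition
    }

    // Returns the names of all handlers whose condition matches the headers.
    static List<String> dispatch(Map<String, String> headers) {
        List<String> invoked = new ArrayList<>();
        for (Route route : ROUTES) {
            if (route.condition.test(headers)) {
                invoked.add(route.name);
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        // [receiveByHeader, receiveAllMsg]
        System.out.println(dispatch(Collections.singletonMap("index", "1")));
        // [receiveAllMsg]
        System.out.println(dispatch(Collections.<String, String>emptyMap()));
    }
}
```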

Did you notice that the @StreamListener code above resembles the code that receives requests in a Spring MVC Controller? In fact, their architectures are similar. The Spring MVC classes that handle parameters and return values in a Controller are:

  • org.springframework.web.method.support.HandlerMethodArgumentResolver
  • org.springframework.web.method.support.HandlerMethodReturnValueHandler

The corresponding classes in Spring Messaging, mentioned earlier, are:

  • org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver
  • org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler

Their class names are exactly the same, and even their method names are the same.
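
To show the argument-resolver idea in isolation, here is a minimal reflection-based sketch. ArgumentResolver borrows the method names supportsParameter and resolveArgument from the Spring interfaces, but its signatures are simplified, and @HeaderValue is a hypothetical annotation standing in for @Header. It is an assumption-laden illustration, not how Spring implements resolution.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical annotation standing in for @Header.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface HeaderValue {
    String value();
}

// Hypothetical, simplified resolver contract.
interface ArgumentResolver {
    boolean supportsParameter(Parameter parameter);

    Object resolveArgument(Parameter parameter, String payload, Map<String, String> headers);
}

class ResolverDemo {

    // Resolvers are tried in order; the first one that supports a parameter wins.
    static final List<ArgumentResolver> RESOLVERS = Arrays.asList(
            new ArgumentResolver() { // resolves @HeaderValue parameters
                @Override
                public boolean supportsParameter(Parameter parameter) {
                    return parameter.isAnnotationPresent(HeaderValue.class);
                }

                @Override
                public Object resolveArgument(Parameter parameter, String payload, Map<String, String> headers) {
                    return headers.get(parameter.getAnnotation(HeaderValue.class).value());
                }
            },
            new ArgumentResolver() { // resolves plain String parameters to the payload
                @Override
                public boolean supportsParameter(Parameter parameter) {
                    return parameter.getType() == String.class;
                }

                @Override
                public Object resolveArgument(Parameter parameter, String payload, Map<String, String> headers) {
                    return payload;
                }
            });

    // Sample listener method whose arguments are filled in from the message.
    static String listener(@HeaderValue("index") String index, String body) {
        return "index=" + index + ", body=" + body;
    }

    // Builds the argument array for the method and invokes it.
    // Parameters that no resolver supports are left as null.
    static Object invoke(Method method, String payload, Map<String, String> headers)
            throws ReflectiveOperationException {
        Parameter[] parameters = method.getParameters();
        Object[] args = new Object[parameters.length];
        for (int i = 0; i < parameters.length; i++) {
            for (ArgumentResolver resolver : RESOLVERS) {
                if (resolver.supportsParameter(parameters[i])) {
                    args[i] = resolver.resolveArgument(parameters[i], payload, headers);
                    break;
                }
            }
        }
        return method.invoke(null, args);
    }

    static String run() {
        try {
            Method method = ResolverDemo.class.getDeclaredMethod("listener", String.class, String.class);
            return (String) invoke(method, "msg-1", Collections.singletonMap("index", "1"));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // index=1, body=msg-1
    }
}
```

Keeping resolution in a list of small resolvers is what lets the same listener-method style work for both web requests and messages: only the resolvers differ, not the invocation loop.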

Summary

This is a summary of the classes related to the SCS system.


Reference https://fangjian0423.github.io/2019/04/03/spring-cloud-stream-intro/