
Event Driven Microservices with Spring Cloud Stream and RabbitMQ

The world of microservices is dominated by REST-based applications. When we talk about microservices, the natural assumption is a set of individual services talking to each other over HTTP(S) REST. But Event Driven microservices have a crucial role to play in any modern, cloud based architecture.


The usage of events to communicate application state or data is not a new concept. Organisations have been using messaging systems such as IBM MQ for decades, but those were heavyweight, commercial products that required dedicated infrastructure. With the introduction of the AMQP protocol and several open source implementations of it, events have become much easier to adopt.


In an Event Driven Architecture, applications communicate with each other by sending and/or receiving events or messages. These can contain just the metadata about a state / state change or the actual content itself in the body. The communication can be point-to-point or publish-subscribe.


Spring Cloud Stream

Spring Cloud Stream is a project within the Spring Cloud ecosystem. It is a framework that provides a Spring based programming model and adds an abstraction layer on top of the messaging middleware. It enables Spring applications to seamlessly connect to the underlying infrastructure without worrying about boilerplate code. Spring (VMware) maintains the binder implementations for RabbitMQ, Kafka, Kafka Streams and Amazon Kinesis, while the Google PubSub, Azure EventHub, Apache RocketMQ and Solace PubSub binders are maintained by the corresponding organisations.


Spring Cloud Stream can be used in 2 different ways:

  • To create a streaming data pipeline using Spring Cloud Dataflow

  • To create Event Driven Microservices using point-to-point or pub-sub models


This article explains the latter - use of Spring Cloud Stream with RabbitMQ to develop Event Driven Microservices.


Spring Cloud Stream with RabbitMQ


RabbitMQ

RabbitMQ is a widely popular messaging platform. It has open source and commercial editions. It is lightweight and can easily be deployed on a laptop, either directly or using Docker.


Using Docker Image:

RabbitMQ can be installed using the Docker command below. It pulls the latest version from Docker Hub.

docker pull rabbitmq

When running the container, do NOT use the plain rabbitmq image, as it doesn’t provide any GUI to browse the exchanges and queues.


Instead, run the management image using the command below. It starts a RabbitMQ container with a management GUI.

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

The UI can be accessed at http://localhost:15672/ with the default credentials guest/guest.


Microservices

Let’s take a look at 3 simple microservices — Producer, Processor and Consumer. The Producer will accept a string through a REST endpoint and publish a message to a RabbitMQ topic. The Processor will subscribe to that topic, convert the string to uppercase and publish the result to an output topic. The Consumer will subscribe to the output topic and print the value to the console. Together, these 3 services demonstrate the Source - Processor - Sink concept in Spring Cloud Stream.
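Conceptually, the Source - Processor - Sink chain is just function composition. The plain-Java sketch below (no messaging involved; the class and values are illustrative) shows the three roles wired together directly, where in the real system a broker sits between each stage:

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class PipelineSketch {

    // Source: supplies a value (the real Producer gets it from a REST call)
    public static Supplier<String> source = () -> "hello";

    // Processor: transforms the value
    public static Function<String, String> processor = value -> value.toUpperCase();

    // Sink: terminal step (the real Consumer just logs the value)
    public static Consumer<String> sink =
            value -> System.out.println("Received the value " + value + " in Consumer");

    public static void main(String[] args) {
        // In the real system, RabbitMQ topics sit between these three calls
        sink.accept(processor.apply(source.get()));
    }
}
```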




With the introduction of the functional programming model, setting up a Source, Processor and Sink has become much easier. We will use the functional style in this article.


The final code for all 3 services is available in the Git repo.

The words Topic and Exchange are used interchangeably in this article, since RabbitMQ represents a topic as an exchange with one or more queues bound to it.


Producer

All 3 services require the Cloud Stream, Spring for RabbitMQ and Lombok dependencies from start.spring.io. Since the Producer accepts requests via a REST endpoint, it needs the Web dependency as well. It is known as the Source service in Spring Cloud Stream.
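For reference, the dependency section of a Gradle build might look like the sketch below. This is an assumption based on the coordinates start.spring.io generates for these starters; the Spring Cloud BOM must also be applied to the build:

```groovy
dependencies {
    // Spring Cloud Stream plus the RabbitMQ binder
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
    // Lombok for the @Slf4j annotation used in the examples
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    // Needed only by the Producer, for its REST endpoint
    implementation 'org.springframework.boot:spring-boot-starter-web'
}
```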


Create a simple Controller that accepts a String and sends it to a topic.

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class ValueController {

    private final StreamBridge streamBridge;

    public ValueController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @GetMapping("values/{value}")
    public ResponseEntity<String> values(@PathVariable String value) {
        log.info("Sending value {} to topic", value);
        streamBridge.send("values-topic", value);
        return ResponseEntity.ok("ok");
    }
}

As you can see, there is no code or configuration in the Producer microservice that links it to RabbitMQ. The addition of the RabbitMQ binder in the dependency did all the bindings for us. This makes it very easy to switch the underlying messaging provider.


Processor

Processor is a pure Spring Cloud Stream application. It continuously listens to a topic and upon receiving a message, processes it and publishes to an output topic. As a result, this service requires only the Cloud Stream and Spring for RabbitMQ dependencies.


Add a Component class to process the message.

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Function;

@Slf4j
@Component
public class ValueProcessor {

    @Bean
    public Function<String, String> convertToUppercase() {
        return (value) -> {
            log.info("Received {}", value);
            String upperCaseValue = value.toUpperCase();
            log.info("Sending {}", upperCaseValue);
            return upperCaseValue;
        };
    }
}

The processor is represented as a java.util.function.Function that accepts a String and returns a String. The functional model keeps configuration to a minimum: when the app starts, Spring automatically creates an input topic and registers this method as a listener. On receiving a message, the code converts the input to uppercase and sends it to an output topic, which Spring also creates automatically.
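Because the processor is a plain java.util.function.Function, its business logic can be exercised without Spring or a broker at all. A minimal sketch (logging omitted; the class name is illustrative):

```java
import java.util.function.Function;

public class ConvertToUppercaseCheck {

    // Same transformation as the convertToUppercase bean, minus the logging
    public static Function<String, String> convertToUppercase =
            value -> value.toUpperCase();

    public static void main(String[] args) {
        System.out.println(convertToUppercase.apply("hello")); // prints HELLO
    }
}
```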


But these topics are created with a default naming convention: javaMethodName-in-<index> and javaMethodName-out-<index>, where index is the index of the function’s input or output (0 when there is only one of each). So, when this app is run locally, the Exchanges get created as convertToUppercase-in-0 and convertToUppercase-out-0. The Producer microservice, however, publishes its event to an Exchange named values-topic. So, unless we override the default Exchange names created by Spring, the message sent by the Producer will not be read by the Processor, as they send and listen on different Exchanges.


Map the Processor to values-topic with the configuration below:

spring:
  cloud:
    stream:
      bindings:
        convertToUppercase-in-0:
          destination: values-topic
          group: processor
        convertToUppercase-out-0:
          destination: uppercase-values-topic

Adding a group is optional, but it creates a consumer group and a durable queue. Omitting the group results in the creation of an anonymous queue that is destroyed when the application stops. Consumer groups also help with high availability and load balancing, but since this is a sample app with just one instance, the primary reason for adding a group here is to avoid the anonymous queue.


The output exchange/topic name is also defined in the same YAML. The next service (Consumer) has to listen on this exchange to read the message.


Consumer

The list of dependencies for the Consumer is the same as the Processor’s. It is a terminal service, known as a Sink in Spring Cloud Stream.

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Slf4j
@Component
public class ValuesConsumer {

    @Bean
    public Consumer<String> onReceive() {
        return (message) -> {
          log.info("Received the value {} in Consumer", message);
        };
    }
}

As seen above, the Sink service is defined by creating a java.util.function.Consumer bean that acts as the receiver. Since this uses the functional model, the default binding is derived from the method name, as mentioned in the Processor section. To consume the messages that the Processor sends to uppercase-values-topic, a configuration needs to be added.

spring:
  cloud:
    stream:
      bindings:
        onReceive-in-0:
          destination: uppercase-values-topic
          group: consumer

With this, we have created a fully functional Spring Cloud Stream pipeline. The final code is available in Git. Let’s test this and see how it behaves.


Testing

Start all 3 services. Invoke the Producer by passing a lowercase string, for example: http://localhost:8080/values/hello

Since these are asynchronous streaming applications, the response in the browser only indicates success from the Producer. The Processor’s logs should show the following messages:

Received hello
Sending HELLO

The Consumer’s logs should show the following message:

Received the value HELLO in Consumer

This shows that the input string given in the URL passed through all 3 services as expected.


Conclusion

Developing event driven microservices with Spring Cloud Stream is simple. It can be done with very minimal code and configuration, thanks to the support for the functional programming model. As the microservices above show, Spring Cloud Stream avoids the need for boilerplate code and keeps the code free of any dependency on a particular messaging middleware implementation. Switching RabbitMQ for another implementation such as Azure Service Bus or Event Hubs is as simple as changing the Gradle dependencies. This helps teams migrate their on-prem event driven microservices to any of the public clouds with minimal code or configuration changes. I will cover this in a subsequent post.



Source: Medium - Sudeep Moothedath

