Microservice + RabbitMQ application - 4

We are moving forward from where we left off to add the rabbitmq messaging mechanism on top of our microservices with spring cloud. In this 4th post of the series, we will create the listener (consumer) service to retrieve the messages queued in rabbitmq. We will create a web service project with spring initializr, using spring AMQP and @RabbitListener methods. Let's keep the grand architecture here as usual.

Creating the mail service

There will be a mailing service in our system, which we will register with Eureka but won't access via the gateway. Let's begin by creating a project in spring initializr like below.

Let me briefly explain the dependencies, most of which you already know. This project will be able to respond to RESTful calls with spring web. With the config client dependency, it will fetch its properties from the config server, and therefore from the config repo. The eureka discovery client dependency will register the service with Eureka. Lombok will create getter and setter methods automatically, but the main functionality we will use from lombok is its SLF4J logger support. We will use it for logging as a bonus for this project. The bootstrap dependency lets the project read its config server settings in the bootstrap phase, before it connects to the config server. Lastly, Spring RabbitMQ is the AMQP implementation. This is the api that listens to rabbitmq queues and sends feedback (acknowledgements) to the broker.

You should also add the small util project in which we defined the constant values and utility classes. This will let the project know about our custom classes.

			
<dependency>
	<groupId>com.aldimbilet</groupId>
	<artifactId>util</artifactId>
	<version>0.0.1-SNAPSHOT</version>
</dependency>		
			
		

Before diving into AMQP, you need the config repository file named ab-mailservice-local.properties in your private github repo. You can also directly download the config repository here and add it to your github. The settings in this file should be like this:

			
eureka.client.service-url.defaultZone=http://aldimbilet:eureka@localhost:4442/eureka
			
		

Then you can move on to bootstrap.properties and set profile name, config server connection and app name like this:

			
spring.application.name=ab-mailservice
eureka.instance.instance-id=${spring.application.name}:${random.int(1,10000)}
spring.cloud.config.discovery.service-id=ab-config-server
spring.cloud.config.fail-fast=true
spring.cloud.config.username=aldimbilet
spring.cloud.config.password=config
spring.profiles.active=local
			
		

Then one of the most important properties is set in application.properties. We will prevent requeueing of failed messages, so that rabbitmq can redirect them to the dead letter queue instead of delivering them over and over.

			
server.port = 0
spring.rabbitmq.listener.simple.default-requeue-rejected=false
			
		

And the bonus: the logging system. It is very easy to integrate SLF4J in spring boot with lombok. Since I am not using any kind of logging or monitoring in rabbitmq, I would like to have some sort of logging to know what's going on. You can create a file named logback-spring.xml inside the resources folder, right next to the application.properties file. The content of the file could be like this:

			
<configuration debug="false">
	<include resource="/org/springframework/boot/logging/logback/base.xml" />
	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
		<encoder>
			<pattern>My custom logger -> %msg%n</pattern>
		</encoder>
	</appender>
	<root level="info">
		<!-- Every log statement in the application at info level or above -->
		<!-- will use the appender named STDOUT with its special format -->
		<!-- This is customizable for specific packages -->
		<!-- It also supports different logging outputs like files -->
		<appender-ref ref="STDOUT" />
	</root>
</configuration>	
			
		

At this point, you can launch the system with the help of this post. You can also run the mail service and see it in the eureka console. But the main purpose of this service is to listen to rabbitmq, so the real deal starts now.

RabbitListener methods

There will be only one class in this service, since we are only going to pretend that we are sending emails. There are also no DB operations or security measures here, therefore we don't have config or repo classes. I am keeping the concept as simple as I can, in order to keep the focus on rabbitmq. We have already sent the user info from the MVC application so that emails can be sent to the users. The utility classes were already defined in the util project, and we added the dependency to the pom file above.
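To make the payload a bit more concrete, here is a minimal sketch of the kind of class a JSON message converter can fill in. The class name UserInfoPojoSketch and its single field are my assumptions for illustration; the only thing the listener code in this post relies on is a getEmail() method, and the real UserInfoPojo in the util project may look different.

```java
// Hypothetical stand-in for the payload class, not the real com.aldimbilet.pojos.UserInfoPojo.
// A JSON converter such as Jackson typically needs a no-argument constructor
// plus getters and setters (or public fields) to rebuild the object from the message body.
public class UserInfoPojoSketch {

    // Assumed field: the listener methods in this post only read the email address
    private String email;

    // No-argument constructor used when the object is created from JSON
    public UserInfoPojoSketch() {
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public static void main(String[] args) {
        UserInfoPojoSketch userInfo = new UserInfoPojoSketch();
        userInfo.setEmail("someone@example.com");
        System.out.println("sending email receipt to '" + userInfo.getEmail() + "'");
    }
}
```

In the real project lombok could generate the getter and setter, so the class would be even shorter.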

We will have 2 methods in the main class. One will listen to payment events, the other to event cancelations. This time, I will write the comments inside the code for detailed explanation. I have also injected the SLF4J logger with lombok. All of the necessary code is below :)

			
import java.util.Random;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Payload;
import com.aldimbilet.pojos.UserInfoPojo;
import lombok.extern.slf4j.Slf4j;

// Lombok directly injects an SLF4J logger inside this class
// This is lombok.extern.slf4j.Slf4j annotation, not usual SLF4J
// This way you can use log.info or log.error directly anywhere you want
@Slf4j
@SpringBootApplication
public class AldimbiletMailServicesApplication
{
	public static void main(String[] args)
	{
		SpringApplication.run(AldimbiletMailServicesApplication.class, args);
	}

	// If you forget this, the app will give "Listener method could not be invoked with the incoming message" error
	// And also you will get "Cannot convert from [[B] to [<your custom class>] for GenericMessage" error as root cause
	@Bean
	public Jackson2JsonMessageConverter producerJackson2MessageConverter()
	{
		return new Jackson2JsonMessageConverter();
	}

	// This method is a rabbitmq queue listener; listeners consume from queues, never from exchanges or routing keys
	// It is listening for the messages in the queue named "emailReceiptQueue"
	// Messages are consumed in FIFO order (first in, first out)
	// Make sure you have a converter bean defined somewhere to receive custom classes
	// "emailReceiptQueue" is bound to a topic exchange
	// And that binding accepts routing keys in the "email.receipt.*" format
	// It could be "email.receipt.special" or "email.receipt.normal" or ...etc
	// The @Payload annotation is optional here
	@RabbitListener(queues = "emailReceiptQueue")
	public void sendReceiptEmail(@Payload UserInfoPojo userInfo)
	{
		// Lombok gave us the logger and we can use it
		log.info("sending email receipt to '" + userInfo.getEmail() + "'");
		while (new Random().nextInt(99999999) != 55555555)
		{
			// Some imaginary operation that takes time
			// Purely for demonstration purposes
			// You would probably run a thread and wait for it to complete here
		}
		log.info("Handled the message and done with it");
		// This listener does not handle the next message in the queue until this one is acknowledged
		// With the default acknowledge mode, a normal return from this method counts as the acknowledgement, even without a return value
		// That is why there is a weird while loop, it simulates a long running process
	}

	// This method is a rabbitmq queue listener; listeners consume from queues, never from exchanges or routing keys
	// It is listening for the messages in the queue named "emailCancelationQueue"
	// Messages are consumed in FIFO order (first in, first out)
	// Make sure you have a converter bean defined somewhere to receive custom classes
	// "emailCancelationQueue" is bound to a direct exchange
	// And that binding accepts only the exact routing key "email.cancelation", nothing else
	// The @Payload annotation is optional here
	@RabbitListener(queues = "emailCancelationQueue")
	public void sendCancelationEmail(@Payload UserInfoPojo userInfo) throws Exception
	{
		// Lombok gave us the logger and we can use it
		log.info("sending cancelation email to '" + userInfo.getEmail() + "'");
		while (new Random().nextInt(99999999) != 55555555)
		{
			// Some imaginary operation that takes time
			// Purely for demonstration purposes
			// You would probably run a thread and wait for it to complete here
		}
		if (new Random().nextInt(10) == 3)
		{
			// Randomly throw an error to simulate dead letters
			throw new Exception("something something");
		}
		log.info("Handled the message and done with it");
		// This listener does not handle the next message in the queue until this one is acknowledged
		// With the default acknowledge mode, a normal return from this method counts as the acknowledgement, even without a return value
		// That is why there is a weird while loop, it simulates a long running process
	}
}
			
		

As you can see in the comments above, we are listening to only 2 of the 3 existing queues. The 3rd one is the dead letter queue, and it is managed by rabbitmq. Because we disabled requeueing, rabbitmq redirects failed messages to the dead letter queue without retrying or anything. So the concept of undeliverable in rabbitmq seems to boil down to this: a message that was delivered to a consumer and came back rejected because of an exception. In other words, a message must first be delivered to a listener before it can be considered undeliverable. Even when there is an exception, the api still signals the broker, only with a reject instead of a positive acknowledgement. When nobody listens to the queue, there won't be any undeliverable messages. When the queue is listened to and an exception occurs in the consumer (and the retry count is reached, if you have configured retries), the message is redirected to the dead letter queue. With requeueing left on, the message would bounce back and forth between the queue and the consumer. Even though it is not the best practice, you can also consider messages undeliverable after a certain timeout.
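The flow described above can be sketched with a toy in-memory model. This is not RabbitMQ or Spring AMQP code; the class DeadLetterSketch, the drain method and the maxDeliveries cap are names I made up for the demonstration. It only mimics the decision: a listener that returns normally acks the message, a listener that throws rejects it, and the requeue flag decides between the dead letter list and another delivery. A real broker also needs a dead letter exchange configured on the queue for this to happen.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class DeadLetterSketch {

    // Delivers messages one at a time in FIFO order.
    // A listener that returns normally counts as an ack, a listener that throws counts as a reject.
    // With requeueRejected == false the rejected message is dead lettered,
    // otherwise it is put back on the queue (maxDeliveries stops the demo from looping forever).
    public static List<String> drain(Queue<String> queue, Consumer<String> listener,
            boolean requeueRejected, int maxDeliveries) {
        List<String> deadLetters = new ArrayList<>();
        int deliveries = 0;
        while (!queue.isEmpty() && deliveries < maxDeliveries) {
            String message = queue.poll();
            deliveries++;
            try {
                listener.accept(message);
            } catch (RuntimeException e) {
                if (requeueRejected) {
                    queue.add(message); // bounces back and will be delivered again
                } else {
                    deadLetters.add(message); // redirected, never delivered again
                }
            }
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        // Hypothetical listener that fails for one specific address
        Consumer<String> listener = email -> {
            if (email.startsWith("bad")) {
                throw new IllegalStateException("simulated failure");
            }
        };
        Queue<String> queue = new ArrayDeque<>(
                List.of("a@example.com", "bad@example.com", "c@example.com"));
        // prints [bad@example.com]
        System.out.println(drain(queue, listener, false, 10));
    }
}
```

If you pass true for requeueRejected, the failing message never reaches the dead letter list and keeps coming back until the delivery cap is hit, which is the back and forth behaviour we avoided by turning requeueing off.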

With this code, we have completed the consumer side. If you run this service, it will start listening to the queues and fetching the messages that we sent in the previous post. Some of them will throw errors and end up in the dead letter queue. Even though we have 2 different routing keys for payment events, the same method will send the emails. Of course you could implement if-else logic here, but it wouldn't make sense to use a single queue in that case. Our purpose was to show how different routing keys can end up in the same queue. You should see logs like the image below when this service starts running.

Since there will be delays in between the operations, you can see these queues in the running state in the rabbitmq management console.
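Going back to the routing keys for a moment: the reason both payment keys land in the same queue is the wildcard in the topic binding. Here is a small sketch of how such a pattern can be matched, assuming the usual AMQP topic rules where * stands for exactly one word and # stands for zero or more words. The class TopicPatternSketch is only my illustration, the broker does this matching for you.

```java
import java.util.List;

public class TopicPatternSketch {

    // Returns true when the routing key matches the topic binding pattern.
    // Words are separated by dots, '*' matches exactly one word, '#' matches zero or more words.
    public static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern is used up, so the key must be used up as well
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' either consumes nothing, or consumes one word and stays active
            return matches(p, i + 1, k, j) || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        List<String> keys = List.of("email.receipt.special", "email.receipt.normal", "email.cancelation");
        for (String key : keys) {
            System.out.println(key + " -> " + matches("email.receipt.*", key));
        }
    }
}
```

The first two keys match and the last one does not, which is why the cancelation messages need their own binding with the exact key.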

If you have seen the logs in the console and there are no messages left in the ready state in rabbitmq, the operation was successful. We have utilized rabbitmq and listened to the messages. Next up, we will question ourselves: did we really make the right decisions, and are we using it the right way? We will have to defend ourselves, because you can't just pick up a giant product like rabbitmq and use it. People will ask, you know. See you in the next post :)

