Mikroservis + RabbitMQ uygulaması - 3

RabbitMQ ile mikroservisler konusundaki 3. yazı olan bu yazıya başlarken durumu özetleyeyim. Burada mikroservis mimarisi ile yazılmış bir projemiz bulunuyor. Bunun üzerine temel işlevlerini kullanacak şekilde rabbitmq entegre ediyoruz. Bu işlem için bütün projeyi değiştirmeyeceğiz veya baştan yazmayacağız. Bunun yerine message broker 'ların rolüne, rabbitmq 'nun özelliklerine ve kazanımlarımıza ve dezavantajlarımıza odaklanacağız. Daha önce mail servisine rabbitmq ile ulaşacağımızı zöylemiştik ve rabbitmq 'yu kurmuştuk. Bu yazıda MVC uygulamamızdan mesajları (payload) rabbitmq 'ya göndermeye başlayacağız. Büyük resmimiz yine burada dursun.

Spring AMQP ile mesaj gönderimi

Yaptığımız uygulama spring cloud uygulaması olduğu için spring cloud kütüphanelerini kullanmaya devam edeceğiz. Rabbitmq 'ya mesajlarımızı gönderecek olan bağımlılık yani dependency Spring AMQP 'dir. Bu kütüphane en basit anlamda sizin uygulamanızı rabbitmq 'ya bağlar arka planda. Rabbitmq 'nun nerede olduğunu (bir servis çalışıyordu) ve mesajları nasıl göndereceğini bilir. Farklı yerlerde kurulu rabbitmq 'lar için de ayarlar yapmanız pekala mümkün. Öncelikle MVC uygulamasının pom dosyasına aşağıdaki bağımlılığı ekleyelim.

		
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
		
	

Bu kütüphaneyi kullanabilmek için yapmanız gereken şey exchange, queue ve routing key 'lere göre binding 'ler oluşturmaktadır. Spring boot otomatik olarak bir rabbittemplate (connector) oluşturur ve bean 'lerinizi enjecte eder. Bean oluşturmak için en çok kullanılan yöntem nedir? Tabi bir konfigürasyon sınıfıdır. Aşağıdaki gibi

			
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

@Component
// Burada karmaşa olmaması için bean isimleri mecbur oluyor
// Spring boot ortamında aynı bean türünde birden fazlasını oluşturuyoruz
// Bean isimleri verilmezse yanlış yerde yanlış bean inject edilebilir
public class RabbitConfig
{
	@Bean(name = "emailReceiptTopic")
	public TopicExchange rabbitTopic()
	{
		return new TopicExchange("emailReceiptTopic");
	}

	@Bean(name = "emailReceiptQueue")
	public Queue emailReceiptQueue()
	{
		return new Queue("emailReceiptQueue", true);
	}

	@Bean
	@DependsOn(value =
	{ "emailReceiptTopic", "emailReceiptQueue" })
	public Binding emailReceiptBinding(TopicExchange exchange, Queue emailReceiptQueue)
	{
		// emailReceiptQueue kuyruğunu bir routing key ifadesi ile bir exchange 'e bağladık
		// Exchange içerisinde "email.receipt." ifadesi ile başlayan routing key 'li mesajar emailReceiptQueue 'ya yönlendirilecek
		return BindingBuilder.bind(emailReceiptQueue).to(exchange).with("email.receipt.*");
	}

	@Bean(name = "emailCancelationDirect")
	public DirectExchange rabbitDirect()
	{
		return new DirectExchange("emailCancelationDirect");
	}

	@Bean(name = "emailCancelationQueue")
	public Queue emailCancelationQueue()
	{
		// Bu kuyrukta iletilemeyen mesajlar "email.cancelation.deadletter" route bilgisi ile deadLetterExchange 'e gönderilecek
		// Deadletter isimli exchange burada direct exchange türünde ve topic veya fanout da olabilirdi
		return QueueBuilder.durable("emailCancelationQueue").withArgument("x-dead-letter-exchange", "deadLetterExchange").withArgument("x-dead-letter-routing-key", "email.cancelation.deadletter").build();
	}

	@Bean
	@DependsOn(value =
	{ "emailCancelationDirect", "emailCancelationQueue" })
	public Binding deadLetterBinding(@Value("emailCancelationDirect") DirectExchange direct, @Value("emailCancelationQueue") Queue emailCancelationQueue)
	{
		// emailCancelationQueue kuyruğunu bir routing key ile bir exchange 'e bağladık
		// Exchange içerisinde tam olarak "email.cancelation" ifadesine sahip routing key 'li mesajar emailCancelationQueue 'ya yönlendirilecek
		return BindingBuilder.bind(emailCancelationQueue).to(direct).with("email.cancelation");
	}

	@Bean(name = "deadLetterExchange")
	public DirectExchange deadLetterExchange()
	{
		return new DirectExchange("deadLetterExchange");
	}

	@Bean(name = "deadLetterQueue")
	public Queue deadLetterQueue()
	{
		return QueueBuilder.durable("deadLetterQueue").build();
	}

	@Bean
	@DependsOn(value =
	{ "deadLetterExchange", "deadLetterQueue" })
	public Binding emailCancelationBinding(@Value("deadLetterExchange") DirectExchange deadExchange, @Value("deadLetterQueue") Queue deadLetterQueue)
	{
		// Mesaj alıcı serviste application properties içinde "spring.rabbitmq.listener.simple.default-requeue-rejected=false" ifadesini unutmayınız !!
		// Belli bir routing key ile deadLetterQueue 'yu deadLetterExchange 'e bağladık
		// deadLetterExchange 'e tam olarak "email.cancelation.deadletter" ifadesine sahip routing key ile gelen mesajlar deadLetterQueue 'ya yönlendirilecek
		return BindingBuilder.bind(deadLetterQueue).to(deadExchange).with("email.cancelation.deadletter");
	}

	@Bean
	private RabbitTemplate setReturnCallback(ConnectionFactory connectionFactory)
	{
		RabbitTemplate myCustomTemplate = new RabbitTemplate(connectionFactory);
		// Burası zorunlu olmayan bir loglama işlemi
		ConfirmCallback confirmCallback = new ConfirmCallback()
		{
			@Override
			public void confirm(CorrelationData correlationData, boolean ack, String cause)
			{
				// Bu kısım mesajın rabbitmq 'ya ulaşma durumu ile ilgili bilgi aldığımız yer
				// Burada mesajlar alıcı servise gitmese ve deadletter 'a düşse bile ack bilgisi true döner
				// Dönen verimiz boş gelir çünkü sadece mesaj gönder metodu kullanıyoruz, dönen bir veri beklemiyoruz
				System.err.println("Returned: " + correlationData.getReturned());
				System.err.println("Ack: " + ack);
				System.err.println("Cause: " + cause);
				System.err.println();
			}
		};
		myCustomTemplate.setConfirmCallback(confirmCallback);
		// Kendi custom sınıflarınızı mesaj olarak gönderebilmek için bir converter kullanmanız gerekiyor
		// Aksi takdirde "SimpleMessageConverter only supports String, byte[] and Serializable payloads" hatası alırsınız
		// Template oluşturulmadan önce bu bean oluşmazsa yine aynı hatayı alırsınız
		// Spring AMQP 'deki varsayılan converter "org.springframework.messaging.converter.SimpleMessageConverter" dır
		myCustomTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
		return myCustomTemplate;
	}
}
			
		

Bu kodlarda varsayılan connection factory ile RabbitTemplate sınıfını da oluşturduk. Bu connection zaten rabbitmq 'ya nerede bağlanacağını biliyordu. Ayrıca confirmcallback kullanarak küçük bir loglama da yapmış olduk. Confirmcallback yapısının çalışması için application.properties dosyasında aşağıdaki gibi bir ayar yapmamız gerekiyor.

			
spring.rabbitmq.publisher-confirm-type=correlated
			
		

Bu @Component sınıfı oluşturulacak ve 1 topic ve 2 direct exchange şeklinde 3 exchange, yani 3 binding rabbitmq 'da oluşacaktır. Bu 3 exchange için 3 kuyruk da oluşacaktır. Yani 3 işlem için 1 'er adet exchange, queue ve binding oluşturuyoruz. Sistemde birisi bilet aldığında alındı bilgisini mail ile gönderiyor gibi yapacağız. Bir etkinlik iptal edildiğinde ise bütün kullanıcılara mail gönderiyor gibi yapacağız. Bu işlemde mail servisine ulaşamayan veya hata alan mesajlar deadletter kuyruğuna düşecek ve orada kalacak. Kimse o kuyruğu dinlemiyor çünkü.

Bu bean 'leri MVC uygulamamızda kullanacağız. Fakat ekranlarla ilgili bütün kodları yazmayacağım. Yapmanız gereken şey bir RabbitTemplate sınıfını bağlamak. Zaten konfigürasyonu config sınıfında yukarıda vardı. Mesaj göndermek için bu template üzerinden template.convertAndSend metodunu kullanabilirsiniz. Jackson2JsonMessageConverter ile kendi sınıflarınızı mesajlara çevirmiş olacaksınız. Bu metodu MVC ugulamasında başarılı payment işleminde çalıştırabilirsiniz. Ben burada topic exchange mantığını ifade edebilmek adına bir mantık işlettim. ID 'si çift sayı olanlara özel mail gönderecek şekilde routing key belirledim.

ID 'si tek sayı olanlar için de aynı topic exchange 'e gidecek mesajlar çünkü "email.receipt.special" ve "email.receipt.normal" ifadeleri "email.receipt.*" ile başlıyor. Topic exchange 'de routing için böyle bi tanımlama yaptığımız için aslında burada topic "email.receipt." olmuş oluyor.

			
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// See RabbitConfig
private RabbitTemplate template;

// Aynı topic ifadesindeki farklı routing key 'lerin aynı kuyruğa gidişini gözlemlemek için
if (infoPojo.getId() % 2 == 0)
{
	// Uygulamada bir yerde converter için bean tanımlaması yaptığınızdan emin olunuz
	// Sistemdeki topic exchange belli bir routing key formatını bekliyor
	template.convertAndSend(emailReceiptTopic.getName(), "email.receipt.special", infoPojo);
}
else
{
	// "email.receipt." ile başladığı için aynı kuyruğa gidiyor
	template.convertAndSend(emailReceiptTopic.getName(), "email.receipt.normal", infoPojo);
}
			
		

Burada bir fonksiyonumuz daha var. Bir etkinliği iptal etmek için bir sayfa tasarladım. Bu blog yazısının odağı olmadığı için yine ekranlarla iligli detayları atlıyorum. İptal mailleri rabbitmq içerisinde bir direct exchange 'e gidecek. Çalışma yapısını göstermek için sistemdeki bütün kullanıcılara mail gönderiyoruz. Burada ayrıca CorrelationData kullanarak gönderdiğimiz mesajın rabbitmq 'ya iletilip iletilmediği bilgisini alıyoruz. Buna acknowledgement deniyor. Bu data ise başka mesajlarla karıştırılmaması için unique bir değer istiyor.

			
for (UserInfoPojo userInfoPojo : resp)
{
	CorrelationData correlationData = new CorrelationData(userInfoPojo.getId().toString());
	// correlationData için kullanıcı id 'sini takip numarası olarak vermiş olduk
	// Iptal işlemi için gereken routing key tam olarak "email.cancelation" olmak zorunda
	template.convertAndSend(emailCancelationDirect.getName(), "email.cancelation", userInfoPojo, correlationData);
}
			
		

RabbitMQ yöntim konsolu

Şimdi bu mesajların nereye gittiğini merak ediyorsunuzdur. Rabbitmq ile ilgili herhangi bir tanım yapmadığınız halde spring amqp rabbitmq ile konuşur ve gerekli exchange, queue ve binding bilgilerini oluşturur. Bu yapıların hepsini rabbitmq yönetim konsolunda görebilirsiniz. Burada ayrıca monitoring, reporting ve ayarlar için ekranlar da var fakat biz konuyu dağıtmayacağız. Yönetim konsoluna ulaşmak için bir plugin kurmanız gerekiyor. Windows ortamında çalışıyorsanız kurulumu aşağıdaki komutla yapabilirsiniz:

<rabbitmq kurulum klasörü>\sbin>rabbitmq-plugins enable rabbitmq_management

Yönetim konsolunu hazırladıktan sonra http://localhost:15672 adresinden ulaşabilirsiniz. Varsayılan kullanıcı adı ve şifresi guest - guest olacaktır. Exchanges sekmesini açtığınızda oluşturulan exchange 'ler listelenecektir. Burada varsayılan exchange 'ler de var ve kullanılabilir fakat isimleri programcılara birşey ifade etmeyeceği için mantıklı olmayacaktır.

RabbitMQ 'ya mesajlar gönderdiyseniz bunları queues sekmesinde görebilirsiniz. aşağıdaki resimdeki sayılara takılmayın. Burada ready kısmında görülenler göndericiden alınmış mesajların sayısıdır. Birileri bu kuyruğu dinlemeye başlayana kadar bunları boşta tutar. Dinleyici metodlar için mail servisini ve rabbitlistener 'ları oluşturacağız ve bu sayılar sıfırlanacak. unacked kısmında yazan sayı ise kuyrukta gönderilmeyi bekleyen mesajlardır. Bir bottleneck durumu oluşup oluşmadığını buradan anlayabilirsiniz. total sayısı ise bu ikisinin toplamını ifade ediyor. Burada dinleyiciden ack alamamış olan mesajlar deadletter kuyruğuna düşecektir ve orada ready durumunda duracaktır. Kimse bu kuyruğu dinlemediği için ready olarak deadletter 'da kalır.

Kuyrukların state bilgisi ise dinenip dinlenmediğini gösteriyor. Normalde kuyruklar idle olarak duruyor ve birileri dinlemeye başladığında mesajları kuyruğa alıp sıra ile göndermeye başlıyor. DLX and DLK ifadeleri bir kuyrukta iletilemeyen mesajlar için deadletter kuyruğuna yönlendirme yapıldığını ve bu işlem için de routing key kullanıldığını gösteriyor. İletilemeyen mesaj ne demek? Bunun simülasyonunu bir sonraki yazıda mail servisinde yapacağız. Son olarak eğer kendiniz yönetim konsolunda exchange ve kuyruk oluşturmak isterseniz aşağıdaki ekranlardaki gibi yapabiliyorsunuz.

Fark ettiyseniz burada da config sınıfında yaptığımız gibi bütün ayarları yapabiliyoruz. Bu noktada rabbitmq 'ya mesaj göndermeyi başardık. Şimdi bu kuyrukları dinlememiz gerekiyor. Yeni yazacağımız mail servisinde bir api olacak. Bu api ayağa kalktığında dinlediği kuyrukları bilgilendirecek. Aynısı tersten de geçerli. Kuyruklar da kendilerine gelen mesajları dinleyicilere haber verecek (observer pattern da olduğu gibi). Bir sonraki yazıda görüşmek üzere :)


Bir yorum yazabilirsiniz