Derinlemesine yazılım eğitimleri için kanalımı takip edebilirsiniz...

.NET Core – MassTransit Kullanarak RabbitMQ İle Messaging

Merhaba,

Günümüzde geliştirilen birçok enterprise uygulama, ihtiyaçlar doğrultusunda birbirinden bağımsız platformlarda, distributed bir şekilde çalışmaktadır. Bu yapıların birbirleriyle olan iletişimleri genellikle Messaging yapıları sayesinde gevşek bağlılıkla(loosely coupled) ve asenkron bir şekilde gerçekleştirilmekte ve böylece ölçeklenebilirlik(scalability) ve esneklik(flexibility) sağlanmaktadır. Lakin günümüzde kullanılan birçok messaging yapılanmasından hangisini kullanırsanız kullanın; hata yönetimi, yeniden deneme, bekletme, transaction vs. gibi durumlarla karşı karşıya kalınmakta ve bu şekilde distributed olan uygulamaları çalıştırmak ve yönetmek zorlaşmaktadır. İşte bu zorlukların üstesinden gelmek ve yönetilebilirliği kolaylaştırmak için bir Enterprise Service Bus (ESB) teknolojisi kullanmamız gerekmektedir. Bu duruma istinaden bu içeriğimizde, messaging yapısı olan RabbitMQ kuyruk sisteminin kullanıldığı distributed sistemlerde, ESB teknolojisi olarak MassTransit Framework’ünü ele alacak, nasıl ve ne amaçla kullanıldığını derinlemesine inceliyor olacağız.

Öncelikle Enterprise Service Bus’un(ESB) ve MassTransit’in ne olduğunu daha detaylı açarak başlayalım.

Enterprise Service Bus (ESB) Nedir?

Servisler arası entegrasyon sağlayan komponentlerin bütünüdür diyebiliriz. Yani, bir sistemde bulunan birden çok birbirinden bağımsız servisin, kendi aralarındaki iletişimini ve etkileşimini sağlayan bir sistemdir. Temelde client API için servisler arası ulaşım/transport işlemlerini daha yüksek abstraction seviyesinde çözmek için kullanılmaktadır. Böylece distributed olan uygulamaları rahatlıkla çalıştırmayı ve yönetmeyi amaçlamaktadır.

MassTransit Nedir?

.NET Core - MassTransit Kullanarak RabbitMQ İle Messaging
.NET için geliştirilmiş olan, distributed uygulamaları rahatlıkla yönetebilmeyi ve çalıştırmayı amaçlayan, ücretsiz, open source bir Enterprise Service Bus Framework’üdür. Messaging tabanlı, gevşek bağlı(loosely coupled) ve asenkron olarak tasarlanmış dağınık sistemlerde yüksek dereceli kullanılabilirlik, güvenilirlik ve ölçeklenebilirlik sağlayabilmek için hizmetler oluşturmayı oldukça kolaylaştırmaktadır. İçeriğimiz boyunca MassTransit framework’ü ile seyredeceğimiz için dilerseniz avantajlarını hızlıca değerlendirelim.

MassTransit, tamamen farklı uygulamalar arasında message-based communication yapabilmemizi sağlayan bir transport gateway’idir.

Avantajları

  • Open source ve ücretsizdir,
  • Kullanımı kolaydır,
  • Güçlü mesaj desenleri(message pattern)ni barındırır
  • Distributed transaction sağlar,
  • Test edilebilirdir,
  • Monitoring/İzleme özelliği mevcuttur,
  • Transport işlemlerinin kompleksliğini düşürür,
  • Multiple transport desteği sağlar,
  • Hata yönetimi sağlar,
  • Scheduling/zamanlama mevcuttur,
  • Request/Response pattern’larını destekler,
  • Message broker exchange’lerini yönetebilir
  • vs.
  • vs.

Peki ne amaçla, hangi durumlarda kullanırız?
Microservice architecture yaklaşımlarda tüm servisleri ayırdıktan sonra(yani distributed yaklaşım sergilendikten sonra), asenkron operasyonlara ihtiyaç olursa ve gerçekleştirmeye karar verilirse bu tarz durumlarda message broker’lar kullanılmaktadır. Bu şekilde distributed yaklaşımlarda message broker kullandığı taktirde makalemizin ilk paragraflarında da bahsedildiği gibi hata yönetimi, yeniden deneme, bekletme, transaction vs. gibi meydana gelen durumları rahatlıkla yönetebilmek için tercih edebilir, kullanabiliriz.

Temel Düzeyde MassTransit Kütüphanesinin Kullanımı
Temel düzeyde MassTransit kütüphanesinin kullanımını örneklendirebilmek için bir Console uygulamasından faydalanabiliriz. İlgili uygulamaya dotnet add package MassTransit komutu aracılığıyla MassTransit kütüphanesini yükledikten sonra aşağıdaki kodun geliştirilmesi yeterli olacaktır :

    class Program
    {
        class Message
        {
            public string Text { get; set; }
        }
        static async Task Main(string[] args)
        {
            var bus = Bus.Factory.CreateUsingInMemory(factory =>
                factory.ReceiveEndpoint("test_queue", endpoint =>
                    endpoint.Handler<Message>(async handler => Console.Out.WriteLine($"***\nReceived : { handler.Message.Text}\n***"))
                )
            );

            //Kanal başlatılıyor
            await bus.StartAsync();

            await Task.Run(async () =>
            {
                while (true)
                {
                    Message message = new Message
                    {
                        Text = Console.ReadLine()
                    };
                    if (message.Text.ToUpper() == "C")
                    {
                        //Kanal sonlandırılıyor
                        await bus.StopAsync();
                        break;
                    }
                    await bus.Publish(message);
                }
            });
        }
    }

.NET Core - MassTransit Kullanarak RabbitMQ İle MessagingKod bloğunu incelerseniz eğer 32. satırda publish edilen message, 10. satırda receive edilmektedir. Uygulama derlenip, çalıştırıldığında yandaki görseldeki gibi işlevsellik gösterecektir.

RabbitMQ Nedir?

Asenkron bir şekilde mesajlaşmayı sağlayan open source mesaj kuyruk sistemidir. Makale süreci boyunca RabbitMQ kullanılacaktır. Bu sisteme dair bir bilginiz yoksa eğer tarafımca yazılmış a’dan z’ye RabbitMQ Yazı Dizisinden faydalanabilirsiniz.

MassTransit ile RabbitMQ kullanımını örneklendirmeden önce MassTransit’in mesaj iletim yollarını bilmekte fayda vardır. MassTransit Publish ve Send olmak üzere iki farklı yolla mesaj iletiminde bulunmaktadır.

  • Publish
    .NET Core - MassTransit Kullanarak RabbitMQ İle MessagingEvent tabanlı mesaj iletim yöntemidir. Özünde publish-subscribe pattern’ini uygular. Event publish edildiğinde, o event’e abone olan tüm queue’lara mesaj iletilecektir.
     
     
     
  • Send
    .NET Core - MassTransit Kullanarak RabbitMQ İle MessagingCommand(komut) tabanlı mesaj iletim yöntemidir. Hangi kuyruğa mesaj iletimi gerçekleştirilecekse endpoint olarak bildirilmesi gerekir.

Temel Düzeyde MassTransit Kütüphanesinin RabbitMQ İle Kullanımı

Bu örnekte Send fonksiyonu ile mesajı Command olarak iletmeyi inceleyeceğiz…

MassTransit ile RabbitMQ uygulamasının kullanılabilmesi için ilk olarak ilgili kuyruk sisteminin yüklü olması gerekmektedir. Bunun için direkt olarak Docker‘dan yaralanabilir ve ilgili platform üzerinde ayağa kaldırabilirsiniz. (İlgili makale için bakınız : Docker’da RabbitMQ Ayağa Kaldırma) Ya da RabbitMQ için cloud’u tercih edebilir ve CloudAMQP‘de bir hesap açabilir ve kullanabilirsiniz.

Bizler bu içeriğimizde, MassTransit’i RabbitMQ ile birlikte kullanabilmek için sadece cloud RabbitMQ üzerinden örneklendirecek ve ele alacağız. Örneklendirme için yine Console uygulamaları kullanılacaktır. ‘Producer’ ve ‘Consumer’ olmak üzere iki farklı console uygulaması oluşturulmalı ve her iki uygulamaya dotnet add package MassTransit.RabbitMQ komutu eşliğinde MassTransit.RabbitMQ kütüphanesi kurulmalıdır. Ayrıca her iki projede de iletilecek mesajların türünü belirtebilmek için ‘Shared'(değişebilir, opsiyoneldir) isimli class library’nin oluşturulması ve her iki proje tarafından referans edilmesi gerekmektedir. ‘Shared’ın oluşturulma ve illa mesaj türünün bu katmanda tanımlanma nedenlerini birkaç satır sonra açıklayacağım.

İlk olarak ‘Shared’ projesinde mesaj türünü tanımlayarak başlayalım :
.NET Core - MassTransit Kullanarak RabbitMQ İle Messaging

    public interface IMessage
    {
        string Text { get; set; }
    }

Şimdi MassTransit içerisinde kullanılacak olan mesaj türlerinin neden bu şekilde farklı bir katmanda tanımlandığına değinebiliriz. MassTransit, iletilecek olan mesajların türünü tanımlarken, içerisinde metot barındırmayan ve sadece read-only property imzaları barındıran class yahut interface kullanılmasını tavsiye eder. Ayrıca bu türlerin ‘Producer’ ve ‘Consumer’lar için aynı namespace’den gelecek şekilde ayarlanması gerekmektedir. Yani ‘Shared’da olduğu gibi ortak bir projeden referans alınması kastedilmektedir.

İletilecek mesajların türünü tanımladıktan sonra bu mesajları oluşturacak ve kuyruklara gönderecek olan ‘Producer’ uygulamasını inşa ederek devam edebiliriz.

using MassTransit;
using Shared;
using System;
using System.Security.Authentication;
using System.Threading.Tasks;

namespace Producer
{
    public class Message : IMessage
    {
        public string Text { get; set; }
    }
    class Program
    {
        static async Task Main(string[] args)
        {
            string rabbitMqUri = "amqps://wmfjyalu:X0GmitgZM7iXUPmT4uymccexqiTKoxJq@shark.rmq.cloudamqp.com/wmfjyalu";
            string queue = "test-queue";
            string userName = "wmfjyalu";
            string password = "X0GmitgZM7iXUPmT4uymccexqiTKoxJq";

            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });
            });

            var sendToUri = new Uri($"{rabbitMqUri}/{queue}");
            var endPoint = await bus.GetSendEndpoint(sendToUri);

            await Task.Run(async () =>
            {
                while (true)
                {
                    Console.Write("Mesaj yaz : ");
                    Message message = new Message
                    {
                        Text = Console.ReadLine()
                    };
                    if (message.Text.ToUpper() == "C")
                        break;
                    await endPoint.Send<IMessage>(message);
                    Console.WriteLine("");
                }
            });
        }
    }
}

MassTransit ile ‘Producer’de üretilen mesajı ‘Consumer’a iletmek için Send fonksiyonunu kullanacağız. Send fonksiyonu ilgili mesajı nereye ileteceğini bilebilmek için bizden bir endpoint istemektedir. Bu endpoint’te mesajın hangi queue’ya gönderileceği tutulmaktadır. Dolayısıyla Send fonksiyonu hangi kuyruğa mesaj göndereceğini bildiği için bir command olarak iletecek, o şekilde işlevsellik gösterecektir. Dolayısıyla yukarıdaki kod bloğunu incelerseniz eğer 31. satırda ilgili kuyruğun uri’si oluşturulmakta ve 32. satırda ise ‘bus’ üzerinden ‘GetSendEndpoint’ fonksiyonu ile ilgili uri’ye karşılık endpoint elde edilmektedir. 45. satırda ise ilgili endpoint üzerinden Send fonksiyonu çağrılmakta ve IMessage interface’i türünden mesaj nesnesi gönderilmektedir. 9. satıra göz atarsanız eğer iletiyi gönderecek olan ‘Message’ sınıfı ‘Shared’ katmanındaki ‘IMessage’dan türeyecek şekilde inşa edilmekte ve message instance’i ilgili arayüz türünden değer alan Send fonksiyonuna parametre olarak verilmektedir.

Şimdi sıra ‘Consumer’ı geliştirmeye gelmiştir.

using MassTransit;
using Shared;
using System;
using System.Security.Authentication;
using System.Threading.Tasks;

namespace Consumer
{
    public class Message : IMessage
    {
        public string Text { get; set; }
    }
    public class MessageConsumer : IConsumer<IMessage>
    {
        public async Task Consume(ConsumeContext<IMessage> context)
            => Console.WriteLine($"Gelen mesaj : {context.Message.Text}");
    }
    class Program
    {
        static async Task Main(string[] args)
        {
            string rabbitMqUri = "amqps://wmfjyalu:X0GmitgZM7iXUPmT4uymccexqiTKoxJq@shark.rmq.cloudamqp.com/wmfjyalu";
            string queue = "test-queue";

            string userName = "wmfjyalu";
            string password = "X0GmitgZM7iXUPmT4uymccexqiTKoxJq";

            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });

                factory.ReceiveEndpoint(queue, endpoint => endpoint.Consumer<MessageConsumer>());
            });
            await bus.StartAsync();
            Console.ReadLine();
            await bus.StopAsync();
        }
    }
}

‘Consumer’, ‘Producer’ın kuyruğa attığı mesajları ‘IConsumer’dan türeyen bir nesneyle karşılamak zorundadır. Yukarıdaki kod bloğunu incelerseniz eğer 36. satırda ‘ReceiveEndpoint’ metodu ile kuyruk dinlenmekte ve endpoint.Consumer<MessageConsumer>() komutu ile gelen mesaj ‘MessageConsumer’ ile karşılanmaktadır. 13. satıra bakarsanız eğer ‘MessageConsumer’ biraz önce bahsedildiği gibi ‘IConsumer’ arayüzünü uygulamakta ve generic olarak yine ‘Shared’da ki ‘IMessage’dan türemiş ‘Message’ type’ını kullanmaktadır. Burada ilgili arayüzün implemente ettirdiği ‘Consume’ fonksiyonu kuyruktan gelecek olan mesajı elde etmemizi sağlayacak olan fonksiyondur.

Bu geliştirmelerden sonra uygulamaları derleyip, ayağa kaldırdığımızda aşağıdaki gibi bir işlevsellik söz konusu olacaktır.

.NET Core - MassTransit Kullanarak RabbitMQ İle MessagingYukarıda görüldüğü üzere MassTransit birbirinden bağımsız iki uygulama arasında RabbitMQ üzerinden kolayca iletişim kurmamızı sağlamaktadır. Burada tüm yönetimi üstlenmekte ve gerekli iletimsel arayüzü uygulamaktadır. Bunu anlayabilmek için RabbitMQ cloud’a göz atılması yeterli olacaktır.
.NET Core - MassTransit Kullanarak RabbitMQ İle MessagingMesaj iletimleri için ismi belirtilen queue adında(test-queue) bir kuyruk oluşturmuş ve
.NET Core - MassTransit Kullanarak RabbitMQ İle Messagingmesajları yönetmek için de birkaç exchange(MassTransit default olarak Fanout exchange kullanmaktadır) dahi oluşturmuş ve tüm bunları bizim yerimize otomatik olarak gerçekleştirmiştir.

Temel Düzeyde MassTransit Kütüphanesinin RabbitMQ İle Kullanımı – 2

Bu örnekte Publish fonksiyonu ile mesajı Event olarak iletmeyi inceleyeceğiz…

MassTransit ile RabbitMQ kullanırken ayriyetten Publish fonksiyonuyla tüm kuyruklara mesaj iletebilmekteyiz. Bu fonksiyon ile MassTransit, var olan kuyrukların hepsine abone olan bir veya birden fazla consumer’lara mesajı iletebilmek için publish neticesinde bir event fırlatmaktadır ve bu event’a subscribe olan tüm consumer’lar ilgili mesajın gönderildiğine dair bilgilendirilmiş olacaktır. Haliyle MassTransit bu yöntemde ‘Siz bizi aramayın, biz sizi ararız!‘ desturuyla yola çıkmakta ve böylece Inversion of Control ve Dependency Injection prensiplerini uygulamaktadır.

Olayı pratik olarak ele alırsak eğer birden fazla farklı kuyruğa bağlı olan ‘Consumer1’, ‘Consumer2’ ve ‘Consumer3’ uygulamaları olduğunu düşünelim ve bunlara publish ile mesaj iletisinde bulunan bir ‘Producer’ uygulaması olduğunu varsayalım.
Tabi süreçte tüm uygulamalarda ortak veri türünü temsil edecek olan ‘IMessage’ arayüzünün tüm uygulamalardan erişilebilir olması için ‘Shared’ katmanınıda unutmayalım. Bu katman bir önceki ‘Send’ örneklendirmesindekiyle birebir aynı amaca hizmet etmektedir ve iletilecek mesaj türünün sadece tek bir namespace altında olmasını ve uygulamalarda bu ortak noktadan referans edilmesini sağlamaktadır. Hatırlarsanız MassTransit uygulamasında mesaj türüne dair bu durumun önemli olduğuna önceden değinmiş ve aksi halde mesaj iletimi üzerinde problem yaşanacağını ifade etmiştik.

Şimdi gelin consumer’ları sırayla kodlayalım ve en son producer’ı ele alalım.

‘Consumer1’;

using MassTransit;
using Shared;
using System;
using System.Threading.Tasks;

namespace Consumer1
{
    public class Message : IMessage
    {
        public string Text { get; set; }
    }
    public class MessageConsumer : IConsumer<IMessage>
    {
        public async Task Consume(ConsumeContext<IMessage> context)
            => Console.WriteLine($"test-queue-1 Gelen mesaj : {context.Message.Text}");
    }
    class Program
    {
        static async Task Main(string[] args)
        {
            string rabbitMqUri = "amqps://wmfjyalu:X0GmitgZM7iXUPmT4uymccexqiTKoxJq@shark.rmq.cloudamqp.com/wmfjyalu";
            string queue = "test-queue-1";

            string userName = "wmfjyalu";
            string password = "X0GmitgZM7iXUPmT4uymccexqiTKoxJq";

            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });

                factory.ReceiveEndpoint(queue, endpoint => endpoint.Consumer<MessageConsumer>());
            });
            await bus.StartAsync();
            Console.ReadLine();
            await bus.StopAsync();
        }
    }
}

‘Consumer1’in içeriğine göz atarsanız eğer ‘test-queue-1’ isimli bir kuyruğu dinlemektedir. Ayrıca gelecek mesajları karşılayacağı ‘Message’ nesnesi ‘Shared’ katmanındaki ‘IMassage’dan türemiştir. Bu operasyonu tüm consumer’lar için ve hatta product için yapacağımızı önceden olduğu gibi son kez tekrar ediyorum.

‘Consumer2’;

using MassTransit;
using Shared;
using System;
using System.Threading.Tasks;

namespace Consumer2
{
    public class Message : IMessage
    {
        public string Text { get; set; }
    }
    public class MessageConsumer : IConsumer<IMessage>
    {
        public async Task Consume(ConsumeContext<IMessage> context)
            => Console.WriteLine($"test-queue-2 Gelen mesaj : {context.Message.Text}");
    }
    class Program
    {
        static async Task Main(string[] args)
        {
            string rabbitMqUri = "amqps://wmfjyalu:X0GmitgZM7iXUPmT4uymccexqiTKoxJq@shark.rmq.cloudamqp.com/wmfjyalu";
            string queue = "test-queue-2";

            string userName = "wmfjyalu";
            string password = "X0GmitgZM7iXUPmT4uymccexqiTKoxJq";

            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });

                factory.ReceiveEndpoint(queue, endpoint => endpoint.Consumer<MessageConsumer>());
            });
            await bus.StartAsync();
            Console.ReadLine();
            await bus.StopAsync();
        }
    }
}

‘Consumer2’ ise ‘test-queue-2’ isimli kuyruğu dinlemektedir.

‘Consumer3’;

using MassTransit;
using Shared;
using System;
using System.Threading.Tasks;

namespace Consumer3
{
    public class Message : IMessage
    {
        public string Text { get; set; }
    }
    public class MessageConsumer : IConsumer<IMessage>
    {
        public async Task Consume(ConsumeContext<IMessage> context)
            => Console.WriteLine($"test-queue-3 Gelen mesaj : {context.Message.Text}");
    }
    class Program
    {
        static async Task Main(string[] args)
        {
            string rabbitMqUri = "amqps://wmfjyalu:X0GmitgZM7iXUPmT4uymccexqiTKoxJq@shark.rmq.cloudamqp.com/wmfjyalu";
            string queue = "test-queue-3";

            string userName = "wmfjyalu";
            string password = "X0GmitgZM7iXUPmT4uymccexqiTKoxJq";

            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });

                factory.ReceiveEndpoint(queue, endpoint => endpoint.Consumer<MessageConsumer>());
            });
            await bus.StartAsync();
            Console.ReadLine();
            await bus.StopAsync();
        }
    }
}

ve sonuncu consumer’ımız da ‘test-queue-3’ isimli kuyruğu dinlemektedir. Burada consumer sayısı sınırsız bir şekilde arttırılabilir. Ya da ilgili kuyruk isimleri parametre olarak alınıp, bir uygulamanın birden fazla instance’ında farklı kuyruk isimlerini dinleyecekleri şekilde ayağada kaldırılabilir. Amma velakin ben burada fiziksel olarak üç farklı uygulama üzerinden örneklendirmeyi yeterli görmekteyim. Artık bu consumer’lara gerekli mesajları iletecek olan producer’ı ele alabiliriz.

‘Producer’;

using MassTransit;
using Shared;
using System;
using System.Threading.Tasks;

namespace Producer
{
    public class Message : IMessage
    {
        public string Text { get; set; }
    }
    class Program
    {
        static async Task Main(string[] args)
        {
            string rabbitMqUri = "amqps://wmfjyalu:X0GmitgZM7iXUPmT4uymccexqiTKoxJq@shark.rmq.cloudamqp.com/wmfjyalu";
            string queue = "test-queue";
            string userName = "wmfjyalu";
            string password = "X0GmitgZM7iXUPmT4uymccexqiTKoxJq";

            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });
            });

            await Task.Run(async () =>
            {
                while (true)
                {
                    Console.Write("Mesaj yaz : ");
                    Message message = new Message
                    {
                        Text = Console.ReadLine()
                    };
                    if (message.Text.ToUpper() == "C")
                        break;
                    await bus.Publish<IMessage>(message);
                    Console.WriteLine("");
                }
            });
        }
    }
}

‘Producer’ uygulaması görüldüğü üzere 41. satırda ‘IMessage’ türünden iletiyi ‘Publish’ etmektedir. Dikkat ederseniz publish esnasında ne bir kuyruk adı bildirilmekte ne de herhangi bir endpoint tanımlaması yapılmaktadır. Publish, davranışsal olarak sistemdeki tüm kuyruklara gerekli mesajları iletmek için vardır. Publish neticesinde fırlatılacak event’a subscribe olan tüm consumer’lar mesajları ilgili kuyruklardan tüketebilecek ve böylece bu operasyon MassTransit sayesinde efektif bir şekilde yürütülmüş olacaktır.

Geliştirme aşamasından sonra uygulamaları derleyip, çalıştırırsak eğer;
.NET Core - MassTransit Kullanarak RabbitMQ İle Messaginguygulamaların başarılı bir şekilde mesaj tabanlı iletişim kurduğunu gözlemleyebilmekteyiz.

Publish neticesinde RabbitMQ cloud’una göz atarsak eğer;

Queue Exchanges
.NET Core - MassTransit Kullanarak RabbitMQ İle Messaging .NET Core - MassTransit Kullanarak RabbitMQ İle Messaging
görüldüğü üzere ilgili kuyruklar ve gerekli exchanges’ler MassTransit sayesinde otomatik olarak oluşturulmaktadır.

Birden Fazla Farklı Türde Mesaj Gönderme
.NET Core - MassTransit Kullanarak RabbitMQ İle MessagingProducer, consumer’lara birden fazla farklı türlerde mesajlarda iletmek isteyebilir. Böyle bir durumda bu mesajların türleri yandaki gibi ‘Shared’ katmanında modellenmeli ve operasyonel olarak aşağıdaki gibi işlem gerçekleştirilmelidir :
‘Producer’;

using MassTransit;
using Shared;
using System;
using System.Threading.Tasks;

namespace Producer
{
    public class MessageA : IMessageA
    {
        public string Text { get; set; }
    }
    public class MessageB : IMessageB
    {
        public string Text { get; set; }
    }
    public class MessageC : IMessageC
    {
        public string Text { get; set; }
    }
    class Program
    {
        static async Task Main(string[] args)
        {
            string rabbitMqUri = "amqps://wmfjyalu:X0GmitgZM7iXUPmT4uymccexqiTKoxJq@shark.rmq.cloudamqp.com/wmfjyalu";
            string queue = "test-queue";
            string userName = "wmfjyalu";
            string password = "X0GmitgZM7iXUPmT4uymccexqiTKoxJq";

            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });
            });

            await bus.Publish<IMessageA>(new MessageA { Text = "mesaj a" });
            await bus.Publish<IMessageB>(new MessageB { Text = "mesaj b" });
            await bus.Publish<IMessageC>(new MessageC { Text = "mesaj c" });

            Console.WriteLine("Mesajlar gönderilmiştir.");

            Console.Read();
        }
    }
}

Her bir farklı türdeki mesaj ayrı ayrı publish edilmelidir. Tabi burada Send fonksiyonuda tercih edilebilir.

‘Consumer’;

using MassTransit;
using Shared;
using System;
using System.Threading.Tasks;

namespace Consumer
{
    public class MessageA : IMessageA
    {
        public string Text { get; set; }
    }
    public class MessageAConsumer : IConsumer<IMessageA>
    {
        public async Task Consume(ConsumeContext<IMessageA> context)
            => Console.WriteLine($"Gelen mesaj : {context.Message.Text}");
    }
    public class MessageB : IMessageB
    {
        public string Text { get; set; }
    }
    public class MessageBConsumer : IConsumer<IMessageB>
    {
        public async Task Consume(ConsumeContext<IMessageB> context)
            => Console.WriteLine($"Gelen mesaj : {context.Message.Text}");
    }
    public class MessageC : IMessageC
    {
        public string Text { get; set; }
    }
    public class MessageCConsumer : IConsumer<IMessageC>
    {
        public async Task Consume(ConsumeContext<IMessageC> context)
            => Console.WriteLine($"Gelen mesaj : {context.Message.Text}");
    }
    class Program
    {
        static async Task Main(string[] args)
        {
            string rabbitMqUri = "amqps://wmfjyalu:X0GmitgZM7iXUPmT4uymccexqiTKoxJq@shark.rmq.cloudamqp.com/wmfjyalu";
            string queue = "test-queue-1";

            string userName = "wmfjyalu";
            string password = "X0GmitgZM7iXUPmT4uymccexqiTKoxJq";

            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });

                factory.ReceiveEndpoint(queue, endpoint =>
                {
                    endpoint.Consumer<MessageAConsumer>();
                    endpoint.Consumer<MessageBConsumer>();
                    endpoint.Consumer<MessageCConsumer>();
                });
            });
            await bus.StartAsync();
            Console.ReadLine();
            await bus.StopAsync();
        }
    }
}

consumer ise tüm türlere karşılık birer nesne kalıtmalı(MessageA, MessageB, MessageC) ve bunların ‘IConsumer’ sınıflarını(MessageAConsumer, MessageBConsumer, MessageCConsumer) oluşturmalıdır. 55 – 57. satır aralığına göz atarsanız eğer ilgili kuyruğa gelen farklı türlerdeki tüm mesajlar karşılanmakta ve türüne göre ilgili ‘IConsumer’ türevi sınıf tetiklenmektedir.
.NET Core - MassTransit Kullanarak RabbitMQ İle MessagingTabi burada gelen mesajların türlerini aşağıdaki gibi farklı varyasyonlarla da ayırt edebilmekteyiz :

                .
                .
                .
                factory.ReceiveEndpoint(queue, endpoint =>
                {
                    endpoint.Consumer<MessageAConsumer>();
                    endpoint.Consumer(() => new MessageBConsumer());
                    endpoint.Consumer(typeof(MessageCConsumer), type => Activator.CreateInstance(type));
                });
                .
                .
                .

İstediğiniz varyasyonu kullanabilirsiniz. Hepsi farklı yöntemle lakin aynı mahiyette işlevsellik gösteren mesaj alıcı overload’lardır.

Aşırı Yoğunluktan Kilitlenme/Deadlock Durumlarına Karşı Önlem Alma
Şu ana kadar MassTransit’in bizim yerimize channel, exchange, queue vs. gibi iletişim süreçlerini otomatik olarak yönettiğini gözlemlemiş olduk. Tüm bunların yanında, çalışma zarfında aşırı yoğunluktan bir deadlock oluştuğunu ve sistemin durduğunu varsayarsak, böyle bir durumda nasıl bir aksiyon alacağımızı? düşünelim ve gelin bunu sorgulayalım…😒Tamam panik yapmayın. Bu durumu sorgulamaya gerek kalmaksızın MassTransit bizim yerimize çözüm getirmekte ve bünyesinde dahili olarak bir CircuitBreaker barındırmaktadır.😁

‘CircuitBreaker’ı olası herhangi bir kesintiye karşı consumer’lar da kullanmamız icap edecektir. Bu duruma örnek olarak aşağıdaki herhangi bir consumer’a karşılık gelen kodu ele alabiliriz :

            .
            .
            .
            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });

                factory.ReceiveEndpoint(queue, endpoint => endpoint.Consumer<MessageConsumer>());
                factory.UseCircuitBreaker(configurator =>
                {
                    configurator.TrackingPeriod = TimeSpan.FromMinutes(1);
                    configurator.TripThreshold = 15;
                    configurator.ActiveThreshold = 10;
                    configurator.ResetInterval = TimeSpan.FromMinutes(5);
                });
            });
            .
            .
            .

Yukarıdaki ‘CircuitBreaker’ konfigürasyonunu incelerseniz eğer;

  • TrackingPeriod
    Hata durumlarından sonra ne kadar süre takipte kalınacağını ifade etmektedir.
  • TripThreshold
    Alınan taleplerin yüzdelik olarak ne kadarının hatalı olacağını ifade etmektedir.
  • ActiveThreshold
    Üst üste alınabilecek hata sayısını ifade etmektedir.
  • ResetInterval
    Hata alındığında ne kadar süre beklenmesi gerektiğini ifade etmektedir.

Dolayısıyla bu yapılanmalardan şöyle bir davranışsal durum ortaya çıkmaktadır; Alınan taleplerin %15’inin hatalı olması(TripThreshold) yahut art arda 10 adedinin hatalı gelmesi(ActiveThreshold) durumunda, sistem 5dk bekleyecek(ResetInterval) ve bu bekleyişten sonra 1dk boyunca takipte kalınacak(TrackingPeriod) ve eğer birdaha hata söz konusu olursa ‘TripThreshold’ ve ‘ActiveThreshold’ limitlerini beklemeksizin direkt olarak 5 dk süre ile beklemeye geçecektir(ResetInterval). Ve döngü bundan sonra bu şekilde devam edecektir.

Olur Olmaz Kesintilere Karşı Önlem Alma
Uygulamalarla yahut servislerle olan bağlantıların kopma durumlarında MassTransit mesaj kayıplarına izin vermemekte ve ilgili mesajları queue akışını bozmamak için -hata alınan queue-.error adında bir kuyruk oluşturup buraya taşımaktadır. Haliyle bu kesintiler kalıcı olmayıp, geçici olacağından dolayı tarafımızca belirtilen bir miktar ve aralıkta yeniden mesajları işlemesini isteyebiliriz. Böyle bir durumda retry pattern’ı benimsemiş olan ‘UseMessageRetry’ extension fonksiyonunu kullanabiliriz.

            .
            .
            .
            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });

                factory.ReceiveEndpoint(queue, endpoint => endpoint.Consumer<MessageConsumer>());
                factory.UseMessageRetry(r => r.Immediate(5));
            });
            .
            .
            .

Görüldüğü üzere consumer tarafında ‘UseMessageRetry’ ile yapılan kontrolde ‘Immediate’ fonksiyonu ile 5 defa denemesini istemekteyiz. Eğer ki, hata alınan kuyruktaki mesaj bu denemelerde de işlenemeyip, hata devam ederse tekrar ilgili .error kuyruğuna atılıp bir sonraki denemeyi bekleyecektir.

Belirli Bir Süre İçerisinde İşlenecek Mesaj Adedini Belirleme
Bazende MassTransit ile belirli bir süre içerisinde işlenecek mesaj adedini belirtmek isteyebiliriz. İşte bu durumda da ‘RateLimit’ fonksiyonu imdadımıza yetişmektedir.

            .
            .
            .
            var bus = Bus.Factory.CreateUsingRabbitMq(factory =>
            {
                factory.Host(rabbitMqUri, configurator =>
                {
                    configurator.Username(userName);
                    configurator.Password(password);
                });

                factory.ReceiveEndpoint(queue, endpoint => endpoint.Consumer<MessageConsumer>());
                factory.UseRateLimit(1000, TimeSpan.FromMinutes(1));
            });
            .
            .
            .

Yukarıda görüldüğü üzere ilgili servis, 1 dk içerisinde 1000 request yapabilecek şekilde sınırlandırılmıştır.

Nihai olarak;
Message broker’lar ile proje geliştirmenin zahmetli seyrini kolaylaştırabilmek ve süreçteki zahmetin sorumluluğunu devredebilmek için ESB(Enterprise Service Bus)lerle çalışmak kaçınılmazdır. Dolayısıyla böyle bir ihtiyaca istinaden problemleri ve süreci yönetmek açısından MassTransit bence oldukça iyi bir tercih olacaktır.

Uzun ve yorucu olan bu içeriği zahmet edip son noktasına kadar sabır ve sükut ederek okuduğunuz için teşekkür ederim…

İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…

Not : Örnek projeleri indirebilmek için aşağıdaki linkleri kullanabilirsiniz.
MassTransit – RabbitMQ Console (Send Function Example)(ExampleMassTransit)
MassTransit – RabbitMQ Console (Publish Function Example)ExampleMassTransitPublish

Bunlar da hoşunuza gidebilir...

1 Cevap

  1. 23 Aralık 2020

    […] önceki .NET Core – MassTransit Kullanarak RabbitMQ İle Messaging başlıklı makalemizde MassTransit ESB kütüphanesini RabbitMQ ile birlikte ele almış ve […]

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

*