Asp.NET Core – MassTransit & RabbitMQ İle Birlikte Messaging Uygulaması
Merhaba,
Bir önceki .NET Core – MassTransit Kullanarak RabbitMQ İle Messaging başlıklı makalemizde MassTransit ESB kütüphanesini RabbitMQ ile birlikte ele almış ve birçok detayıyla birlikte incelemiştik. Bu içeriğimizde ise bir senaryo dahilinde MassTransit kütüphanesi ile Asp.NET Core tabanlı bir çalışma gerçekleştireceğiz. O halde buyrun vakit kaybetmeden başlayalım…
Senaryo
Pratikte kodlamaya geçmeden önce ilk olarak senaryoyu genel hatlarıyla ortaya koyalım.
Geliştirilen bir ürün yönetim sisteminde aşağıdaki ihtiyaçların karşılanması gerekmektedir.
‘ProductManagement’ isminde oluşturulan bir solution içerisinde;
- Gelen ürünleri karşılayabilmek için ProductManagement.Api Web API projesi oluşturulmalıdır.
- Ürünleri database’e asenkron kaydedebilmek için ProductManagement.Registration console uygulaması oluşturulmalıdır.
- Ürünleri Facebook ve Instagram sosyal medyalarında yayınlamak için ProductManagement.Facebook ve ProductManagement.Instagram console uygulamaları oluşturulmalıdır.
- Ürün hakkında müşterileri bilgilendirmek için ProductManagement.Notification MVC uygulaması oluşturulmalıdır.
- Tüm uygulamalar arasında entegrasyonel kontraktları sağlayacak olan(bir başka deyişle message contract görevini görecek olan) ProductManagement.MessageContracts class library’si oluştrulmalıdır. Bu library içerisinde mesaj türleri, RabbitMQ hesap bilgileri, queue isimleri vs. bilgiler tutulmalıdır.
Bu ihtiyaçlar karşılandıktan sonra işlevsel olarak şöyle bir davranış sergilemesi beklenmektedir; Kullanıcı tarafından gönderilen ürünler ProductManagement.Api aracılığıyla karşılanacak ve ProductManagement.Registration uygulamasına gönderilecektir. ProductManagement.Registration veritabanına kaydettiği ürünleri biryandan da ProductManagement.Facebook ve ProductManagement.Instagram uygulamalarına gönderecek ve böylece sosyal medyada yayınlamış olacaktır. Ayrıca ProductManagement.Notification uygulamasına da göndererek müşterileri ürünlere dair bilgilendirecektir.
İnşaat
Her şeyden önce istenilen uygulamaları oluşturarak başlayalım.
Ardından oluşturulan tüm uygulamalara MassTransit.RabbitMQ kütüphanesini yükleyelim ve yanında gerekli message contract’ları barındıracağı için ProductManagement.MessageContracts dll’ini referans edelim.
Evet… Sanırım inşaat altyapısı hazır 🙂 Şimdi kodlamaya başlayabiliriz.
İlk olarak ProductManagement.MessageContracts projesinde gerekli message contract’ları oluşturarak başlayalım. Bunun için ilgili library’de ‘RabbiMqConstants’ isimli bir sınıf oluşturalım ve içeriğini aşağıdakilerle dolduralım;
public static class RabbitMqConstants { public const string RabbitMqUri = "amqps://wmfjyalu:X0GmitgZM7iXUPmT4uymccexqiTKoxJq@shark.rmq.cloudamqp.com/wmfjyalu"; public const string Username = "wmfjyalu"; public const string Password = "X0GmitgZM7iXUPmT4uymccexqiTKoxJq"; public const string RegistrationServiceQueue = "registration.service"; public const string NotificationServiceQueue = "notification.service"; public const string FacebookServiceQueue = "facebook.service"; public const string InstagramServiceQueue = "instagram.service"; }
Bu değerler proje boyunca kullanacağımız ortak tanımlamalar, RabbitMQ credential bilgileri ve kuyruk isimleridir. Yani kısaca kontratlarımızdır.
Kontratların dışında bize RabbitMQ bağlantılı bir bus(IBusControl) dönecek olan sınıfa da ihtiyacımız olacaktır. Tüm bağlantıları bu sınıf üzerinden gerçekleştirecek ve yöneteceğiz. Dolayısıyla bu işlevi yürütecek olan bu sınıfı yine ProductManagement.MessageContracts library’sinde ‘BusConfigurator’ isminde aşağıdaki gibi oluşturalım;
public static class BusConfigurator { public static IBusControl ConfigureBus(Action<IRabbitMqBusFactoryConfigurator> registrationAction = null) { return Bus.Factory.CreateUsingRabbitMq(factory => { factory.Host(RabbitMqConstants.RabbitMqUri, configurator => { configurator.Username(RabbitMqConstants.Username); configurator.Password(RabbitMqConstants.Password); }); registrationAction?.Invoke(factory); }); } }
Yukarıdaki sınıfa göz atılırsa eğer, ‘IBusControl’ türünden nesne döndüren ve ‘IRabbitMqBusFactoryConfigurator’ türünden ‘registrationAction’ isimli Action’ı parametre olarak alan ‘ConfigureBus’ isimli static bir metot barındırmaktadır. Bu metot, içerisinde Bus.Factory.CreateUsingRabbitMq
ile üretilen bir ‘IBusControl’ nesnesini geriye döndürmektedir. ‘registrationAction’ delegasyonel parametresi ise consumer’lar tarafından gelen mesajları tüketebilmeleri için çağrılması gereken ‘ReceiveEndpoint’ metodunu tetikleyebilmeleri için tanımlanmıştır. Haliyle bunun için ‘IRabbitMqBusFactoryConfigurator’ türünden olan ‘factory’ parametresine ihtiyaç vardır. O yüzden Action<IRabbitMqBusFactoryConfigurator>
olarak tanımlanmıştır…
Evet, artık kontratlarımız hazırlanmış olduğuna göre senaryonun ilk parçası olan ve kullanıcılardan gelen ürünleri karşılayacak olan ProductManagement.Api uygulamasını geliştirerek devam edebiliriz.
[Route("api/[controller]")] [ApiController] public class ProductController : ControllerBase { [HttpPost] public async Task<IActionResult> Post(Product product) { //Ürün karşılanır. //Gerekli ön çalışmalar yapılır. var bus = BusConfigurator.ConfigureBus(); var sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}/{RabbitMqConstants.RegistrationServiceQueue}"); var endPoint = await bus.GetSendEndpoint(sendToUri); await endPoint.Send<IProductRegistrationCommand>(product); return Ok("Gönderdiğiniz ürün tarafımıza ulaşmış ve gerekli işlemler gerçekleştirilmiştir. İlginiz için teşekkür ederiz."); } }
Yukarıda görüldüğü üzere kullanıcı tarafından gönderilen ürünleri karşılayabilmek için ‘ProductController’ oluşturulmuştur. Gelen ‘Product’ nesnesini gerekli ön çalışmalara tabii tuttuktan sonra veritabanına kaydedilmesi ve gerekli sosyal medyalarda yayınlayabilmesi için ProductManagement.Registration projesine iletmemiz gerekmektedir. Bu durumda mesajı ileteceğimiz queue belli olduğundan dolayı direkt command olarak göndermemiz en doğrusu olacaktır. Bu yüzden ‘RegistrationServiceQueue’ sabitindeki kuyruk adına karşılık olan queue’ya gerekli ‘Send’ operasyonu gerçekleştirilmiştir. Burada ilgili mesaj ‘IProductRegistrationCommand’ arayüzü eşliğinde gönderilmiştir. Bu arayüzü ve ‘Product’ modelini ProductManagement.MessageContracts library’sinde oluşturmamız, consumer olan diğer uygulamalar açısından da ortak erişim açısından kullanışlı olacaktır. Tabi siz isterseniz farklı bir katman oluşturabilir ve orada da ortak arayüz ve entity modellerinizi tutabilirsiniz.
namespace ProductManagement.MessageContracts.Commands { public interface IProductRegistrationCommand { string ProductName { get; set; } int Quantity { get; set; } } }
namespace ProductManagement.MessageContracts.Models { public class Product : IProductRegistrationCommand { public string ProductName { get; set; } public int Quantity { get; set; } } }
ProductManagement.MessageContracts uygulamasında gerçekleştirilen bu tanımlamaları yandaki görselde olduğu gibi uygun klasörler içerisinde gerçekleştirdiğimize dikkatinizi çekerim. Command olarak iletilecek tüm mesajların arayüzlerini ‘Commands’ klasörü altında tanımlamaktayım. Nihayetinde içeriğimizin devamında event olarak gönderilecek mesajların arayüzlerinide ‘Events’ isminde bir klasörde tutacağız. Böylece daha derli toplu bir inşa süreci gerçekleştirmiş olmaktayız.
Velhasıl, sıra ProductManagement.Api‘dan gönderilen mesajı alabilmesi için ProductManagement.Registration uygulamasını geliştirmeye gelmiştir. İlgili uygulamanın ‘Program.cs’ dosyasında aşağıdaki çalışmanın yapılması gerekmektedir.
namespace ProductManagement.Registration { class Program { static async Task Main(string[] args) { var bus = BusConfigurator.ConfigureBus(factory => { factory.ReceiveEndpoint(RabbitMqConstants.RegistrationServiceQueue, endpoint => { endpoint.Consumer<ProductRegistrationCommandConsumer>(); }); }); await bus.StartAsync(); await Task.Run(() => Console.ReadLine()); await bus.StopAsync(); } } }
Yukarıda görüldüğü üzere ProductManagement.Registration uygulaması direkt olarak ‘RegistrationServiceQueue’ sabitine karşılık olan kuyruğu dinlemekte ve ‘ProductRegistrationCommandConsumer’ türünden tüketim gerçekleştirmektedir. ‘ProductRegistrationCommandConsumer’ içeriğine göz atarsak eğer;
namespace ProductManagement.MessageContracts.Consumers { public class ProductRegistrationCommandConsumer : IConsumer<IProductRegistrationCommand> { public async Task Consume(ConsumeContext<IProductRegistrationCommand> context) { Console.WriteLine($"{context.Message.ProductName} isimli ürün :"); Console.WriteLine($"Veritabanına kayıt edilmiştir."); Console.WriteLine($"Facebook'ta yayınlanacaktır."); Console.WriteLine($"Instagram'da yayınlanacaktır."); Console.WriteLine("*********************"); } } }
ProductManagement.MessageContracts library’sinde tanımladığımız ‘IProductRegistrationCommand’ türünden gelen mesajları yakalamaktadır.
Bu ve bunun gibi süreçte tanımlayacağımız tüm consumer sınıflarını ProductManagement.MessageContracts library’sin de proje nizamını koruyabilmek için yandaki görselde olduğu gibi ‘Consumers’ klasörü altında toplayacağımızı bilmenizde fayda var.
Bu çalışma neticesinde ProductManagement.Api ile ProductManagement.Registration uygulamaları arasında MassTransit ve RabbitMQ ile gerekli bağlantı sağlanmış bulunmaktadır. Dilerseniz devam etmeden önce bu uygulamaların senkronizasyonunu test edelim ve çalışmamızın bu noktaya kadar olan işlevlerinden emin olalım.
Süper 🙂 Yukarıdaki ekran görüntüsünde olduğu gibi Postman aracılığıyla gönderilen datanın API tarafından karşılandıktan sonra ilgili uygulamaya mesaj tabanlı iletildiğini gözlemlemekteyiz.
İnşaata devam edersek eğer ProductManagement.Registration uygulaması tarafından elde edilen mesajı sosyal medyalarda yayınlayabilmek için ilgili ProductManagement.Facebook ve ProductManagement.Instagram uygulamalarına ve müşterileri ürüne dair bilgilendirmek içinde ProductManagement.Notification uygulamasına göndermeliyiz. Haliyle birden fazla uygulama sadece kendilerine ait kuyruklardan bu mesajları edineceğinden dolayı burada ilgili mesajı event olarak tüm kuyruklara iletmemiz yerinde olacaktır. Yani ilgili mesaj tüm kuyruklara publish edilecektir.
Bunun için ProductManagement.Registration uygulamasının api tarafından iletilen mesajı karşıladığı ‘ProductRegistrationCommandConsumer’ içerisinde çalışmamız gerekmektedir. Çünkü bu sınıf, gelen mesajı yakaladıktan sonra ihtiyacı olan diğer kuyruklara publish edebilmemiz için bize fırsat tanımaktadır.
İlgili sınıfın son hali aşağıdaki gibi olacaktır;
namespace ProductManagement.MessageContracts.Consumers { public class ProductRegistrationCommandConsumer : IConsumer<IProductRegistrationCommand> { public async Task Consume(ConsumeContext<IProductRegistrationCommand> context) { Console.WriteLine($"{context.Message.ProductName} isimli ürün :"); Console.WriteLine($"Veritabanına kayıt edilmiştir."); Console.WriteLine($"Facebook'ta yayınlanacaktır."); Console.WriteLine($"Instagram'da yayınlanacaktır."); Console.WriteLine("*********************"); await context.Publish<IProductEvent>(new Product { ProductName = context.Message.ProductName, Quantity = context.Message.Quantity }); } } }
Görüldüğü üzere api’den gelen mesajı tüm kuyruklara publish etmekteyiz. Dikkat ederseniz publish edilen mesajı ‘IProductEvent’ arayüzünden göndermekteyiz. İlgili arayüzü ProductManagement.MessageContracts library’sinde aşağıdaki gibi ‘Events’ isimli bir klasör altında oluşturmaktayız.
namespace ProductManagement.MessageContracts.Events { public interface IProductEvent { string ProductName { get; set; } int Quantity { get; set; } } }
Tabi ‘Product’ modelini de bu arayüz eşliğinde publish edebilmek için ilgili arayüzden implement etmeyi unutmuyoruz:
public class Product : IProductRegistrationCommand, IProductEvent { public string ProductName { get; set; } public int Quantity { get; set; } }
Bu işlemlerden sonra ProductManagement.MessageContracts library’sinin son hali yandaki gibi olacaktır. Artık bu aşamadan sonra publish edilen mesajları ilgili uygulamalarda consume edebilmek için gerekli consumer sınıflarını oluşturacak ve ardından uygulamalarda ilgili kuyrukları tüketeceğiz.
Şimdi gelin sırasıyla Facebook, Instagram ve Notification consumer’larını oluşturalım ve ardından ilgili uygulamaları kodlamaya geçelim.
‘Facebook’ :
namespace ProductManagement.MessageContracts.Consumers { public class ProductFacebookEventConsumer : IConsumer<IProductEvent> { public async Task Consume(ConsumeContext<IProductEvent> context) { Console.WriteLine($"{context.Message.ProductName} isimli ürün Facebook'ta yayınlanmıştır."); } } }
‘Instagram’ :
namespace ProductManagement.MessageContracts.Consumers { public class ProductInstagramEventConsumer : IConsumer<IProductEvent> { public async Task Consume(ConsumeContext<IProductEvent> context) { Console.WriteLine($"{context.Message.ProductName} isimli ürün Instagram'da yayınlanmıştır."); } } }
‘Notification’ :
namespace ProductManagement.MessageContracts.Consumers { public class ProductNotificationEventConsumer : IConsumer<IProductEvent> { public async Task Consume(ConsumeContext<IProductEvent> context) { Console.WriteLine($"{context.Message.ProductName} isimli ürün yayınlanarak müşteriler bilgilendirilmiştir."); } } }
Consumer’lar hazır. Şimdi sırada, uygulamalar üzerinden ilgili consumer’ları kullanarak kuyruklardaki mesajları tüketmek var.
‘Facebook’ :
namespace ProductManagement.Facebook { class Program { static async Task Main(string[] args) { var bus = BusConfigurator.ConfigureBus(factory => { factory.ReceiveEndpoint(RabbitMqConstants.FacebookServiceQueue, endpoint => { endpoint.Consumer<ProductFacebookEventConsumer>(); }); }); await bus.StartAsync(); await Task.Run(() => Console.ReadLine()); await bus.StopAsync(); } } }
‘Instagram’ :
namespace ProductManagement.Instagram { class Program { static async Task Main(string[] args) { var bus = BusConfigurator.ConfigureBus(factory => { factory.ReceiveEndpoint(RabbitMqConstants.FacebookServiceQueue, endpoint => { endpoint.Consumer<ProductInstagramEventConsumer>(); }); }); await bus.StartAsync(); await Task.Run(() => Console.ReadLine()); await bus.StopAsync(); } } }
‘Notification’ :
namespace ProductManagement.Notification.Controllers { public class ProductNotificationController : Controller { public async Task<IActionResult> Index() { var bus = BusConfigurator.ConfigureBus(factory => { factory.ReceiveEndpoint(RabbitMqConstants.NotificationServiceQueue, endpoint => { endpoint.Consumer<ProductNotificationEventConsumer>(); }); }); await bus.StartAsync(); await Task.Run(() => Console.ReadLine()); await bus.StopAsync(); return View(); } } }
İşte bu kadar… Tüm bu inşa sürecinden sonra tüm uygulamaları derleyip, çalıştıralım ve test edelim.
Veee görüldüğü üzere uygulamalarımız MassTransit sayesinde tam teferruatlı mesaj tabanlı iletişimi gerçekleştirebilmektedir.
Makalemizi sonlandırmadan önce web uygulaması olan ProductManagement.Notification projesinde gelen verileri anlık olarak sayfada gösterebilmek için SignalR teknolojisinden istifade edebiliriz diye düşündüm. Ve bununla ilgili gerekli çalışmayı makalemizin sonunda sunmuş olduğum örnek çalışma projesinde uygulamış bulunmaktayım. İndirmenizi ve incelemenizi tavsiye ederim 🙂
İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…
Not : Örnek projeyi indirmek için buraya tıklayınız.
Merhabalar,
Event driven architecture yapılarında, her istek produce cosumer olarak mı kurgulanmalı yoksa farklı api’lara direk istek atılabilir mi? HttpClient vs gibi. Ek olarak hangi durumlarda istek hangi durumlarda queu kullanılmalıdır?
Teşekkürler.
Merhaba,
Microservice yapılanmalarında servisler arası iletişim
olmak üzere üç esasta ele alınmaktadır.
Haliyle ilk iki yaklaşım şahsına münhasır bir davranış gerektiriyor olsa da kimi ihtiyaçlara istinaden her iki yaklaşıma da riayet edilmesi icap edebilir ve hybrid olarak tasarım esas alınabilir. Yani bir projenin genel gidişatını mimarisel tasarımı belirliyor olsada, süreçte öngörülemeyen ihtiyaçların getirileri göz ardı edilemeyeceğinden dolayı yerine göre hangi yaklaşımın kullanılması gerekiyorsa, danışılması taraftarıyız. Yani yeri gelecek request atarsınız, yeri gelecek kuyruktan gelecek olan veriyi dinlersiniz ve yeri gelecek kargo ile kapıdan teslim yaparsınız 🙂
Yani sorunuza cevap olarak, evet istek atabilir ve API servisleriyle gerekli iletişimi sağlayabilirsiniz.
Sevgiler…
Mühteşem
Hocam merhabalar,
Load balance yaptığımız bir projede her iki serverdada iki tane consumer var ikisi tek kuyrugu dinliyor.Hangisi daha uygun ise veriyi o işliyor fakat şöyle bir durum kişi hem mobil hemde web tarafindan veya dış apiden bı ürün satın alabilir hepsinden aynı anda istek atarsa stok bazen 1 düşüyor bütün consumerlar aynı anda işliyor hepsi aynı stok bilgisini çekiyor diğeri güncelleme yapmadan.Boyle bir durumda consumerlari load balance ortamında nasıl çoklayabiliriz sizce?
Teşekkürler.