Microservice – Saga – Events/Choreography Implemantasyonu İle Transaction Yönetimi
Merhaba,
Bir önceki Microservice Mimarilerde Saga Pattern İle Transaction Yönetimi başlıklı makalemizde Saga pattern üzerine detaylıca teorik incelemede bulunmuştuk. Bu içeriğimizde ise Saga pattern’ını Events/Choreography implemantasyonu çerçevesinde ele alacak ve pratikte nasıl bir inşanın söz konusu olduğunu hep beraber adım adım inceliyor olacağız.
Senaryo
İncelememizde senaryo olarak basit bir e-ticaret sürecini ele alıyor olacağız. Bu süreçte Order.API, Stock.API ve Payment.API olmak üzere üç temel microservice bizlere eşlik ediyor olacak. İşlevsel olarak kullanıcıdan gelen sipariş talebi üzerine Order.API bu talebi karşılayacak ve state’i Suspend olan bir sipariş oluşturulacaktır. Ardından sipariş edilen ürünlerin stok verilerinin düşürülmesi için Stock.API servisinde gerekli güncelleme gerçekleştirilecek ve bu işlem başarılı olduktan sonra nihai olarak ödeme işlemi için devreye Payment.API girecek ve gerekli tahsilat gerçekleştirilecektir. Eğer süreçte herhangi bir hata meydana gelirse tüm transactionları compensable transaction ile telafi edeceğiz.
Bu servislerin kendi aralarında haberleşebilmeleri için publish edecekleri ve subscribe olacakları event şeması aşağıdaki gibi olacaktır.
Service | Publish | Subscribe |
---|---|---|
Order Service | OrderCreatedEvent : Sipariş oluşturulduğunu ifade eden event’tir. Stock Service tarafından dinlenmektedir. |
PaymentCompletedEvent PaymentFailedEvent StockNotReservedEvent
|
Stock Service |
StockReservedEvent : Stok güncellemesi başarıyla gerçekleştirildiğinde yayılan event’tir. Payment Service tarafından dinlenmektedir.StockNotReservedEvent : Stok güncellemesinde bir hata ya da tutarsızlık meydana geldiğinde yayılan event’tir. Order Service tarafından dinlenmektedir ve fırlatıldığı taktirde sipariş Fail durumuna çekilir.
|
OrderCreatedEvent PaymentFailedEvent
|
Payment Service |
PaymentCompletedEvent : Ödeme işleminin başarıyla gerçekleştirildiğini ifade eden event’tir. Order Service tarafından dinlenmektedir ve sipariş durumunun Completed olmasını sağlar.PaymentFailEvent : Ödeme sürecinde bir hata meydana geldiğini ifade eden event’tir. Order Service ve Stock Service tarafından dinlenmektedir. Sipariş durumunun Fail olmasını sağlar.
|
StockReservedEvent
|
İşte böyle 🙂 Pratik yapacağımız senaryo ve event şemasını masaya yatırdıktan sonra sıra servislerimizi oluşturmaya gelmiştir.
Servislerin Oluşturulması
Yukarıdaki paragraflarda bahsedildiği gibi Order.API, Stock.API ve Payment.API olmak üzere üç adet servis oluşturacağız. Bunu herhangi bir yöntemle gerçekleştirebileceğiniz gibi aşağıdaki dotnet cli komutlarını kullanarak da gerçekleştirebilirsiniz.
(Service) dotnet new webapi --name Order.API
(Service) dotnet new webapi --name Stock.API
(Service) dotnet new webapi --name Payment.API
(Class Library) dotnet new classlib --name Shared
: Servisler arası mesajlaşma için event’leri vs. bu class library’de oluşturacağız.
Servislere MassTransit Kurulumu ve Temel Konfigürasyonlar
Saga pattern ile servisler arası distributed transaction’ı sağlayabilmek için asenkron bir iletişim modeli kullanacağımızı makalemizin ilk paragrafında referans edilen içeriğimizde konuşmuştuk. Haliyle bizler asenkron iletişim için RabbitMQ sistemini tercih edeceğiz. Lakin uygulama içerisinde salt bir şekilde RabbitMQ’yu kodlamak bizler için bir nebze fazla maliyetli olacağından dolayı hızlı hareket edebilmek ve güvenilir bir şekilde kuyruklara mesajlarımızı iletebilmek için Enterprise Service Bus(ESB) olan MassTransit kütüphanesini kullanacağız.
Dolayısıyla tüm servislerimize MassTransit’i aşağıdaki dotnet cli komutları eşliğinde yükleyelim.
dotnet add package MassTransit
(Library)
dotnet add package MassTransit.AspNetCore
(Library)
dotnet add package MassTransit.RabbitMQ
Ardından tüm servislerin ‘Startup.cs’ dosyasında aşağıdaki konfigürasyonu sağlayarak, ilgili kütüphaneyi ve RabbitMQ servislerini uygulamalara entegre edelim.
public class Startup { . . . public void ConfigureServices(IServiceCollection services) { . . . services.AddMassTransit(configure => { configure.UsingRabbitMq((context, configurator) => { configurator.Host(Configuration.GetConnectionString("RabbitMQ")); }); }); services.AddMassTransitHostedService(); . . . } . . . }
Dikkat ederseniz tüm servislerde bağlantı kurulacak RabbitMQ sunucusunun bilgileri yapılandırma noktasından talep edilmektedir. Bu bilgiyi ister environment‘tan isterseniz de ‘appsettings.json’ dosyasından verebilirsiniz. Misal bizler şimdilik tüm servislerimizin ‘appsettings.json’ dosyasında sunucu bilgilerini aşağıdaki gibi tutalım.
{ "Logging": { "LogLevel": { "Default": "Information", "Microsoft": "Warning", "Microsoft.Hosting.Lifetime": "Information" } }, "AllowedHosts": "*", "ConnectionStrings": { "RabbitMQ": "amqps://pg*******w6iBqJ92iNG_0@elk.r******p.com/pgjnreme" } }
Bizler örneklendirmemizde RabbitMQ’yu cloud servis olarak kullanacağımızdan dolayı gerekli bağlantıyı cloudamqp.com adresindeki instance’a uygun olarak yazmış bulunmaktayız. Eğer ki, RabbitMQ instance’ının cloud ortamda nasıl ayağa kaldırıldığına dair herhangi bir fikriniz yoksa RabbitMQ – Cloud Ortamda(CloudAMQP) Kurulumu başlıklı makaleme göz atmanızı tavsiye ederim.
Velhasıl, tüm servislerde yapılan bu konfigürasyon neticesinde artık microservice yapılanmamızın tüm parçaları asenkron bir iletişim kurabilir niteliğe kavuşmuş demektir. Artık servislerimizi tek tek amaca uygun bir şekilde geliştirebilir ve Saga pattern’ın Choreography implemantasyonunu kullanarak distributed transaction’ı sağlayabiliriz. Hadi buyrun başlayalım.
Servislerin Geliştirilmesi
Şimdi yukarılarda bahsedilen senaryoya uygun bir şekilde e-ticaret yapılanmamızı adım adım kodlamaya geçebiliriz. Burada makalenin hizasına göre kodlanan herhangi bir servis süreçte tekrardan düzeltmek yahut yeni bir event’e göre tekrar kodlanmak gerektirebileceğinden dolayı sayısal olarak başlıklandırılmış adımlarla parça parça ilerleyeceğiz. Dolayısıyla iyi takip etmenizi, yapılan işlemleri yakalayabilmek ve bütünsel olarak yorumlayabilmek için gerekirse sizlerin de bu pratiğe eşlik etmenizi tavsiye ederim. Hadi başlayalım…
- Adım 1 Order.API | Temel Altyapının Atılması
İlk olarak Order.API servisinin geliştirilmesiyle başlayalım.Merkezde bir sipariş sürecini ele alacağımızdan dolayı öncelikle bu süreci modelleyecek olan ‘Order’ ve ‘OrderItem’ entity’lerini oluşturarak başlayalım.
public class Order { public int Id { get; set; } public int BuyerId { get; set; } public List<OrderItem> OrderItems { get; set; } public OrderStatus OrderStatus { get; set; } public DateTime CreatedDate { get; set; } public decimal TotalPrice { get; set; } }
public class OrderItem { public int Id { get; set; } public int ProductId { get; set; } public int Count { get; set; } public decimal Price { get; set; } }
Ardından bu entity’lerin code first yaklaşımı ile veritabanına migrate işlemini gerçekleştirebilmek için bir context nesnesi oluşturalım ve içeriğini aşağıdaki gibi dolduralım.
public class ApplicationDbContext : DbContext { public ApplicationDbContext(DbContextOptions options) : base(options) { } public DbSet<Order> Orders { get; set; } public DbSet<OrderItem> OrderItems { get; set; } }
Devamında ise Microsoft.EntityFrameworkCore.SqlServer kütüphanesini uygulamamıza yükleyerek ‘Startup.cs’ dosyasında aşağıdaki konfigürasyonu yapalım.
public void ConfigureServices(IServiceCollection services) { . . . services.AddDbContext<ApplicationDbContext>(options => options.UseSqlServer(Configuration.GetConnectionString("SQLServer"))); . . . }
Tabi yine burada veritabanının connection string değerini yapılandırma dosyasından almaktayız. Bunun için ‘appsettings.json’ dosyasına ilgili bağlantı cümleciğini aşağıdaki gibi ekleyelim.
"ConnectionStrings": { "SQLServer": "Server=***;Database=SagaChoreographyDB;User Id=***;Password=***;", "RabbitMQ": "amqps://pg*******w6iBqJ92iNG_0@elk.r******p.com/pgjnreme" }
Tüm bu işlemlerden sonra artık sıra migration oluşturma ve migrate etmeye gelmiş bulunmaktadır. Bunun için Microsoft.EntityFrameworkCore.Tools kütüphanesini yükleyerek ‘Package Manager Console’ üzerinden sırasıyla
add-migration mig_1
veupdate-database
talimatlarını verelim. Bu işlemi isterseniz powershell üzerinden dotnet cli komutları aracılığıyla da gerçekleştirebilirsiniz. Bunun için Microsoft.EntityFrameworkCore.Design kütüphanesini yüklemeniz ve sırasıyladotnet ef migrations add mig_1
vedotnet ef database update
talimatlarını vermeniz yeterli olacaktır. Ee haliyle buradaki tercih artık sizindir.Şimdi konfigüre edilen tüm bu altyapı sonrası artık bir endpoint üzerinden sipariş talebinde bulunmaya ve ardından siparişi oluşturtup sonraki microservisimiz olan Stock.API‘a stok işlemleri için bilgi vermeye sıra gelmiş bulunmaktadır.
- Adım 2 Order.API | Sipariş Oluşturma
Sipariş talebinde bulunmak ve bu talep neticesinde sipariş oluşturabilmek için OrdersController.cs isminde bir controller oluşturalım ve içeriğini aşağıdaki gibi dolduralım.[Route("api/[controller]")] [ApiController] public class OrdersController : ControllerBase { readonly ApplicationDbContext _applicationDbContext; public OrdersController(ApplicationDbContext applicationDbContext) { _applicationDbContext = applicationDbContext; } [HttpPost] public async Task<IActionResult> CreateOrder(OrderVM model) { Order.API.Models.Order order = new() { BuyerId = model.BuyerId, OrderItems = model.OrderItems.Select(oi => new OrderItem { Count = oi.Count, Price = oi.Price, ProductId = oi.ProductId }).ToList(), OrderStatus = OrderStatus.Suspend, TotalPrice = model.OrderItems.Sum(oi => oi.Count * oi.Price), CreatedDate = DateTime.Now }; await _applicationDbContext.AddAsync<Order.API.Models.Order>(order); await _applicationDbContext.SaveChangesAsync(); return Ok(true); } }
Yukarıdaki kod bloğuna göz atarsanız eğer ‘CreateOrder’ isimli bir action metot aracılığıyla, içeriği aşağıda verilmiş olan ‘OrderVM’ türünde gelen sipariş talebi gerekli manuel dönüşümler neticesinde oluşturulmakta ve veritabanına eklenmektedir. Burada dikkat etmenizi istediğim husus 23. satırdaki siparişin durumunu ifade eden ‘OrderStatus’ özelliğinin Suspend olarak belirtiliyor olmasıdır. Nihayetinde bu siparişin başarılı bir şekilde sonlanabilmesi için ürünlere dair stok bilgilerinin güncellenmesi ve ödemenin sağlıklı bir şekilde gerçekleştirilebiliyor olması gerekmektedir.
‘OrderVM’ içeriği :public class OrderVM { public int BuyerId { get; set; } public List<OrderItemVM> OrderItems { get; set; } }
- Adım 3 Shared | IEvent ve OrderCreatedEvent Event’ini Oluşturma
Sipariş oluşturulduktan sonra artık sırada “eyy Stock.API, sipariş oluşturuldu sıra sende!” şeklinde diğer servislere haber vermemiz gerekmektedir. Haliyle bunun için bir event fırlatılması gerekmektedir. Bu eventOrderCreatedEvent
olacaktır.Bunun için ortak message ve event türlerini barındıracağımız Shared isimli class library’e ‘Events’ isminde bir klasör oluşturalım ve içerisine öncelikle IEvent isimli bir interface oluşturalım. Bu interface, sistemdeki event class’larını diğerlerinden ayırt edebilmek için bir işaretleyici görevi görecektir.
public interface IEvent { }
Haliyle aynı klasör içerisine
OrderCreatedEvent
isminde bir class oluşturalım ve içeriğini aşağıdaki gibi dolduralım.public class OrderCreatedEvent : IEvent { public int OrderId { get; set; } public int BuyerId { get; set; } public decimal TotalPrice { get; set; } public List<OrderItemMessage> OrderItems { get; set; } } public class OrderItemMessage { public int ProductId { get; set; } public int Count { get; set; } public decimal Price { get; set; } }
Burada
OrderCreatedEvent
içeriğine göz atarsanız eğer hangi alıcının, hangi siparişi, toplam ne kadar fiyata ve hangi ürünlerle(OrderItems/OrderItemMessage) aldığına dair tüm bilgileri tutmaktayız. Haliyle sipariş oluşturulduktan sonra bu event’in kuyruğa gönderilmesi neticesinde Stock.API için gerekli tüm detaylar sağlanmış olacaktır. - Adım 4 Shared | OrderCreatedEvent Event’inin Kuyruk Adının Belirlenmesi
OluşturulanOrderCreatedEvent
event’inin hangi kuyruğa atılacağını belirleyebilmek için bir static sınıf tasarlamamız oldukça uygun olacaktır. Çünkü süreçte birden çok event için farklı kuyruk isimleri belirlememiz ve bu kuyruk isimlerini birden çok yerde kullanılabilir kılmamız gerekecektir.Haliyle Shared class library’si içerisine ‘RabbitMQSettings’ isimli static bir class ekleyerek içerisine kuyruk isimlerini aşağıdaki gibi ekleyelim.
public static class RabbitMQSettings { public const string Stock_OrderCreatedEventQueue = "stock-order-created-queue"; }
Evet, yukarıdaki kod bloğunu incelerseniz eğer tanımlanan kuyruk isimlerini o kuyruğa subscribe olacak olan servisin adını belirterek oluşturduğuma dikkatinizi çekerim. Ve bundan sonra eklenecek olan kuyruk isimlerinde de aynı formata riayet edeceğim. Bu şimdilik irticalen yaptığım ve kabul ettiğim bir formattır. Sizler kendinize göre farklı formatlar belirleyebilirsiniz.
- Adım 5 Order.API | OrderCreatedEvent Event’inin Kuyruğa Atılması
Artık sipariş oluşturulduğunda, bu siparişin oluşturulduğunu Stock.API‘a bildirecek olanOrderCreatedEvent
ve kuyruk adı hazırlanmış olduğuna göre tek yapmamız gereken ‘OrdersController.cs’de aşağıdaki ekstralarda bulunmaktır;[Route("api/[controller]")] [ApiController] public class OrdersController : ControllerBase { readonly ApplicationDbContext _applicationDbContext; readonly IPublishEndpoint _publishEndpoint; public OrdersController( ApplicationDbContext applicationDbContext, IPublishEndpoint publishEndpoint) { _applicationDbContext = applicationDbContext; _publishEndpoint = publishEndpoint; } [HttpPost] public async Task<IActionResult> CreateOrder(OrderVM model) { Order.API.Models.Order order = new() { BuyerId = model.BuyerId, OrderItems = model.OrderItems.Select(oi => new OrderItem { Count = oi.Count, Price = oi.Price, ProductId = oi.ProductId }).ToList(), OrderStatus = OrderStatus.Suspend, TotalPrice = model.OrderItems.Sum(oi => oi.Count * oi.Price), CreatedDate = DateTime.Now }; await _applicationDbContext.AddAsync<Order.API.Models.Order>(order); await _applicationDbContext.SaveChangesAsync(); OrderCreatedEvent orderCreatedEvent = new() { OrderId = order.Id, BuyerId = order.BuyerId, TotalPrice = order.TotalPrice, OrderItems = order.OrderItems.Select(oi => new OrderItemMessage { Price = oi.Price, Count = oi.Count, ProductId = oi.ProductId }).ToList() }; await _publishEndpoint.Publish(orderCreatedEvent); return Ok(true); } }
Yukarıdaki kod bloğunu incelerseniz eğer 6. satırda tanımlanmış olan ‘IPublishEndpoint’ referansına dependency injection ile gerekli publish işlemini icra edecek nesne talep edilmekte ve 36. satırda üretilen
OrderCreatedEvent
nesnesi 48. satırda ‘Publish’ metoduyla RabbitMQ kuyruğuna gönderilmektedir.Hoca neden kuyruk adını burada kullanmadın? diye sorduğunuzu duyar gibiyim. Burada dikkat ederseniz eğer bir event publish edilmektedir. Haliyle publish yönteminde yayınlanan event her ne ise o event’e subscribe olan tüm consumer’lara bu mesaj iletilmiş olacaktır. O yüzden mesaj yayınlama sürecinde herhangi bir kuyruk adı bildirilmemiştir lakin birazdan bu event’e subscribe olacak olan ilgili servis(lerde)te kuyruk adının belirtildiğini görüyor olacağız. Nihayetinde -bu event’e subscribe olanlar şu kuyruk adından ilgili mesajı alsınlar- dememiz gerekmektedir.
- Adım 6 Stock.API | OrderCreatedEvent Event’e Subscribe Olunması ve Mesajın Okunması
Order.API‘da oluşturulan mesajın Stock.API‘da yakalanabilmesi için öncelikle Stock.API‘da bir consumer oluşturulması gerekmektedir. Bunun için ilgili serviste ‘Consumers’ isimli bir klasör ekleyerek içerisine ‘OrderCreatedEventConsumer’ isimli bir sınıf oluşturalım.public class OrderCreatedEventConsumer : IConsumer<OrderCreatedEvent> { readonly MongoDbService _mongoDbService; readonly ISendEndpointProvider _sendEndpointProvider; readonly IPublishEndpoint _publishEndpoint; public OrderCreatedEventConsumer( MongoDbService mongoDbService, ISendEndpointProvider sendEndpointProvider, IPublishEndpoint publishEndpoint) { _mongoDbService = mongoDbService; _sendEndpointProvider = sendEndpointProvider; _publishEndpoint = publishEndpoint; } public async Task Consume(ConsumeContext<OrderCreatedEvent> context) { List<bool> stockResult = new(); IMongoCollection<Models.Stock> collection = _mongoDbService.GetCollection<Models.Stock>(); //Sipariş edilen ürünlerin stok miktarı sipariş adedinden fazla mı? değil mi? foreach (OrderItemMessage orderItem in context.Message.OrderItems) stockResult.Add((await collection.FindAsync(s => s.ProductId == orderItem.ProductId && s.Count > orderItem.Count)).Any()); //Eğer fazlaysa sipariş edilen ürünlerin stok miktarı güncelleniyor. if (stockResult.TrueForAll(sr => sr.Equals(true))) { foreach (OrderItemMessage orderItem in context.Message.OrderItems) { Models.Stock stock = await (await collection.FindAsync(s => s.ProductId == orderItem.ProductId)).FirstOrDefaultAsync(); stock.Count -= orderItem.Count; await collection.FindOneAndReplaceAsync(x => x.ProductId == orderItem.ProductId, stock); } } // *** // StockReservedEvent // *** // OR // *** // StockNotReservedEvent // *** } }
Yukarıdaki consumer sınıfına göz atarsanız eğer MassTransit kütüphanesinden gelen ‘IConsumer<OrderCreatedEvent>’ interface’inden türemektedir. Haliyle böylece bu sınıfın
OrderCreatedEvent
türünden event’e subscribe olan bir consumer olduğu resmileşmektedir. İlgili sınıfın içerisine bakarsanız MongoDB ile ilgili çalışmaların olduğunu göreceksiniz. Evet, zenginlik maksatlı Stock.API‘da MongoDB’yi kullanmış bulunmaktayız. Burada ekstra olarak birazdan içeriğini vereceğimiz ‘MongoDbService’ tüm MongoDB işlevselliğini üstlenmekte ve ‘Consume’ metodu içerisinde gerekli operasyonu yürütmektedir.İlgili metoda göz atarsanız eğer 23. satırda sipariş edilen ürünlerin stok miktarının sipariş adedinden fazla olup olmadığı kontrol edilmekte ve 27 ile 35. satır aralığında fazla ise sipariş edilen ürünlerin stok miktarları düşürülmektedir. Yazımızın devamında ise bu consumer tarafından işlemlerin nihai olarak başarılı ya da başarısız olması durumuna göre ya
StockReservedEvent
ya daStockNotReservedEvent
event’leri publish edilecektir. Bu detayları ilgili adımlara geldiğimizde konuşacağız. Şimdilik kodu bu şekilde bırakıp yola devam ediyoruz.‘MongoDbService’ içeriği :
public class MongoDbService { readonly IMongoDatabase _database; public MongoDbService() { MongoClient client = new("mongodb://localhost:27017"); _database = client.GetDatabase("SagaChoreographyDB"); } public IMongoCollection<T> GetCollection<T>() => _database.GetCollection<T>(typeof(T).Name.ToLowerInvariant()); }
Stock.API‘da geliştirilen bu consumer’ın uygulama tarafından gerçek bir consumer olduğunu ifade edebilmek için ilgili uygulamanın ‘Startup.cs’ dosyasında aşağıdaki konfigürasyonun yapılması gerekmektedir.
public class Startup { . . . public void ConfigureServices(IServiceCollection services) { . . . services.AddMassTransit(configure => { configure.AddConsumer<OrderCreatedEventConsumer>(); configure.UsingRabbitMq((context, configurator) => { configurator.Host(Configuration.GetConnectionString("RabbitMQ")); configurator.ReceiveEndpoint(RabbitMQSettings.Stock_OrderCreatedEventQueue, e => e.ConfigureConsumer<OrderCreatedEventConsumer>(context)); }); }); services.AddSingleton<MongoDbService>(); . . . } . . . }
Dikkat ederseniz 13. satırda ‘AddConsumer’ metodu ile geliştirilen ‘OrderCreatedEventConsumer’ sınıfı bu uygulamaya bir consumer olarak eklenmektedir. Ayrıca 18. satırda ise eklenen bu consumer’a kuyruk adı da verilmektedir. Ha tabi ayrıca 22. satıra göz atarsanız geliştirilen ‘MongoDbService’in de dependency injection ile erişilebilir olması için uygulama provider’ına eklendiğinden de gözle kaş arasında bahsetmiş olalım..
Bu adıma istinaden ‘OrderCreatedEventConsumer’ içerisinde kullanılan ‘Stock’ entity’sinin içeriğini merak edenleriniz olacaktır. Buyrun onu da paylaşmış olalım;
public class Stock { [BsonRepresentation(BsonType.ObjectId)] [BsonId] [BsonElement(Order = 0)] public ObjectId Id { get; set; } [BsonRepresentation(BsonType.Int64)] [BsonElement(Order = 1)] public int ProductId { get; set; } [BsonRepresentation(BsonType.Int64)] [BsonElement(Order = 2)] public int Count { get; set; } [BsonRepresentation(BsonType.DateTime)] [BsonDateTimeOptions(Kind = DateTimeKind.Utc)] [BsonElement(Order = 3)] public DateTime CreatedDate { get; set; } = DateTime.Now; }
- Adım 7 Shared | StockReservedEvent ve StockNotReservedEvent Event’lerinin Oluşturulması
Şimdi sırada Stock.API‘da yapılan stok işlemlerinin başarılı ya da başarısız olma durumuna istinaden kuyruğa atılacak olanStockReservedEvent
veStockNotReservedEvent
event’lerinin oluşturulması vardır.Eğer ki stok miktarının düşürülmesi başarıyla gerçekleştirildiyse
StockReservedEvent
event’i yayınlanacaktır. Bu event’e Payment.API subscribe olacak ve ona göre ödeme işlemleri devreye girecektir. Yok eğer stok miktarında bir hata ya da tutarsızlık meydana geliyorsaStockNotReservedEvent
event’i yayılacaktır. Bu event’e de Order.API subscribe olacaktır ve oluşturulan siparişin durumunu Fail‘e çekip süreci sonlandıracaktır. Dolayısıyla bu event’leri aşağıdaki gibi amaçları doğrultusunda geliştirmemiz gerekmektedir;public class StockReservedEvent : IEvent { public int OrderId { get; set; } public int BuyerId { get; set; } public decimal TotalPrice { get; set; } public List<OrderItemMessage> OrderItems { get; set; } }
public class StockNotReservedEvent : IEvent { public int OrderId { get; set; } public int BuyerId { get; set; } public string Message { get; set; } }
Evet, görüldüğü üzere sürecin sağlıklı devam ettiğini ifade eden
StockReservedEvent
sonraki tetiklenecek microservis olan Payment.API‘a gerekli sipariş bilgilerini iletirken, stok miktarında problem olduğunu ifade edenStockNotReservedEvent
hangi alıcının yaptığı hangi siparişin neden(Message) başarısız olduğuna dair bilgileri iletmektedir. - Adım 8 Stock.API | StockReservedEvent ve StockNotReservedEvent Event’lerinin Yayınlanması
Şimdi oluşturulanStockReservedEvent
veStockNotReservedEvent
event’lerinin yayınlanmasıyla ilgilenelim. Bunun için Stock.API‘da biraz önce(6. adım) oluşturulan consumer’a gelelim ve aşağıdaki eklentilerde bulunalım.public class OrderCreatedEventConsumer : IConsumer<OrderCreatedEvent> { readonly MongoDbService _mongoDbService; readonly ISendEndpointProvider _sendEndpointProvider; readonly IPublishEndpoint _publishEndpoint; public OrderCreatedEventConsumer( MongoDbService mongoDbService, ISendEndpointProvider sendEndpointProvider, IPublishEndpoint publishEndpoint) { _mongoDbService = mongoDbService; _sendEndpointProvider = sendEndpointProvider; _publishEndpoint = publishEndpoint; } public async Task Consume(ConsumeContext<OrderCreatedEvent> context) { List<bool> stockResult = new(); IMongoCollection<Models.Stock> collection = _mongoDbService.GetCollection<Models.Stock>(); //Sipariş edilen ürünlerin stok miktarı sipariş adedinden fazla mı? değil mi? foreach (OrderItemMessage orderItem in context.Message.OrderItems) stockResult.Add((await collection.FindAsync(s => s.ProductId == orderItem.ProductId && s.Count > orderItem.Count)).Any()); //Eğer fazlaysa sipariş edilen ürünlerin stok miktarı güncelleniyor. if (stockResult.TrueForAll(sr => sr.Equals(true))) { foreach (OrderItemMessage orderItem in context.Message.OrderItems) { Models.Stock stock = await (await collection.FindAsync(s => s.ProductId == orderItem.ProductId)).FirstOrDefaultAsync(); stock.Count -= orderItem.Count; await collection.FindOneAndReplaceAsync(x => x.ProductId == orderItem.ProductId, stock); } ISendEndpoint sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{RabbitMQSettings.Payment_StockReservedEventQueue}")); StockReservedEvent stockReservedEvent = new() { BuyerId = context.Message.BuyerId, OrderId = context.Message.OrderId, OrderItems = context.Message.OrderItems, TotalPrice = context.Message.TotalPrice }; await sendEndpoint.Send(stockReservedEvent); } //Eğer az ise siparişin iptal edilmesi için gerekli event gönderiliyor. else { StockNotReservedEvent stockNotReservedEvent = new() { BuyerId = context.Message.BuyerId, OrderId = context.Message.OrderId, Message = "Stok yetersiz..." }; await _publishEndpoint.Publish(stockNotReservedEvent); } } }
Yukarıdaki kod bloğunu incelerseniz eğer 36 ile 44. satırlar arasında
StockReservedEvent
event’i 49 ile 55. satırlar arasında iseStockNotReservedEvent
‘i yayınlanmaktadır.StockReservedEvent
event’ine odaklanırsak eğer publish değil send işlemi ile kuyruğa gönderildiğini görmekteyiz. Bunun nedeni, bu event’a sade ve sadece tek bir servisin(Payment.API) subscribe olacağıdır. Yani Payment.API‘ın dışında başka bir servis bu event’i dinlemeyecektir. Haliyle bizler de send ile direkt hedef olarak belirlenen kuyruğa bu event’i atmaktayız. Ha tabi sizler isterseniz publish olarak da yayınlayabilirsiniz. Yine maksadımız zenginlik olsun 🙂StockReservedEvent
event’ine odaklandığımızda ise önceki işlemlerde tecrübe edildiği gibi publish ile ilgili event yayınlanmaktadır.Bu adımda ‘RabbitMQSettings’ sınıfının en güncel içeriği aşağıdaki gibi olacaktır;
public static class RabbitMQSettings { public const string Stock_OrderCreatedEventQueue = "stock-order-created-queue"; public const string Payment_StockReservedEventQueue = "payment-stock-reserved-queue"; }
- Adım 9 Payment.API | StockReservedEvent Event’ine Subscribe Olunması ve Mesajın Okunması
Order.API‘da sipariş oluşturulup, Stock.API‘da stok işlemleri de başarıyla gerçekleştirildikten sonra artık sırada ödeme işlemleri vardır. Ve bu işlemi malumunuz Payment.API‘da gerçekleştiriyor olacağız. Bu uygulamada da artık ilgiliStockReservedEvent
event’ini dinleyecek olan bir consumer geliştirmeliyiz. Bunun için ilgili uygulamada ‘Consumers’ isimli klasör içerisine ‘StockReservedEventConsumer’ isimli bir sınıf oluşturalım.public class StockReservedEventConsumer : IConsumer<StockReservedEvent> { readonly IPublishEndpoint _publishEndpoint; public StockReservedEventConsumer(IPublishEndpoint publishEndpoint) { _publishEndpoint = publishEndpoint; } public async Task Consume(ConsumeContext<StockReservedEvent> context) { // Process ... // Process ... // Process ... bool paymentState = true; //Ödeme başarılıysa if (paymentState) { Console.WriteLine("Ödeme başarılı..."); // *** // PaymentCompletedEvent // *** } else { Console.WriteLine("Ödeme başarısız..."); // *** // PaymentFailedEvent // *** } } }
Artık burada dermanımız kalmadığından dolayı ödeme işlemi farazi bir şekilde es geçilmiştir. Lakin önemli olan ödeme neticesine göre başarılı ya da başarısız bir event yayınlama işlemi gerçekleştirilecektir.
Tabi oluşturulan bu consumer’ın Payment.API‘ın ‘Startup.cs’ dosyasında aşağıdaki gibi eklenmiş olması gerekmektedir.
public void ConfigureServices(IServiceCollection services) { . . . services.AddMassTransit(configure => { configure.AddConsumer<StockReservedEventConsumer>(); configure.UsingRabbitMq((context, configurator) => { configurator.Host(Configuration.GetConnectionString("RabbitMQ")); configurator.ReceiveEndpoint(RabbitMQSettings.Payment_StockReservedEventQueue, e => e.ConfigureConsumer<StockReservedEventConsumer>(context)); }); }); . . . }
- Adım 10 Shared | PaymentCompletedEvent ve PaymentFailedEvent Event’lerinin Oluşturulması
Payment.API‘da ki sonuç başarılıysaPaymentCompletedEvent
yok eğer başarısızsaPaymentFailedEvent
event’leri yayınlanacaktır.PaymentCompletedEvent
ile sipariş başarılı bir şekilde tamamlanmış olacak ve durumu Completed‘a çekilecektir.PaymentFailedEvent
event’i ile sipariş iptal edilerek Fail durumuna getirilecek hem de Stock.API‘da ki yapılan değişiklikler compensable transaction ile telafi edilecektir/geri alınacaktır.public class PaymentCompletedEvent : IEvent { public int OrderId { get; set; } }
public class PaymentFailedEvent : IEvent { public int OrderId { get; set; } public string Message { get; set; } public List<OrderItemMessage> OrderItems { get; set; } }
Görüldüğü üzere her iki event’te de gerekli bilgiler tutulmaktadır.
PaymentCompletedEvent
event’in de hangi siparişin başarıyla sonlandırılacağı bilgisi gelirkenPaymentFailedEvent
event’in de ise hangi siparişin neden iptal edildiği bilgileri gelmektedir. - Adım 11 Payment.API | PaymentCompletedEvent ve PaymentFailedEvent Event’lerinin Yayınlanması
Payment.API servisinde ödeme sonucuna göre bir adım önce oluşturulanPaymentCompletedEvent
vePaymentFailedEvent
event’lerinin yayınlanması için 9. adımda geliştirilen ‘StockReservedEventConsumer’ isimli consumer’a aşağıdaki müdahalenin yapılması gerekmektedir.public class StockReservedEventConsumer : IConsumer<StockReservedEvent> { readonly IPublishEndpoint _publishEndpoint; public StockReservedEventConsumer(IPublishEndpoint publishEndpoint) { _publishEndpoint = publishEndpoint; } public async Task Consume(ConsumeContext<StockReservedEvent> context) { // Process ... // Process ... // Process ... bool paymentState = true; //Ödeme başarılıysa if (paymentState) { Console.WriteLine("Ödeme başarılı..."); PaymentCompletedEvent paymentCompletedEvent = new() { OrderId = context.Message.OrderId }; await _publishEndpoint.Publish(paymentCompletedEvent); } else { Console.WriteLine("Ödeme başarısız..."); PaymentFailedEvent paymentFailedEvent = new() { OrderId = context.Message.OrderId, OrderItems = context.Message.OrderItems, Message = "Bakiye yetersiz!" }; await _publishEndpoint.Publish(paymentFailedEvent); } } }
21 ile 25. satır aralığına bakarsanız
PaymentCompletedEvent
, 30 ile 35. satır aralığına bakarsanız eğerPaymentFailedEvent
event’leri yayınlanmaktadır. Şimdi gelin bu event’lerin consumer’larını oluşturalım. - Adım 12 Order.API | PaymentCompletedEvent Event’ine Subscribe Olunması ve Mesajın Okunması
PaymentCompletedEvent
event’i yayınlandığı zaman Order.API ilgili siparişi başarılı bir şekilde sonuçlandırmalıdır. Bunun için Order.API servisinde ilgili event’i tüketecek bir consumer oluşturulmalıdır. Haliyle ‘Consumers’ isimli klasör altında ‘PaymentCompletedEventConsumer’ isimli bir sınıf oluşturalım.public class PaymentCompletedEventConsumer : IConsumer<PaymentCompletedEvent> { readonly ApplicationDbContext _applicationDbContext; public PaymentCompletedEventConsumer(ApplicationDbContext applicationDbContext) { _applicationDbContext = applicationDbContext; } public async Task Consume(ConsumeContext<PaymentCompletedEvent> context) { Models.Order order = await _applicationDbContext.Orders.FindAsync(context.Message.OrderId); if (order != null) { order.OrderStatus = OrderStatus.Completed; await _applicationDbContext.SaveChangesAsync(); } } }
Görüldüğü üzere
PaymentCompletedEvent
yayınlandığı anda bu consumer devreye girecek ve ilgili siparişin durumunu Completed‘a çekecektir. Bu sınıfı ilgili uygulamanın ‘Startup.cs’ dosyasında aşağıdaki gibi consumer olarak ekleyelim;public void ConfigureServices(IServiceCollection services) { . . . services.AddMassTransit(configure => { configure.AddConsumer<PaymentCompletedEventConsumer>(); configure.UsingRabbitMq((context, configurator) => { configurator.Host(Configuration.GetConnectionString("RabbitMQ")); configurator.ReceiveEndpoint(RabbitMQSettings.Order_PaymentCompletedEventQueue, e => e.ConfigureConsumer<PaymentCompletedEventConsumer>(context)); }); }); . . . }
Bu adımda ‘RabbitMQSettings’ sınıfının son hali aşağıdaki gibidir :
public static class RabbitMQSettings { public const string Stock_OrderCreatedEventQueue = "stock-order-created-queue"; public const string Payment_StockReservedEventQueue = "payment-stock-reserved-queue"; public const string Order_PaymentCompletedEventQueue = "order-payment-completed-queue"; }
- Adım 13 Order.API – Stock.API | PaymentFailedEvent Event’ine Subscribe Olunması ve Mesajın Okunması
Şimdi sırada Payment.API‘da oluşan olumsuz bir durumdan dolayı yayınlananPaymentFailedEvent
event’ini Order.API ve Stock.API servislerinde dinleyecek consumer’ları yazmaya geldi. Bunun için her iki servisin ‘Consumers’ klasörü altında ‘PaymentFailedEventConsumer’ isimli consumer sınıflarının oluşturulması gerekmektedir.Order.API için;
public class PaymentFailedEventConsumer : IConsumer<PaymentFailedEvent> { readonly ApplicationDbContext _context; public PaymentFailedEventConsumer(ApplicationDbContext context) { _context = context; } public async Task Consume(ConsumeContext<PaymentFailedEvent> context) { Models.Order order = await _context.FindAsync<Models.Order>(context.Message.OrderId); if (order != null) { order.OrderStatus = OrderStatus.Fail; await _context.SaveChangesAsync(); Console.WriteLine(context.Message.Message); } } }
Evet, görüldüğü üzere Order.API servisinde ilgili sipariş durumu Fail yapılarak iptal edilmektedir.
Stock.API için;
public class PaymentFailedEventConsumer : IConsumer<PaymentFailedEvent> { readonly MongoDbService _mongoDbService; public PaymentFailedEventConsumer(MongoDbService mongoDbService) { _mongoDbService = mongoDbService; } public async Task Consume(ConsumeContext<PaymentFailedEvent> context) { var collection = _mongoDbService.GetCollection<Models.Stock>(); foreach (var item in context.Message.OrderItems) { Models.Stock stock = await (await collection.FindAsync(s => s.ProductId == item.ProductId)).FirstOrDefaultAsync(); if (stock != null) { stock.Count += item.Count; await collection.FindOneAndReplaceAsync(s => s.ProductId == item.ProductId, stock); } } } }
Stock.API servisinde ise bu siparişten dolayı stok miktarı düşürülen tüm ürünlerin değerleri compensable transaction ile telafi edilerek geri alınmaktadır.
Haliyle her iki servisin ‘Startup.cs’ dosyasında kendilerine özel oluşturulan consumer’lar için gerekli konfigürasyonlar aşağıdaki gibi yapılmalıdır;
Order.API için;public void ConfigureServices(IServiceCollection services) { . . . services.AddMassTransit(configure => { configure.AddConsumer<PaymentCompletedEventConsumer>(); configure.AddConsumer<PaymentFailedEventConsumer>(); configure.UsingRabbitMq((context, configurator) => { configurator.Host(Configuration.GetConnectionString("RabbitMQ")); configurator.ReceiveEndpoint(RabbitMQSettings.Order_PaymentCompletedEventQueue, e => e.ConfigureConsumer<PaymentCompletedEventConsumer>(context)); configurator.ReceiveEndpoint(RabbitMQSettings.Order_PaymentFailedEventQueue, e => e.ConfigureConsumer<PaymentFailedEventConsumer>(context)); }); }); . . . }
Stock.API için;
public void ConfigureServices(IServiceCollection services) { . . . services.AddMassTransit(configure => { configure.AddConsumer<OrderCreatedEventConsumer>(); configure.AddConsumer<PaymentFailedEventConsumer>(); configure.UsingRabbitMq((context, configurator) => { configurator.Host(Configuration.GetConnectionString("RabbitMQ")); configurator.ReceiveEndpoint(RabbitMQSettings.Stock_OrderCreatedEventQueue, e => e.ConfigureConsumer<OrderCreatedEventConsumer>(context)); configurator.ReceiveEndpoint(RabbitMQSettings.Stock_PaymentFailedEventQueue, e => e.ConfigureConsumer<PaymentFailedEventConsumer>(context)); }); }); . . . }
Bu adımda ‘RabbitMQSettings’ sınıfının son hali aşağıdaki gibidir :
public static class RabbitMQSettings { public const string Stock_OrderCreatedEventQueue = "stock-order-created-queue"; public const string Payment_StockReservedEventQueue = "payment-stock-reserved-queue"; public const string Order_PaymentCompletedEventQueue = "order-payment-completed-queue"; public const string Order_PaymentFailedEventQueue = "order-payment-failed-queue"; public const string Stock_PaymentFailedEventQueue = "stock-payment-failed-queue"; }
- Adım 14 Order.API | StockNotReservedEvent Event’ine Subscribe Olunması ve Mesajın Okunması
Son olarak Stock.API‘da olabilecek herhangi bir hata yahut tutarsızlık durumunda Payment.API‘a gitmeksizin direkt olarak siparişin iptal edilmesi içinStockNotReservedEvent
event’i yayınlanmaktadır. Dolayısıyla Order.API‘da bu event’e subscribe olup gerekli iptal işleminin gerçekleştirilmesi gerekmektedir. Bunun için ‘Consumers’ klasörü içerisine ‘StockNotReservedEventConsumer’ isimli bir consumer sınıfı oluşturalım.public class StockNotReservedEventConsumer : IConsumer<StockNotReservedEvent> { readonly ApplicationDbContext _context; public StockNotReservedEventConsumer(ApplicationDbContext context) { _context = context; } public async Task Consume(ConsumeContext<StockNotReservedEvent> context) { Models.Order order = await _context.FindAsync<Models.Order>(context.Message.OrderId); if (order != null) { order.OrderStatus = OrderStatus.Fail; await _context.SaveChangesAsync(); Console.WriteLine(context.Message.Message); } } }
Haliyle görüldüğü üzere ilgili sipariş durumu Fail olarak belirtilmekte ve böylece iptal edilmektedir.
Bu consumer’ıda ‘Startup.cs’e eklemeyi unutmuyoruz.
public void ConfigureServices(IServiceCollection services) { . . . services.AddMassTransit(configure => { configure.AddConsumer<PaymentCompletedEventConsumer>(); configure.AddConsumer<PaymentFailedEventConsumer>(); configure.AddConsumer<StockNotReservedEventConsumer>(); configure.UsingRabbitMq((context, configurator) => { configurator.Host(Configuration.GetConnectionString("RabbitMQ")); configurator.ReceiveEndpoint(RabbitMQSettings.Order_PaymentCompletedEventQueue, e => e.ConfigureConsumer<PaymentCompletedEventConsumer>(context)); configurator.ReceiveEndpoint(RabbitMQSettings.Order_PaymentFailedEventQueue, e => e.ConfigureConsumer<PaymentFailedEventConsumer>(context)); configurator.ReceiveEndpoint(RabbitMQSettings.Order_StockNotReservedEventQueue, e => e.ConfigureConsumer<StockNotReservedEventConsumer>(context)); }); }); . . . }
Bu adımda da ‘RabbitMQSettings’ sınıfının son hali nihai olarak aşağıdaki gibi olacaktır.
public static class RabbitMQSettings { public const string Stock_OrderCreatedEventQueue = "stock-order-created-queue"; public const string Payment_StockReservedEventQueue = "payment-stock-reserved-queue"; public const string Order_PaymentCompletedEventQueue = "order-payment-completed-queue"; public const string Order_PaymentFailedEventQueue = "order-payment-failed-queue"; public const string Stock_PaymentFailedEventQueue = "stock-payment-failed-queue"; public const string Order_StockNotReservedEventQueue = "order-stock-not-reserved-queue"; }
Test Edelim
Evet, artık yaptığımız microservice çalışmasını ve Saga pattern ile distributed transaction uygulamasını baştan ayağa test edebiliriz. Bunun için öncelik olarak servislerin portlarını ve MongoDB’de bir kaç dummy data yapılanmasını ayarlamamız gerekmektedir.
Order.API | https://localhost:5001 |
Stock.API | https://localhost:5003 |
Payment.API | https://localhost:5005 |
Stock.API‘ın ‘Program.cs’ dosyasına girelim ve aşağıdaki dummy dataları MongoDB’ye yükleyecek olan konfigürasyonu gerçekleştirelim.
public class Program { public static void Main(string[] args) { var host = CreateHostBuilder(args).Build(); using IServiceScope scope = host.Services.CreateScope(); MongoDbService mongoDbService = scope.ServiceProvider.GetRequiredService<MongoDbService>(); if (!mongoDbService.GetCollection<Models.Stock>().FindSync(x => true).Any()) { mongoDbService.GetCollection<Stock.API.Models.Stock>().InsertOne(new() { ProductId = 21, Count = 200 }); mongoDbService.GetCollection<Stock.API.Models.Stock>().InsertOne(new() { ProductId = 22, Count = 100 }); mongoDbService.GetCollection<Stock.API.Models.Stock>().InsertOne(new() { ProductId = 23, Count = 50 }); mongoDbService.GetCollection<Stock.API.Models.Stock>().InsertOne(new() { ProductId = 24, Count = 10 }); mongoDbService.GetCollection<Stock.API.Models.Stock>().InsertOne(new() { ProductId = 25, Count = 30 }); } host.Run(); } public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); }); }
Artık bu ayarlardan sonra uygulamamız teste hazır. Şimdi testimizi gerçekleştirebiliriz.
Görüldüğü üzere MonboDB’ye girilen dummy datalara uygun verilerde bir sipariş post edildiği taktirde
başarılı bir sipariş söz konusu olabilmektedir.
Veritabanına bakıldığında da verilerin stok miktarlarının tutarlı bir şekilde düştüğünü gözlemlemekteyiz.
Benzer mantıkla stok miktarı tutarsız olan ürünler sipariş edildiğinde
sipariş durumu Fail olarak düzenlenmektedir. Ve hatta Order.API‘ın console’una göz atarsak bu durumun nedeni mesaj olarak karşımıza çıkmaktadır.
MongoDB’ye göz atarsak eğer verilerin en sonuncu halinde olduğunu görmekteyiz.
Ve son olarak Payment.API‘da farazi olarak yaptığımız ödeme işlemini ‘false’ yapıp başarısız duruma getirerek son durumu test edersek eğer;ekran görüntüsünde de görüldüğü üzere ‘Bakiye yetersiz!’ mesajıyla karşılaşılmakta ve stok bilgileri compensable transaction ile telafi edilerek geri alınmakta ve sipariş iptal edilmektedir.
Ayrıca bu servislerin RabbitMQ’da ki connection durumlarına da göz atarsak eğer şeklinde olduğunu gözlemlemekteyiz.
Nihai olarak;
Distributed transaction senaryolarında microservisler arasında tutarlılığı sağlayabilmek için Saga pattern’ın choreography implemantasyonunu adım adım hep beraber tecrübe etmiş olduk. Dikkat ederseniz eğer her bir aksiyona karşılık bir event yayınlanmakta ve bu event’e subscribe olan consumer’lar aracılığıyla ilgili servisin alacağı aksiyon belirlenmektedir. Haliyle bu yaklaşımda servis sayısı arttıkça ister istemez kuyrukların yönetimi ve koordinasyonu zorlaşacak ve bir yerden sonra gelişimsel açıdan son raddeye varılmış olunacaktır. Haliyle servis sayısı 2 ile 4 arasında adil bir sayıya tekabül eden çalışmalar da choreography implemantasyonunun tercih edilmesi makul görülürken daha fazla servisin söz konusu olduğu durumlarda bir sonraki makalemizde pratiksel olarak yine benzer bir senaryo üzerinden ele alacağımız Orchestrator implemantasyonunu uygulamamız daha ideal bir çözüm getirmiş olacaktır. O halde sizler bu makaleyi sindirirken, ben de biraz dinlendikten sonra diğer makaleyi klavyeye almaya başlıyor olacağım 🙂
Bu noktaya kadar üşenmeden okuyup, eşlik ettiğiniz için teşekkür ederim…..
İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…
Not : Örnek projeyi indirmek için buraya tıklayınız.
connectionstring kısmınıı nerden alabilirim
Merhaba,
Cevap için RabbitMQ – Cloud Ortamda(CloudAMQP) Kurulumu başlıklı makaleme göz atmanızı tavsiye ederim.
Keyifli okumalar dilerim…
Değerli okuyucularım,
Yukarıdaki içerikte oluşturduğumuz örnek uygulamaya aşağıdaki github adresinden erişebilirsiniz :
https://github.com/gncyyldz/Saga-Choreography
İyi okumalar, bol faydalar…