.NET’te Rebus Kütüphanesiyle Saga Orchestration Yaklaşımının Uygulanması
Merhaba,
Malumunuz, microservice mimarisiyle tasarlanmış yazılımlarımızda kullanıcılar tarafından başlatılan işlem ya da görevlerin, bir servis tarafından alınıp ardından belirli servisler üzerinde mantıksal bir tetikleme silsilesiyle işlenme ve koordine edilme süreçlerini yürütmemize ve biryandan da bu süreçlerde data consistency’i(veri tutarlılığı) sağlıklı bir şekilde sağlayabilmemize imkan tanıyan Saga pattern’ını kullanmakta ve özellikle bu pattern’ın uygulandığı senaryolar da servis sayısının adil kotadan fazla olduğu durumlarda uygulanma zorluğu ve geliştirme maliyeti eğrisini düşürebilmemizi sağlayacak olan Orchestration yaklaşımını tercih etmekteyiz.
Eğer ki konuya dair bilgi sahibi olmaksızın direkt olarak bu içeriğe merak saldıysanız öncelikle Saga pattern’ının ne olduğunu anlayabilmek için Microservice Mimarilerde Saga Pattern İle Transaction Yönetimi başlıklı makaleyi, ardından da Orchestration implementasyonunu ele aldığımız Microservice – Saga – Commands/Orchestration Implementasyonu İle Transaction Yönetimi başlıklı içeriği okumanız gerekmektedir. Ha ayriyeten Microservice mimarisinde Saga’nın bir diğer alternatif yaklaşımı olan Choreography implementasyonunu da Microservice – Saga – Events/Choreography Implementasyonu İle Transaction Yönetimi başlıklı makaleden inceleyerek olayın dibini sıyırabilir ve bu içerikten istifade edebilecek altyapıyı kusursuz bir hale getirebilirsiniz.
Velhasıl, eğer ki konunun terminolojisine tam teferruatlı bir hakimiyet sağladıysanız bu içeriğimizde Orchestration yaklaşımının Rebus kütüphanesiyle farklı bir tekniğini pratik ağırlıklı olarak ele alacağımızı düşünebilirsiniz.
Rebus Nedir?
Rebus, distributed sistemlerde servisler arası asenkron mesajlaşma için kullanılan bir mesajlaşma kütüphanedir. Azure Service Bus, RabbitMQ, SQL Server, InMemory vs. gibi mesaj taşıma görevinden sorumlu olan transporter isimli birçok sistemi desteklemektedir. Ayrıca, belirli bir düzen içinde çalışan iş adımlarını veya microservice’lerin görev gerçekleştirme süreçlerini yönetmek için de kullanılabilmektedir.
Rebus İle Orchestration İmplementasyonu
Bunu direkt örnek bir senaryo üzerinden adım adım inceliyor olacağız. Senaryomuz temsilen, klasik e-ticaret sistemlerindeki sipariş süreçlerine karşın olacaktır. Senaryo sürecinde aşağıdaki servislerden istifade ediyor olacağız;
Servis | Türü | Açıklama |
---|---|---|
Order.API | Web API | Kullanıcı tarafından başlatılan siparişin alınmasını ve siparişe dair stok ve ödeme sürecinin başlatılmasını sağlayacak olan servistir. |
Order.Saga | Worker Service | İlgili siparişe dair servisler arasındaki adım adım işleyiş neticesinin yürütüldüğü handler’ların barındırılacağı servistir. Düşündüğünüz zaman bu servisin sorumluluğunu direkt Order.API’ın alabileceğine ve sanki lüzumsuz olduğuna kanaat getirebilirsiniz. Ancak, Order.API’ın kullanıcıyla etkileşime girmekten başka mesaj tüketme sorumluluğunu üstlendirmek ilkelere aykırı olacağından dolayı bu sorumluluğu farklı bir servise yüklemek best practice açısından daha uygun olacaktır. |
Stock.Service | Worker Service | Stok işlemlerinin yürütüleceği servistir. |
Payment.Service | Worker Service | Ödeme işlemlerinin yürütüleceği servistir. |
Shared | Class Library | Servisler arasında kullanılacak tüm ortak message ve event yapılanmalarını ve bunların yanında belli başlı queue isimleri gibi konfigürasyonları barındıracak olan ortak bir katmandır. |
Saga.Orchestration.Service | Worker Service | Servisler arasındaki iş ve görev sürecini merkezi bir noktadan yönetmek üzere tasarlanacak olan ve böylece Orchestration yaklaşımını sağlayacak olan servisin ta kendisidir. |
Ve bu servisler arasındaki iş akışı da aşağıdaki gibi olacaktır;
- Kullanıcı
Order.API
üzerinden bir sipariş talebinde bulunacak ve süreci başlatacaktır. Order.API
siparişi veritabanına ekleyecek ve bunuSaga.Orchestration.Service
‘e bildirecektir.Saga.Orchestration.Service
ise oluşturulan siparişe dairStock.Service
‘e bilgide bulunacaktır.Stock.Service
siparişteki ürünlere göre stok çalışması gerçekleştirilecektir. Stok işlemleri başarılı ya da başarısız olmak üzere sonuçlanacak ve neticeye göreSaga.Orchestration.Service
‘e bildiride bulunulacaktır.- Eğer ki,
Stock.Service
siparişe dair stok işlemlerini başarıyla gerçekleştirdiyse o taktirdePayment.Service
‘e ödeme işlemleri için bilgide bulunacaktır. - Yok eğer başarısız bir durum söz konusuysa işte o taktirde
Order.Saga
servisine siparişin başarıyla sonuçlanamadığına dair bilgi mesaj gönderecek ve ilgili sipariş iptal edilecektir.
- Eğer ki,
Payment.Service
‘te ise ilgili siparişe dair ödeme başarılıysa eğer bunun içinSaga.Orchestration.Service
‘e bildiride bulunulacak ve o da,Order.Saga
‘ya siparişin başarıyla sonuçlandığına dair mesaj yayınlayacaktır.- Yok eğer ödeme gerçekleştirilemediyse hem siparişin iptal edilmesi hem de
Stock.Service
‘te ilgili siparişe dair yapılan stok işlemlerinin geri alınması içinSaga.Orchestration.Service
‘e bildiride bulunacak, o da, siparişi iptal etmesi içinOrder.Saga
‘ya ve yapılan stok çalışmalarının geri alınması için deStock.Service
‘e mesaj yollayacaktır.
İlk Adım
Her şeyden önce yukarıda oluşturduğumuz projelere ilk adım olarak aşağıdaki kütüphanelerin yüklenmesi gerekmektedir;
Kazmayı Vuralım
Projelere ilgili kütüphaneleri yüklediğimize göre artık genel senaryonun inşasına başlayabiliriz. Bunun için aşağıdaki adımları sırasıyla seyretmemiz yeterli olacaktır kanaatindeyim…
- Adım 1 Order.API – (Order Entity’sinin Oluşturulması, Veritabanı Altyapısının Kurgulanması ve Migration İşlemleri)
İlk olarak kullanıcı tarafından verilecek olan siparişinOrder.API
tarafında altyapısını kurgulamamız gerekmektedir. Bunun için aşağıdaki ‘Order’ entity’sinden istifade edebiliriz.public class Order { public int Id { get; set; } public int BuyerId { get; set; } public List<OrderItem> OrderItems { get; set; } public OrderStatus OrderStatus { get; set; } public DateTime CreatedDate { get; set; } public decimal TotalPrice { get; set; } }
Tabi bu entity içerisindeki ‘OrderItem’ ve ‘OrderStatus’ türleri de aşağıdaki gibi olacaktır.
public class OrderItem { public int Id { get; set; } public int ProductId { get; set; } public int Count { get; set; } public decimal Price { get; set; } }
public enum OrderStatus { Suspend, Completed, Fail }
Sipariş entity’sini oluşturduğumuza göre şimdi sırada DbContext nesnesini oluşturmamız gerekmektedir.
public class ApplicationDbContext : DbContext { public ApplicationDbContext(DbContextOptions options) : base(options) { } public DbSet<Order.API.Models.Entities.Order> Orders { get; set; } public DbSet<OrderItem> OrderItems { get; set; } }
Ardından oluşturulan bu yapıyı uygulamanın ‘Program.cs’ dosyasında aşağıdaki gibi yapılandırmamız gerekmektedir.
var builder = WebApplication.CreateBuilder(args); . . . builder.Services.AddDbContext<ApplicationDbContext>(options => options.UseSqlServer(builder.Configuration.GetConnectionString("SQLServer"))); var app = builder.Build(); . . . app.Run();
Ve son olarak da
add-migration mig_1
talimatıyla migration basalım ve
update-database
talimatıyla da migrate ederek veritabanını sunucuda oluşturalım.(Not: Bu adımı uygularken süreçte ihtiyaç olabilecek bazı kütüphanelerin yüklendiği varsayılmıştır. Bu kütüphaneler; EF Core için Microsoft.EntityFrameworkCore, EF Core’da SQL Server’ın kullanılabilmesi için Microsoft.EntityFrameworkCore.SqlServer ve package manager console üzerinden migration işlemlerinin yürütülebilmesi için Microsoft.EntityFrameworkCore.Tools kütüphaneleridir.) - Adım 2 Order.API – (Siparişin Oluşturulması)
Kullanıcıdan gelecek olan sipariş için temel altyapıyı kurguladığımıza göre artık siparişin oluşturulmasına odaklanabiliriz. Bunun için öncelikle kullanıcıdan siparişin bilgilerini alabileceğimiz aşağıdaki viewmodel nesnelerini oluşturalım.public class CreateOrderVM { public int BuyerId { get; set; } public List<CreateOrderItemVM> OrderItems { get; set; } }
public class CreateOrderItemVM { public int ProductId { get; set; } public int Count { get; set; } public decimal Price { get; set; } }
Ardından siparişin oluşturulmasını sağlayacak olan endpoint’i aşağıdaki gibi oluşturalım.
var builder = WebApplication.CreateBuilder(args); . . . var app = builder.Build(); . . . app.MapPost("/create-order", async (ApplicationDbContext context, CreateOrderVM model) => { Order.API.Models.Entities.Order order = new() { BuyerId = model.BuyerId, OrderItems = model.OrderItems.Select(oi => new OrderItem { Count = oi.Count, Price = oi.Price, ProductId = oi.ProductId }).ToList(), OrderStatus = OrderStatus.Suspend, TotalPrice = model.OrderItems.Sum(oi => oi.Count * oi.Price), CreatedDate = DateTime.Now }; await context.Orders.AddAsync(order); await context.SaveChangesAsync(); return Results.Created(); }); app.Run();
Görüldüğü üzere siparişe dair distributed sürecin tamamlanmasını bekleyeceğimizden dolayı ilk etapta Suspend olarak oluşturmaktayız.
- Adım 3 Stock.Service – (MongoDB Altyapısının Kurgulanması ve Ürün Stok Bilgilerine Dair Dummy Dataların Eklenmesi)
Gelecek olan siparişin stok işlemlerine dair deStock.Service
‘de bir altyapı oluşturmamız gerekmektedir. Bu altyapı içeriği zenginleştirmek maksadıyla MongoDB üzerine kurgulanmış olacaktır. Bunun için öncelikle aşağıdaki ‘MongoDBService’i oluşturalım.public class MongoDBService { readonly IMongoDatabase _database; readonly IConfiguration _configuration; public MongoDBService(IConfiguration configuration) { _configuration = configuration; MongoClient client = new(_configuration["MongoDB:Server"]); _database = client.GetDatabase(_configuration["MongoDB:DatabaseName"]); } public IMongoCollection<T> GetCollection<T>() => _database.GetCollection<T>(typeof(T).Name.ToLowerInvariant()); }
Tabi bu servisi oluşturabilmek için MongoDB.Driver kütüphanesinin ilgili servise yüklenmesi gerekmektedir.
Bağlantı sağlayacağımız MongoDB sunucusu bilgilerini de aşağıdaki gibi appsettings.json dosyasına kaydedelim.
"MongoDB": { "Server": "mongodb://localhost:27017", "DatabaseName": "SagaOrchestrationRebusDB" }
Ve ardından ilgili servisi, projenin ‘Program.cs’ dosyası üzerinden aşağıdaki gibi yapılandırarak uygulamanın IoC Container’ına ekleyelim.
var builder = Host.CreateApplicationBuilder(args); . . . builder.Services.AddSingleton<MongoDBService>(); var host = builder.Build(); . . . host.Run();
Devamında ise MongoDB veritabanında stok bilgilerini tutacak aşağıdaki entity’i oluşturalım.
public class Stock { [BsonRepresentation(BsonType.ObjectId)] [BsonId] [BsonElement(Order = 0)] public ObjectId Id { get; set; } [BsonRepresentation(BsonType.Int64)] [BsonElement(Order = 1)] public int ProductId { get; set; } [BsonRepresentation(BsonType.Int64)] [BsonElement(Order = 2)] public int Count { get; set; } }
Ve bu entity eşliğinde yukarıda yapılandırdığımız servis sayesinde ürünlerin stok bilgilerini barındıran dummy dataları oluşturalım.
var builder = Host.CreateApplicationBuilder(args); . . . builder.Services.AddSingleton<MongoDBService>(); var host = builder.Build(); using var scope = builder.Services.BuildServiceProvider().CreateScope(); var mongoDbService = scope.ServiceProvider.GetRequiredService<MongoDBService>(); if (!await (await mongoDbService.GetCollection<Stock.Service.Models.Stock>().FindAsync(x => true)).AnyAsync()) { mongoDbService.GetCollection<Stock.Service.Models.Stock>().InsertOne(new() { ProductId = 1, Count = 200 }); mongoDbService.GetCollection<Stock.Service.Models.Stock>().InsertOne(new() { ProductId = 2, Count = 300 }); mongoDbService.GetCollection<Stock.Service.Models.Stock>().InsertOne(new() { ProductId = 3, Count = 50 }); mongoDbService.GetCollection<Stock.Service.Models.Stock>().InsertOne(new() { ProductId = 4, Count = 10 }); mongoDbService.GetCollection<Stock.Service.Models.Stock>().InsertOne(new() { ProductId = 5, Count = 60 }); } host.Run();
Artık bu çalışmalar eşliğinde
Stock.Service
‘in bir kere ayağa kaldırılması hem veritabanının hem de dummy dataların oluşturulması açısından yeterli olacaktır. - Adım 4 Shared – (BaseCorrelation Sınıfının Oluşturulması)
İçeriğimizin sonraki adımlarında Rebus ile Saga Orchestration davranışını uygularken geliştireceğimiz merkezi Saga yapılanmasında, servislerden gelecek olan mesajları eşleştirmemizi ve bu mesajların aynı iş akışı içerisinde belirli bir siparişe karşılık ilişkilendirmemizi sağlayacak olan korelasyon yapısını oluşturmamız gerekmektedir. Bu yapı, herhangi bir değer olabilir. Usul gereği mümkün mertebe unique olmasına özen gösterilmesi gerekeceğinden dolayı biz bu ihtiyaca istinaden ‘BaseCorrelation’ sınıfı ile çözüm getiriyor olacağız. Haliyle ilgili sınıfın içeriğiniShared
class library’sinde aşağıdaki gibi oluşturalım;[Serializable] public class BaseCorrelation { public BaseCorrelation() { } public BaseCorrelation(Guid correlationId) { CorrelationId = correlationId; } [DataMember] public Guid CorrelationId { get; set; } }
Görüldüğü üzere ‘BaseCorrelation’ sınıfı içerisinde ‘Guid’ türünden bir değer alınmakta ve bu ‘CorrelationId’ property’sinde saklanmaktadır. Böylece sistemde oluşturulacak tüm message ve event yapılanmalarını bu sınıftan türeterek, aynı iş akışındaki farklı olayları ve mesajları, ortak bir bağlam bilgisi olması amacıyla üretilmiş olan korelasyon değeri üzerinden eşleştirecek ve tek bir siparişe karşın olay zincirinin bağını kurguluyor olacağız.
- Adım 5 Saga.Orchestration.Service – (Saga Orchestration Altyapısının Kurgulanması)
Rebus kütüphanesiyle yapılan Orchestration operasyonunda tıpkı MassTransit ile yapmış olduğumuz çalışmada olduğu gibi belli başlı davranışlara sahip nesneler senaryoda sorumluluk üstlenmekte ve haliyle öncelikle bu nesnelerin üretilmesi gerekmektedir. Şimdi gelin bu nesneleri tek tek izah ederek ilerleyelim;- Saga Data Nesnesi
Orchestration operasyonu sürecinde distributed bir şekilde takip edilen akışın veri formatını temsil eden bir nesnedir. Bizim örnek senaryomuzda bu bir sipariş modeline karşılık gelmekte ve bir siparişle ilgili servisler arası iletişim sürecindeki verilerin gelişimini takip etmemizi sağlayacak olan bilgileri barındıracak olan bir nesneyi ifade etmektedir. Böylece bizler saga data nesnesi sayesinde distributed akıştaki olgunun state’ini tutabilmekte ve yönetebilmekteyiz.Bir sınıfın saga data sınıfı olabilmesi için
ISagaData
interface’inden implemente edilmesi gerekmektedir.public class OrderSagaData : BaseCorrelation, ISagaData { public Guid Id { get; set; } public int Revision { get; set; } public int OrderId { get; set; } public int BuyerId { get; set; } public decimal TotalPrice { get; set; } public List<OrderItem> OrderItems { get; set; } }
Yukarıda görüldüğü üzere bir sipariş sürecinin state’ini takip etmemizi sağlayacak olan ‘OrderSagaData’ sınıfını oluşturmuş bulunuyoruz. Burada dikkatinizi bir önceki adımda oluşturduğumuz ‘BaseCorrelation’ sınıfına çekmek isterim. Evet, oluşturduğumuz bu saga data nesnesi her ne kadar sipariş state’ini tutacak olsa da hangi siparişe dair state’i tuttuğunu anlayabilmek için bunu da ‘BaseCorrelation’dan türetiyor ve bu sınıftaki korelasyon yapılanmasını saga data nesnesine aktarıyoruz.
Ayrıca yukarıda kullanılan ‘OrderItem’ referansını da
Shared
içerisinde aşağıdaki gibi oluşturalım.public class OrderItem { public int ProductId { get; set; } public int Count { get; set; } public decimal Price { get; set; } }
- Saga Nesnesi
Orchestration yaklaşımını uygulamamızı sağlayacak olan, bir işin-akışın yani örnek senaryomuzdaki sipariş sürecinin temsil edilmesine ve servisler arasındaki koordinasyonun merkezi bir şekilde yürütülmesine imkan tanıyacak olan sınıfın ta kendisidir.Bizler saga sınıfı sayesinde servisler arasındaki mesajlaşmaları takip edebilmekte, durumları yönetebilmekte ve iş sürecine uygun bir şekilde durumlar arası geçişler yapabilmekteyiz.
Bir sınıfın saga sınıfı olabilmesi için
Saga<T>
abstract class’ını implemente etmesi gerekmektedir. Şöyle ki;public class OrderSaga : Saga<OrderSagaData> { protected override void CorrelateMessages(ICorrelationConfig<OrderSagaData> config) { } }
Evet, yukarıdaki kod bloğunda görüldüğü üzere bizler ‘OrderSaga’ adında bir saga nesnesi oluşturmuş bulunuyoruz. İçerik sürecinde bu nesnenin hacminin farklı implementasyonlarla genişlediğini görüyor olacaksınız. Şimdilik en sade haliyle incelememiz gerekirse eğer
<T>
parametresine verilen ‘OrderSagaData’ nesnesine dikkatinizi çekerim. Evet, saga nesnesi sayesinde servisler arası iş akış sürecini gelen message ve event’lere göre merkezi bir şekilde yönetecek ve iletişimi gerçekleştiriyor olacağız lakin bunu yaparken de işleyeceğimiz verinin türünün ‘OrderSagaData’ olacağını bildirmiş oluyoruz. Bizler iş akış sürecinin(yani sipariş sürecinin) hangi noktasında ya da hangi servisinde olursak olalım ona göre arkada üretilmiş olan ‘OrderSagaData’ nesnesinin değerleri şekillenecek ve kuracağımız korelasyon ağıyla da tek bir bağlamla içerisinde bir siparişin en nihai halini yine bu nesne üzerinden elde edebiliyor olacağız.Ayrıca
Saga
abstract class’ının implemente ettirdiği CorrelateMessages isimli metotta ise birazdan oluşturacağımız ve süreçte işlemeye çalışacağımız message ve event’lerin, saga data sınıfımız olan ‘OrderSagaData’ nesnesiyle eşleştirilmesini ve bir olguya yani siparişe dair korelasyon ağının kurulmasını sağlıyor olacağız. Oraya geleceğiz, şimdi bunun için öncelikle çalışmada kullanacağımız event ve message’ların oluşturulmasını gerçekleştirelim.
- Saga Data Nesnesi
- Adım 6 Shared – (Event ve Message’ların Oluşturulması)
Şimdi kurguda kullanacağımız tüm event ve message yapılanmalarını sorumluluklarını izah ederek tek tek oluşturmaya odaklanalım. Burada oluşturacağımız tüm event ve message’ların korelasyon ilişkisini sağlayabilmek için baştan 4. adımda oluşturduğumuz ‘BaseCorrelation’ sınıfından türetildiğini vurgulamakta fayda görmekteyim.- OrderStartedEvent
Kullanıcı tarafından sipariş verildiği an sürecin başladığınıSaga.Orchestration.Service
‘e bildirecek olan event’tir. Yani MassTransit ile yaptığımız çalışmadaki Tetikleyici Event‘e karşılık gelmektedir. Bu event yayınlandığı taktirdeSaga.Orchestration.Service
‘in veritabanında ‘OrderSagaData’ya karşılık bir satır oluşturulacak ve distributed iş akışı sürecindeki tüm ilerlemeler adım adım bu satır üzerinden takip ediliyor olacaktır.public class OrderStartedEvent : BaseCorrelation { public OrderStartedEvent() { } public OrderStartedEvent(Guid correlationId) : base(correlationId) { } public int OrderId { get; set; } public int BuyerId { get; set; } public decimal TotalPrice { get; set; } public List<OrderItem> OrderItems { get; set; } }
- OrderCreatedEvent
Sipariş oluşturulduktan ve süreç başlatıldıktan sonraStock.Service
‘i tetikleyebilmek için yayınlanacak event’tir.public class OrderCreatedEvent : BaseCorrelation { public OrderCreatedEvent(Guid correlationId) : base(correlationId) { } public List<OrderItem>? OrderItems { get; set; } }
MassTransit’te ki Başarılı Event‘e karşılık gelmektedir.
- OrderCompletedEvent
Siparişe dair tüm servislerin işlemlerini başarıyla tamamlandığını ve siparişin başarıyla sonlandığını ifade eden event’tir.Saga.Orchestration.Service
, bu event’i yayınladığı taktirdeOrder.Saga
servisi siparişin durumunu Completed‘a çekecektir. MassTransit’te ki Başarılı Event‘e karşılık gelmektedir.public class OrderCompletedEvent { public int OrderId { get; set; } }
Ayrıca bu event’in dikkat ederseniz herhangi bir korelasyon değeri bulunmamaktadır. Çünkü bu event, siparişin sonlandığını ifade edeceğinden dolayı bu bilgiyi içerisindeki ‘OrderId’ ile taşıyacak ve artık korelasyonel bir bağlam içerisinde çalışmaya gerek kalmayacaktır. Bu mantıkla içeriğin devamında tecrübe edileceği üzere, OrderCompletedEvent event’inin yayınlanacağı noktada
Saga.Orchestration.Service
tarafından ilgili siparişe dair saga bilgisi tamamlanacak ve veritabanından kaldırılmış olacaktır. Yani birnevi sipariş saga açısından Finilize edilmiş olacaktır. O noktaya geldiğimizde bunu tekrardan hatırlatacağım. - OrderFailedEvent
Siparişe dair distributed iş akışı sürecinde servislerden herhangi birinde bir başarısızlığın söz konusu olduğunu yani uzun lafın kısası siparişin başarısız olduğunu ifade eden event’tir.public class OrderFailedEvent : BaseCorrelation { public OrderFailedEvent(Guid correlationId) : base(correlationId) { } public int OrderId { get; set; } public string Message { get; set; } }
Bu event
Saga.Orchestration.Service
tarafından yayınlandığı taktirdeOrder.Saga
tarafından ilgili siparişe dair state Fail‘a çekilecektir. MassTransit’te ki Başarsızı Event‘e karşılık gelmektedir. - StockReservedEvent
Siparişe dairStock.Service
‘de ki stok işlemlerinin başarılı olduğunu ifade eden event’tir.public class StockReservedEvent : BaseCorrelation { public StockReservedEvent(Guid correlationId) : base(correlationId) { } public List<OrderItem> OrderItems { get; set; } }
MassTransit’te ki Başarılı Event‘e karşılık gelmektedir.
- StockNotReservedEvent
Siparişe dairStock.Service
‘de ki stok işlemlerinde başarısızlığın olduğunu ifade eden event’tir.public class StockNotReservedEvent : BaseCorrelation { public StockNotReservedEvent(Guid correlationId) : base(correlationId) { } public string Message { get; set; } }
MassTransit’te ki Başarısız Event‘e karşılık gelmektedir.
- PaymentStartedEvent
Stock.Service
işlemini başarıyla tamamladığı taktirdePayment.Service
‘i tetikleyecek ve ödeme sürecini başlatacak olan event’tir.public class PaymentStartedEvent : BaseCorrelation { public PaymentStartedEvent(Guid correlationId) : base(correlationId) { } public decimal TotalPrice { get; set; } public List<OrderItem> OrderItems { get; set; } }
MassTransit’te ki Başarılı Event‘e karşılık gelmektedir.
- PaymentCompletedEvent
Siparişe dairPayment.Service
‘de ödeme işlemlerinin başarılı olduğunu ifade eden event’tir.public class PaymentCompletedEvent : BaseCorrelation { public PaymentCompletedEvent(Guid correlationId) : base(correlationId) { } }
MassTransit’te ki Başarılı Event‘e karşılık gelmektedir.
- PaymentFailedEvent
Siparişe dairPayment.Service
‘de ki ödeme işlemlerinde başarısızlığın olduğunu ifade eden event’tir.public class PaymentFailedEvent : BaseCorrelation { public List<OrderItem> OrderItems { get; set; } public string Messages { get; set; } }
MassTransit’te ki Başarısız Event‘e karşılık gelmektedir.
- StockRollbackMessage
Payment.Service
‘te PaymentFailedEvent event’i yayınlanırsa eğerStock.Service
‘de ki yapılan işlemlerin geri alınmasını sağlayacak olan message’dır.public class StockRollbackMessage : BaseCorrelation { public StockRollbackMessage(Guid correlationId) : base(correlationId) { } public List<OrderItem> OrderItems { get; set; } }
MassTransit’te ki Compensable Event‘e karşılık gelmektedir.
- OrderStartedEvent
- Adım 7 Saga.Orchestration.Service – (OrderSaga İçerisinde Korelasyon İlişkisini Gerçekleştirme)
Event ve message’ları da oluşturduğumuza göre artık 5. adımda bahsi geçen korelasyonel ağı oluşturmaya odaklanabiliriz. Bunun içinSaga.Orchestration.Service
içerisinde oluşturduğumuz ‘OrderSaga’ sınıfındaki CorrelateMessages metodunda aşağıdaki geliştirmeyi gerçekleştirelim.public class OrderSaga : Saga<OrderSagaData> { protected override void CorrelateMessages(ICorrelationConfig<OrderSagaData> config) { config.Correlate<OrderStartedEvent>( orderStartedEvent => orderStartedEvent.CorrelationId, orderSagaData => orderSagaData.CorrelationId); config.Correlate<StockReservedEvent>( stockReservedEvent => stockReservedEvent.CorrelationId, orderSagaData => orderSagaData.CorrelationId); config.Correlate<StockNotReservedEvent>( stockNotReservedEvent => stockNotReservedEvent.CorrelationId, orderSagaData => orderSagaData.CorrelationId); config.Correlate<PaymentCompletedEvent>( paymentCompletedEvent => paymentCompletedEvent.CorrelationId, orderSagaData => orderSagaData.CorrelationId); config.Correlate<PaymentFailedEvent>( paymentFailedEvent => paymentFailedEvent.CorrelationId, orderSagaData => orderSagaData.CorrelationId); } }
Yukarıdaki çalışmayı incelerseniz eğer servislerden
Saga.Orchestration.Service
‘e gelecek olan event’lerin ‘CorrelationId’ değerinin, ‘OrderSagaData’ türünden olan saga data’nın ‘CorrelationId’ ile eşleştirilmesiniCorrelate<T>
metoduyla gerçekleştiriyoruz. Böylece bir sipariş sürecinde distributed iş akışına dair takibin hangi bağlam çerçevesinde olduğunu bu korelasyonel ağ sayesinde ilişkilendirerek sınırlamış oluyoruz.Artık korelasyonel ağı da kurguladığımıza göre artık saga nesnesindeki operasyonlara odaklanabiliriz.
- Adım 8 Saga.Orchestration.Service – (OrderSaga’nın IAmInitiatedBy ve IHandleMessages İmplementasyonlarının Gerçekleştirilmesi)
Sırada sipariş sürecindeki distributed akışın nasıl olacağını handle etmek vardır. Yani saga nesnesine gelen event’lere karşın hangi servise hangi event’in ya da message’ın gönderileceğini ayarlamamız gerekmektedir. Bunun içinSaga.Orchestration.Service
‘de ki saga nesnesinde aşağıdaki çalışmayı yapalım.public class OrderSaga : Saga<OrderSagaData>, IAmInitiatedBy<OrderStartedEvent>, IHandleMessages<StockReservedEvent>, IHandleMessages<StockNotReservedEvent>, IHandleMessages<PaymentCompletedEvent>, IHandleMessages<PaymentFailedEvent> { public Task Handle(OrderStartedEvent message) { ... } public Task Handle(StockReservedEvent message) { ... } public Task Handle(StockNotReservedEvent message) { ... } public Task Handle(PaymentCompletedEvent message) { ... } public Task Handle(PaymentFailedEvent message) { ... } protected override void CorrelateMessages(ICorrelationConfig<OrderSagaData> config) { . . . } }
Burada görüldüğü üzere
IAmInitiatedBy
veIHandleMessages
interface’leri kullanılmaktadır. Bu interface’ler ile saga nesnesine diğer servislerden gelecek olan event ve message’lar tanımlanarak, gerekli aksiyonlar sağlanabilmektedir. Bu interface’lerin definition’larına göz atarsak eğer;public interface IAmInitiatedBy<in TMessage> : IHandleMessages<TMessage> { }
public interface IHandleMessages { } public interface IHandleMessages<in TMessage> : IHandleMessages { Task Handle(TMessage message); }
şeklinde olduklarını görürüz. Yani bu interface’lerin her ikisinin de implementasyonunun da
Task Handle(TMessage message);
imzasına karşın bir metodun uygulatıldığını söyleyebiliriz. Dolayısıyla yukarıdaki çalışmada 9 ile 32. satır aralığındaki Handle metot tanımları bu implementasyonlara karşın oluşturulmuştur.Burada
IAmInitiatedBy
interface’i, saga’nın(yani kurgumuz gereği bir siparişin) başlatılmasını sağlayacak event’i temsil etmektedir. 3. satıra göz atarsanız eğer bu event’in OrderStartedEvent olduğu ifade edilmekte ve böylece bu event yayınlandığı taktirde saga nesnesinin yeni bir saga data oluşturacağı bildirilmiş olmaktadır. Ve bu event’e karşın 9 ile 12. satır aralığındaki ilgili türü handle eden metot tetiklenecektir.IHandleMessages
interface’i ise diğer event yahut message’ları temsil etmektedir. Bu interface ile belirtilen event’lerin yayınlanması durumunda bu saga nesnesi tarafından bunların karşılanabileceğini ifade etmemizi sağlamaktadır. 14 ile 32. satır aralığındaki metotlar ise gelecek event’in türüne uygun olacak şekilde tetiklenecektir. - Adım 9 Saga.Orchestration.Service – (HandleMessage Operasyonlarını Gerçekleştirme)
Şimdi de handle metotlarındaki gerekli çalışmaları gerçekleştirelim.public class OrderSaga(IBus bus) : Saga<OrderSagaData>, IAmInitiatedBy<OrderStartedEvent>, IHandleMessages<StockReservedEvent>, IHandleMessages<StockNotReservedEvent>, IHandleMessages<PaymentCompletedEvent>, IHandleMessages<PaymentFailedEvent> { public async Task Handle(OrderStartedEvent message) { if (!IsNew) return; Data.OrderId = message.OrderId; Data.BuyerId = message.BuyerId; Data.OrderItems = message.OrderItems; Data.TotalPrice = message.TotalPrice; await bus.Send(new OrderCreatedEvent(message.CorrelationId) { OrderItems = message.OrderItems, }); } public async Task Handle(StockReservedEvent message) { await bus.Send(new PaymentStartedEvent(message.CorrelationId) { OrderItems = message.OrderItems, TotalPrice = Data.TotalPrice }); } public async Task Handle(StockNotReservedEvent message) { await bus.Send(new OrderFailedEvent(message.CorrelationId) { Message = message.Message, OrderId = Data.OrderId }); } public async Task Handle(PaymentCompletedEvent message) { await bus.Send(new OrderCompletedEvent() { OrderId = Data.OrderId }); MarkAsComplete(); } public async Task Handle(PaymentFailedEvent message) { await bus.Send(new OrderFailedEvent(message.CorrelationId) { OrderId = Data.OrderId, Message = message.Messages }); await bus.Send(new StockRollbackMessage(message.CorrelationId) { OrderItems = message.OrderItems }); } protected override void CorrelateMessages(ICorrelationConfig<OrderSagaData> config) { . . . } }
Bu adımda yapılan çalışmaları incelemeye öncelikle inject edilen
IBus
referansını vurgulayarak başlayalım. Bizler Rebus ile mesaj gönderim işlemlerini bu referans aracılığıyla kolaylıkla gerçekleştirebiliyoruz.9 ile 23. satır aralığında OrderStartedEvent‘e karşın handle işlemleri yürütülmektedir. Bu event geldiği taktirde öncelikle
IsNew
property’si ile yeni bir saga yani sipariş sürecinin olup olmadığı değerlendirilmekte ve eğer ki bu sipariş daha önce işlendiyse tekrar etmeye gerek görülmemektedir. Yok eğer yeni sipariş durumu söz konusuysa o taktirde ‘OrderSagaData’ türünden saga data’yaData
property’si üzerinden erişilmekte ve başlatıcı event’in değerleri işlenmekte ve ardından OrderCreatedEvent yayınlanmaktadır.25 ile 32. satır aralığında ise StockReservedEvent‘e karşılık PaymentStartedEvent, 34 ile 41. satır aralığında StockNotReservedEvent‘e karşılık OrderFailedEvent, 43 ile 51. satır aralığında PaymentCompletedEvent‘e karşılık OrderCompletedEvent ve son olarak da 53 ile 65. satır aralığında ise PaymentFailedEvent‘e karşılık OrderFailedEvent ile birlikte StockRollbackMessage yayınlanmaktadır.
43 ile 51. satır aralığına tekrar gelirsek eğer burada
MarkAsComplete
metodunun çağrıldığını görmekteyiz. Bu metot 6. adımda OrderCompletedEvent‘i oluştururken bahsettiğim finalize işlemini gerçekleştirmektedir. Yani, ilgili handle metodunda OrderCompletedEvent yayınlandığından dolayı artık siparişin başarılı olduğu kabul edilmekte ve böylece iş akışının tamamlandığı ifade edilmektedir. DolayısıylaMarkAsComplete
metodu çağrılmakta veSaga.Orchestration.Service
‘in veritabanında siparişe dair oluşturulmuş olan saga data satırı kaldırılmaktadır. - Adım 10 Shared – (Queue’ların ve AMQP Url’in Yapılandırılması)
Şimdi de tüm servislerin iletişim süreçlerinde handle edecekleri mesajları hangi queue’lardan alacaklarını yapılandıralım. Bunun içinShared
katmanında aşağıdaki sınıfı oluşturabiliriz.public static class RabbitMQQueueSettings { public const string OrderInputQueueName = "order-input-queue-name"; public const string StockInputQueueName = "stock-input-queue-name"; public const string PaymentInputQueueName = "payment-input-queue-name"; public const string SagaOrchestrationInputQueueName = "saga-orchestration-input-queue-name"; }
Görüldüğü üzere
Saga.Orchestration.Service
‘de dahil tüm servislerin queue isimlendirmesini RabbitMQQueueSettings sınıfında ayarlamış bulunuyoruz.Queue isimlerinin yanında ayrıca servisler arası iletişim sürecinde kullanacağımız RabbitMQ transporter’ının AMQP Url’ine tüm servislerin erişebileceği şekilde bir tanımlamada bulunmamız gerekmektedir. Evet, bu işlem için her servisin appsettings.json dosyasından istifade edebiliriz ki doğrusu da budur. Çünkü production ortamında bu tarz değerler elbet environment’tan yapılandırılacaktır. Haliyle bunda mutabıkız. Lakin bizler makalemizin hacmini şişirmemek için bu değeri de yine
Shared
katmanında oluşturduğumuz aşağıdaki RabbitMQAMQPUrl sınıfın da tutuyor olacak ve bu best practice’i bilinçli bir şekilde göz ardı ediyor olacağız.public class RabbitMQAMQPUrl { public const string AMQPUrl = "amqps://irresudw:slx6TGLxuMAthCz4X61dYE6Crr-Vx7lx@chimpanzee.rmq.cloudamqp.com/irresudw"; }
Evet, artık bundan sonraki adımlarda bu yapılandırma değerlerini kullanarak servislerimizi konfigüre edebiliriz.
- Adım 11 Saga.Orchestration.Service – (Program.cs Yapılandırması ve Veritabanının Oluşturulması)
İlk konfigüre edeceğimiz servisSaga.Orchestration.Service
‘dir. Bunun için ilgili servisin ‘Program.cs’ dosyasında aşağıdaki çalışmayı yapalım.var builder = Host.CreateApplicationBuilder(args); builder.Services.AddRebus(rebus => rebus .Routing(r => r.TypeBased() .Map<OrderCreatedEvent>(RabbitMQQueueSettings.StockInputQueueName) .Map<PaymentStartedEvent>(RabbitMQQueueSettings.PaymentInputQueueName) .Map<OrderFailedEvent>(RabbitMQQueueSettings.OrderInputQueueName) .Map<OrderCompletedEvent>(RabbitMQQueueSettings.OrderInputQueueName) .Map<StockRollbackMessage>(RabbitMQQueueSettings.StockInputQueueName)) .Transport(t => t.UseRabbitMq(RabbitMQAMQPUrl.AMQPUrl, inputQueueName: RabbitMQQueueSettings.SagaOrchestrationInputQueueName)) .Sagas(s => s.StoreInSqlServer("Server=localhost, 1433;Database=SagaOrchestrationRebusDB;User ID=SA;Password=1q2w3e4r+!;TrustServerCertificate=True", dataTableName: "Sagas", indexTableName: "SagaIndexes", automaticallyCreateTables: true)) .Timeouts(t => t.StoreInSqlServer("Server=localhost, 1433;Database=SagaOrchestrationRebusDB;User ID=SA;Password=1q2w3e4r+!;TrustServerCertificate=True", tableName: "Timeouts", automaticallyCreateTables: true))); builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>(); var host = builder.Build(); host.Run();
Yapılan çalışmayı incelersek eğer; 3. satırda
AddRebus
fonksiyonunu kullanarak Rebus servisini uygulamaya dahil ediyoruz. 4 ile 10. satır aralığındaRouting
metodu ile türüne göre mesajların hangi kuyruklara yönlendirileceğini belirliyoruz. BuradaTypeBased
metodu ile yönlendirme stratejisi olarak mesaj türüne dayalı bir davranışı belirleyeceğimizi ifade etmiş oluyoruz. AyrıcaMap
fonksiyonu ile de belirtilen mesaj türünü önceden ayarladığımız kuyruk adıyla eşleştiriyoruz ki böylece bu servis tarafından, o türden mesajın hangi kuyruktan geleceğini ve tüketebileceğini yapılandırmış oluyoruz. Tabi burada tüm mesaj türlerine göre ayrı ayrı yapılandırma da bulunmaktansaMapAssemblyOf
metoduyla da aşağıdaki gibi tüm türlere karşın assembly bazlı kümülatif bir tanımda bulunabilirdik.. . . builder.Services.AddRebus(rebus => rebus .Routing(r => r.TypeBased() .MapAssemblyOf<BaseCorrelation>("..queue-name..") . . .
Velhasıl, 11. satırdaki
Transport
metodu ile de bu servisin kullanacağı transporter’ı belirtiyor ve 12. satırdaUseRabbitMq
metodu ile RabbitMQ’ya göre gerekli yapılandırmayı sağlıyoruz. Ayrıca bu servisin mesaj gönderirken hangi kuyruğa göndereceğini deinputQueueName
parametresine verilen queue değeriyle belirtiyoruz. YaniSaga.Orchestration.Service
bir message’ı send ya da publish ettiğinde bu, burada belirtilen kuyruğa gönderilmiş olacaktır.14. satırda
Sagas
metodu ile de Rebus kütüphanesinin servisler arasındaki iletişim sürecinde iş akışının durumunu nerede saklayacağını belirtiyoruz.Ve son olarak 19. satırda çağırdığımız
Timeouts
metodu ile de Rebus’ın timeout süreçlerine dair kayıtların nerede tutulacağını yapılandırıyoruz.Burada
Sagas
veTimeouts
işlemleri için gerekli olan veritabanı yapılarıautomaticallyCreateTables
parametresi sayesinde servis ayağa kaldırıldığı an otomatik olarak oluşturulacaktır. Tabi veritabanını bizlerin oluşturması gerekmektedir. Kaynaktaki connection string değerine bakarsanız veritabanı ismineSagaOrchestrationRebusDB
verdiğimizi görürsünüz. Haliyle şimdi bu isimde bir veritabanını oluşturalım ve ardından servisimizi ayağa kaldırıp gerekli tablo yapılanmasını generate ettirelim.CREATE DATABASE SagaOrchestrationRebusDB
Görüldüğü üzere oluşturduğumuz veritabanı içerisine ilgili tablo yapısı oluşturulmuştur. Şimdi oluşturulan bu tabloları inceleyelim.
- Adım 12 Order.API – (Program.cs Yapılandırması)
Sıradaki konfigüre edeceğimiz servisimizOrder.API
olacaktır. Bunun için ilgili servisin ‘Program.cs’ dosyasında aşağıdaki çalışmayı yapalım.. . . builder.Services.AddRebus(rebus => rebus .Routing(r => r.TypeBased().MapAssemblyOf<BaseCorrelation>(RabbitMQQueueSettings.SagaOrchestrationInputQueueName)) .Transport(t => t.UseRabbitMq(RabbitMQAMQPUrl.AMQPUrl, inputQueueName: RabbitMQQueueSettings.OrderInputQueueName))); builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>(); . . .
Görüldüğü üzere
Order.API
‘a tüm mesajlarınSaga.Orchestration.Service
‘in mesaj göndereceği kuyruktan yani kısacaSaga.Orchestration.Service
‘ten geleceğini bildirmiş bulunmaktayız. ÇünküOrder.API
ve diğer tüm servisler sade ve sadeceSaga.Orchestration.Service
‘ten mesaj alacak ya da ona mesaj gönderecektir. Ayrıca bu servisin göndereceği mesajları da,inputQueueName
parametresine bakarsakRabbitMQQueueSettings.OrderInputQueueName
değerine karşılık gelen kuyruğa göndereceğini ifade etmiş bulunmaktayız. Haliyle buradaki yapılandırma mantığını diğer servislerde de birebir görüyor olacağız.AutoRegisterHandlersFromAssemblyOf
metodu ise sonraki adımlarda oluşturacağımız handler sınıflarının assembly seviyesinde otomatik olarak yakalayıp, register edilmesini sağlamaktadır. Ha tabi, her ne kadar bu senaryodaOrder.API
‘da bir handler sınıfı oluşturmayacak olsak da Rebus ile yapılan çalışmalarda handler yapılanmalarının aktivite edilmesi için bu metodun çağrılması elzemdir ve sonraki süreçlerde oluşturma ihtimaline nazaran uygulama seviyesinde önceden çağrılıp hazır bir şekilde tutulması kâfidir. - Adım 13 Order.API – (Sipariş Oluşturma Sürecinde OrderStartedEvent’in Yayınlanması)
Şimdi de 2. adımdaOrder.API
‘da geliştirdiğimiz/create-order
minimal api’sinde aşağıdaki gibi OrderStartedEvent‘i yayınlayalım ve distributed iş akışı sürecini başlatalım.. . . app.MapPost("/create-order", async (IBus bus, ApplicationDbContext context, CreateOrderVM model) => { . . . OrderStartedEvent orderStartedEvent = new() { OrderId = order.Id, BuyerId = model.BuyerId, TotalPrice = order.TotalPrice, OrderItems = model.OrderItems.Select(oi => new Shared.Datas.OrderItem { Count = oi.Count, Price = oi.Price, ProductId = oi.ProductId }).ToList(), CorrelationId = Guid.NewGuid(), }; await bus.Send(orderStartedEvent); return Results.Created(); }); app.Run();
- Adım 14 Stock.Service – (Program.cs Yapılandırması)
Stock.Service
‘de de önceki adımlarda anlatılan mantıkta yapılandırmayı aşağıdaki gibi sağlayalım.. . . builder.Services.AddRebus(rebus => rebus .Routing(r => r.TypeBased().MapAssemblyOf<BaseCorrelation>(RabbitMQQueueSettings.SagaOrchestrationInputQueueName)) .Transport(t => t.UseRabbitMq(RabbitMQAMQPUrl.AMQPUrl, inputQueueName: RabbitMQQueueSettings.StockInputQueueName))); builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>(); . . .
- Adım 15 Stock.Service – (OrderCreatedEventHandler ve StockRollbackMessageHandler’ın Oluşturulması)
Stock.Service
‘i yapılandırdıktan sonra gerekli handler sınıflarını aşağıdaki gibi oluşturalım.public class OrderCreatedEventHandler(MongoDBService mongoDBService, IBus bus) : IHandleMessages<OrderCreatedEvent> { public async Task Handle(OrderCreatedEvent message) { List<bool> stockResults = new(); var stockCollection = mongoDBService.GetCollection<Models.Stock>(); foreach (var orderItem in message.OrderItems) stockResults.Add(await (await stockCollection.FindAsync(s => s.ProductId == orderItem.ProductId && s.Count >= (long)orderItem.Count)).AnyAsync()); if (stockResults.TrueForAll(s => s.Equals(true))) { foreach (var orderItem in message.OrderItems) { var stock = await (await stockCollection.FindAsync(s => s.ProductId == orderItem.ProductId)).FirstOrDefaultAsync(); stock.Count -= orderItem.Count; await stockCollection.FindOneAndReplaceAsync(x => x.ProductId == orderItem.ProductId, stock); } StockReservedEvent stockReservedEvent = new(message.CorrelationId) { OrderItems = message.OrderItems }; await bus.Send(stockReservedEvent); } else { StockNotReservedEvent stockNotReservedEvent = new(message.CorrelationId) { Message = "Stok yetersiz...." }; await bus.Send(stockNotReservedEvent); } } }
public class StockRollbackMessageHandler(MongoDBService mongoDBService) : IHandleMessages<StockRollbackMessage> { public async Task Handle(StockRollbackMessage message) { var stockCollection = mongoDBService.GetCollection<Models.Stock>(); foreach (var orderItem in message.OrderItems) { var stock = await (await stockCollection.FindAsync(x => x.ProductId == orderItem.ProductId)).FirstOrDefaultAsync(); stock.Count += orderItem.Count; await stockCollection.FindOneAndReplaceAsync(x => x.ProductId == orderItem.ProductId, stock); } } }
Yukarıdaki kodları incelerseniz eğer gerekli stok işlemleri gerçekleştirilmekte yahut compensable transaction operasyonu yürütülmektedir.
- Adım 16 Payment.Service – (Program.cs Yapılandırması)
Aynı mantıktaPayment.Service
‘de de ‘Program.cs’ yapılandırmasını gerçekleştirelim.. . . builder.Services.AddRebus(rebus => rebus .Routing(r => r.TypeBased().MapAssemblyOf<BaseCorrelation>(RabbitMQQueueSettings.SagaOrchestrationInputQueueName)) .Transport(t => t.UseRabbitMq(RabbitMQAMQPUrl.AMQPUrl, inputQueueName: RabbitMQQueueSettings.PaymentInputQueueName))); builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>(); . . .
- Adım 17 Payment.Service – (PaymentStartedEventHandler’ın Oluşturulması)
Ve ödemeyle ilgili işlemleri başlatacak olan PaymentStartedEvent‘e karşın PaymentStartedEventHandler‘ı oluşturalım.public class PaymentStartedEventHandler(IBus bus) : IHandleMessages<PaymentStartedEvent> { public async Task Handle(PaymentStartedEvent message) { //Ödeme başarılı... await bus.Send(new PaymentCompletedEvent(message.CorrelationId)); } }
Burada farazi bir ödeme işleminin yapıldığını varsayarak yolumuza devam edelim.
- Adım 19 Order.Saga – (Program.cs Yapılandırması)
Ve son olarakOrder.Saga
servisinde de ‘Program.cs’ yapılandırmasını gerçekleştirelim.. . . builder.Services.AddRebus(rebus => rebus .Routing(r => r.TypeBased().MapAssemblyOf<BaseCorrelation>(RabbitMQQueueSettings.SagaOrchestrationInputQueueName)) .Transport(t => t.UseRabbitMq(RabbitMQAMQPUrl.AMQPUrl, inputQueueName: RabbitMQQueueSettings.OrderInputQueueName))); builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>(); . . .
- Adım 18 Order.Saga – (OrderCompletedEventHandler ve OrderFailedEventHandler’ın Oluşturulması)
Ve ardından siparişin nihai durumunu belirleyecek olan handler’ları oluşturalım.public class OrderCompletedEventHandler(ApplicationDbContext applicationDbContext) : IHandleMessages<OrderCompletedEvent> { public async Task Handle(OrderCompletedEvent message) { Order.Saga.Models.Entities.Order order = await applicationDbContext.Orders.FindAsync(message.OrderId); if (order != null) { order.OrderStatus = Models.Enums.OrderStatus.Completed; await applicationDbContext.SaveChangesAsync(); } } }
public class OrderFailedEventHandler(ApplicationDbContext applicationDbContext) : IHandleMessages<OrderFailedEvent> { public async Task Handle(OrderFailedEvent message) { Order.Saga.Models.Entities.Order order = await applicationDbContext.Orders.FindAsync(message.OrderId); if (order != null) { order.OrderStatus = Order.Saga.Models.Enums.OrderStatus.Fail; await applicationDbContext.SaveChangesAsync(); } } }
Burada dikkat edilmesi gereken nokta şudur ki,
Order.Saga
servisinde yapılan veritabanı çalışmalarındaOrder.API
‘dakine benzer context ve entity yapısı inşa edilmiş ve IoC Container’a provide edilmiştir. Tabi tekrara girmemek ve içeriği haddinden fazla uzatmamak için bu kısımlar es geçilmiştir. Yapılan çalışmaların tam teferruatlı içeriğine makalenin sonunda paylaşılan github linki üzerinden gözlemde bulunabilirsiniz. - Adım 19 Test
Artık yapılan bunca çalışmanın testini gerçekleştirebiliriz. Bunun için servislerimizi derleyip ayağa kaldıralım ve aşağıdaki json data eşliğinde bir sipariş talebinde bulunalım.{ "buyerId": 1, "orderItems": [ { "productId": 1, "count": 10, "price": 5 }, { "productId": 2, "count": 30, "price": 50 }, { "productId": 3, "count": 5, "price": 5 } ] }
Bu işlem neticesinde sürecin aşağıdaki görseldeki gibi olduğunu göreceğiz.Ayrıca yeri gelmişken bahsetmekte fayda görmekteyim ki, distributed iş akışı sürecinde Sagas tablosunun ‘data’ kolonundaki binary veriyi aşağıdaki sorguyla metinsele dönüştürebilir ve içeriğine dair gözlemde bulunabilirsiniz.
SELECT CONVERT(VARCHAR(MAX), data) FROM SagaOrchestrationRebusDB.dbo.Sagas
{“$type”:”Saga.Orchestration.Service.SagaDatas.OrderSagaData, Saga.Orchestration.Service”,”Id”:”7e630713-c610-4c9b-8a38-03f61f3060a8″,”Revision”:1,”OrderId”:5,”BuyerId”:1,”TotalPrice”:1825.0,”OrderItems”:{“$type”:”System.Collections.Generic.List`1[[Shared.Datas.OrderItem, Shared]], System.Private.CoreLib”,”$values”:[{“$type”:”Shared.Datas.OrderItem, Shared”,”ProductId”:1,”Count”:10,”Price”:5.0},{“$type”:”Shared.Datas.OrderItem, Shared”,”ProductId”:2,”Count”:30,”Price”:50.0},{“$type”:”Shared.Datas.OrderItem, Shared”,”ProductId”:4,”Count”:55,”Price”:5.0}]},”CorrelationId”:”c5d0b313-2bf4-4081-935f-72fa0c9895f0″}
İşte bu kadar 🙂
Nihai olarak;
Saga Orchestration implementasyonunu Rebus kütüphanesiyle, test adımı da dahil olmak üzere 19 adımda uygulamış ve MassTransit’e nazaran konuya dair alternatif bir yaklaşımı tecrübe etmiş bulunmaktayız. Nacizane fikrim, Rebus kütüphanesi MassTransit’e karşın yapılandırma açısından düşük bir komplikasyona sahip olsa da onun kadar geniş bir yelpazede entegrasyon imkanı sağlamamakta ve küçük yahut orta ölçekli uygulamalarda daha pratik çözümler sunmaktadır.
İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…
Not : Örnek çalışmaya aşağıdaki github adresinden erişebilirsiniz.
https://github.com/gncyyldz/Saga.Orchestration.Example.With.Rebus