Derinlemesine yazılım eğitimleri için kanalımı takip edebilirsiniz...

.NET’te Rebus Kütüphanesiyle Saga Orchestration Yaklaşımının Uygulanması

Merhaba,

Malumunuz, microservice mimarisiyle tasarlanmış yazılımlarımızda kullanıcılar tarafından başlatılan işlem ya da görevlerin, bir servis tarafından alınıp ardından belirli servisler üzerinde mantıksal bir tetikleme silsilesiyle işlenme ve koordine edilme süreçlerini yürütmemize ve biryandan da bu süreçlerde data consistency’i(veri tutarlılığı) sağlıklı bir şekilde sağlayabilmemize imkan tanıyan Saga pattern’ını kullanmakta ve özellikle bu pattern’ın uygulandığı senaryolar da servis sayısının adil kotadan fazla olduğu durumlarda uygulanma zorluğu ve geliştirme maliyeti eğrisini düşürebilmemizi sağlayacak olan Orchestration yaklaşımını tercih etmekteyiz.

Eğer ki konuya dair bilgi sahibi olmaksızın direkt olarak bu içeriğe merak saldıysanız öncelikle Saga pattern’ının ne olduğunu anlayabilmek için Microservice Mimarilerde Saga Pattern İle Transaction Yönetimi başlıklı makaleyi, ardından da Orchestration implementasyonunu ele aldığımız Microservice – Saga – Commands/Orchestration Implementasyonu İle Transaction Yönetimi başlıklı içeriği okumanız gerekmektedir. Ha ayriyeten Microservice mimarisinde Saga’nın bir diğer alternatif yaklaşımı olan Choreography implementasyonunu da Microservice – Saga – Events/Choreography Implementasyonu İle Transaction Yönetimi başlıklı makaleden inceleyerek olayın dibini sıyırabilir ve bu içerikten istifade edebilecek altyapıyı kusursuz bir hale getirebilirsiniz.

Velhasıl, eğer ki konunun terminolojisine tam teferruatlı bir hakimiyet sağladıysanız bu içeriğimizde Orchestration yaklaşımının Rebus kütüphanesiyle farklı bir tekniğini pratik ağırlıklı olarak ele alacağımızı düşünebilirsiniz.

Rebus Nedir?

Rebus, distributed sistemlerde servisler arası asenkron mesajlaşma için kullanılan bir mesajlaşma kütüphanedir. Azure Service Bus, RabbitMQ, SQL Server, InMemory vs. gibi mesaj taşıma görevinden sorumlu olan transporter isimli birçok sistemi desteklemektedir. Ayrıca, belirli bir düzen içinde çalışan iş adımlarını veya microservice’lerin görev gerçekleştirme süreçlerini yönetmek için de kullanılabilmektedir.

Rebus İle Orchestration İmplementasyonu

Bunu direkt örnek bir senaryo üzerinden adım adım inceliyor olacağız. Senaryomuz temsilen, klasik e-ticaret sistemlerindeki sipariş süreçlerine karşın olacaktır. Senaryo sürecinde aşağıdaki servislerden istifade ediyor olacağız;

Servis Türü Açıklama
Order.API Web API Kullanıcı tarafından başlatılan siparişin alınmasını ve siparişe dair stok ve ödeme sürecinin başlatılmasını sağlayacak olan servistir.
Order.Saga Worker Service İlgili siparişe dair servisler arasındaki adım adım işleyiş neticesinin yürütüldüğü handler’ların barındırılacağı servistir. Düşündüğünüz zaman bu servisin sorumluluğunu direkt Order.API’ın alabileceğine ve sanki lüzumsuz olduğuna kanaat getirebilirsiniz. Ancak, Order.API’ın kullanıcıyla etkileşime girmekten başka mesaj tüketme sorumluluğunu üstlendirmek ilkelere aykırı olacağından dolayı bu sorumluluğu farklı bir servise yüklemek best practice açısından daha uygun olacaktır.
Stock.Service Worker Service Stok işlemlerinin yürütüleceği servistir.
Payment.Service Worker Service Ödeme işlemlerinin yürütüleceği servistir.
Shared Class Library Servisler arasında kullanılacak tüm ortak message ve event yapılanmalarını ve bunların yanında belli başlı queue isimleri gibi konfigürasyonları barındıracak olan ortak bir katmandır.
Saga.Orchestration.Service Worker Service Servisler arasındaki iş ve görev sürecini merkezi bir noktadan yönetmek üzere tasarlanacak olan ve böylece Orchestration yaklaşımını sağlayacak olan servisin ta kendisidir.

Ve bu servisler arasındaki iş akışı da aşağıdaki gibi olacaktır;

  1. Kullanıcı Order.API üzerinden bir sipariş talebinde bulunacak ve süreci başlatacaktır.
  2. Order.API siparişi veritabanına ekleyecek ve bunu Saga.Orchestration.Service‘e bildirecektir.
  3. Saga.Orchestration.Service ise oluşturulan siparişe dair Stock.Service‘e bilgide bulunacaktır. Stock.Service siparişteki ürünlere göre stok çalışması gerçekleştirilecektir. Stok işlemleri başarılı ya da başarısız olmak üzere sonuçlanacak ve neticeye göre Saga.Orchestration.Service‘e bildiride bulunulacaktır.
    1. Eğer ki, Stock.Service siparişe dair stok işlemlerini başarıyla gerçekleştirdiyse o taktirde Payment.Service‘e ödeme işlemleri için bilgide bulunacaktır.
    2. Yok eğer başarısız bir durum söz konusuysa işte o taktirde Order.Saga servisine siparişin başarıyla sonuçlanamadığına dair bilgi mesaj gönderecek ve ilgili sipariş iptal edilecektir.
  4. Payment.Service‘te ise ilgili siparişe dair ödeme başarılıysa eğer bunun için Saga.Orchestration.Service‘e bildiride bulunulacak ve o da, Order.Saga‘ya siparişin başarıyla sonuçlandığına dair mesaj yayınlayacaktır.
  5. Yok eğer ödeme gerçekleştirilemediyse hem siparişin iptal edilmesi hem de Stock.Service‘te ilgili siparişe dair yapılan stok işlemlerinin geri alınması için Saga.Orchestration.Service‘e bildiride bulunacak, o da, siparişi iptal etmesi için Order.Saga‘ya ve yapılan stok çalışmalarının geri alınması için de Stock.Service‘e mesaj yollayacaktır.
İlk Adım

Her şeyden önce yukarıda oluşturduğumuz projelere ilk adım olarak aşağıdaki kütüphanelerin yüklenmesi gerekmektedir;

Kazmayı Vuralım

Projelere ilgili kütüphaneleri yüklediğimize göre artık genel senaryonun inşasına başlayabiliriz. Bunun için aşağıdaki adımları sırasıyla seyretmemiz yeterli olacaktır kanaatindeyim…

  • Adım 1 Order.API – (Order Entity’sinin Oluşturulması, Veritabanı Altyapısının Kurgulanması ve Migration İşlemleri)
    İlk olarak kullanıcı tarafından verilecek olan siparişin Order.API tarafında altyapısını kurgulamamız gerekmektedir. Bunun için aşağıdaki ‘Order’ entity’sinden istifade edebiliriz.

        public class Order
        {
            public int Id { get; set; }
            public int BuyerId { get; set; }
            public List<OrderItem> OrderItems { get; set; }
            public OrderStatus OrderStatus { get; set; }
            public DateTime CreatedDate { get; set; }
            public decimal TotalPrice { get; set; }
        }
    

    Tabi bu entity içerisindeki ‘OrderItem’ ve ‘OrderStatus’ türleri de aşağıdaki gibi olacaktır.

        public class OrderItem
        {
            public int Id { get; set; }
            public int ProductId { get; set; }
            public int Count { get; set; }
            public decimal Price { get; set; }
        }
    
        public enum OrderStatus
        {
            Suspend,
            Completed,
            Fail
        }
    

    Sipariş entity’sini oluşturduğumuza göre şimdi sırada DbContext nesnesini oluşturmamız gerekmektedir.

        public class ApplicationDbContext : DbContext
        {
            public ApplicationDbContext(DbContextOptions options) : base(options)
            {
            }
    
            public DbSet<Order.API.Models.Entities.Order> Orders { get; set; }
            public DbSet<OrderItem> OrderItems { get; set; }
        }
    

    Ardından oluşturulan bu yapıyı uygulamanın ‘Program.cs’ dosyasında aşağıdaki gibi yapılandırmamız gerekmektedir.

    var builder = WebApplication.CreateBuilder(args);
    .
    .
    .
    builder.Services.AddDbContext<ApplicationDbContext>(options =>
        options.UseSqlServer(builder.Configuration.GetConnectionString("SQLServer")));
    
    var app = builder.Build();
    .
    .
    .
    app.Run();
    

    Ve son olarak da add-migration mig_1 talimatıyla migration basalım ve
    update-database talimatıyla da migrate ederek veritabanını sunucuda oluşturalım..NET'te Rebus Kütüphanesiyle Saga Orchestration Yaklaşımının Uygulanması(Not: Bu adımı uygularken süreçte ihtiyaç olabilecek bazı kütüphanelerin yüklendiği varsayılmıştır. Bu kütüphaneler; EF Core için Microsoft.EntityFrameworkCore, EF Core’da SQL Server’ın kullanılabilmesi için Microsoft.EntityFrameworkCore.SqlServer ve package manager console üzerinden migration işlemlerinin yürütülebilmesi için Microsoft.EntityFrameworkCore.Tools kütüphaneleridir.)

  • Adım 2 Order.API – (Siparişin Oluşturulması)
    Kullanıcıdan gelecek olan sipariş için temel altyapıyı kurguladığımıza göre artık siparişin oluşturulmasına odaklanabiliriz. Bunun için öncelikle kullanıcıdan siparişin bilgilerini alabileceğimiz aşağıdaki viewmodel nesnelerini oluşturalım.

        public class CreateOrderVM
        {
            public int BuyerId { get; set; }
            public List<CreateOrderItemVM> OrderItems { get; set; }
        }
    
        public class CreateOrderItemVM
        {
            public int ProductId { get; set; }
            public int Count { get; set; }
            public decimal Price { get; set; }
        }
    

    Ardından siparişin oluşturulmasını sağlayacak olan endpoint’i aşağıdaki gibi oluşturalım.

    var builder = WebApplication.CreateBuilder(args);
    .
    .
    .
    var app = builder.Build();
    .
    .
    .
    app.MapPost("/create-order", async (ApplicationDbContext context, CreateOrderVM model) =>
    {
        Order.API.Models.Entities.Order order = new()
        {
            BuyerId = model.BuyerId,
            OrderItems = model.OrderItems.Select(oi => new OrderItem
            {
                Count = oi.Count,
                Price = oi.Price,
                ProductId = oi.ProductId
            }).ToList(),
            OrderStatus = OrderStatus.Suspend,
            TotalPrice = model.OrderItems.Sum(oi => oi.Count * oi.Price),
            CreatedDate = DateTime.Now
        };
    
        await context.Orders.AddAsync(order);
        await context.SaveChangesAsync();
        return Results.Created();
    });
    
    app.Run();
    

    Görüldüğü üzere siparişe dair distributed sürecin tamamlanmasını bekleyeceğimizden dolayı ilk etapta Suspend olarak oluşturmaktayız.

  • Adım 3 Stock.Service – (MongoDB Altyapısının Kurgulanması ve Ürün Stok Bilgilerine Dair Dummy Dataların Eklenmesi)
    Gelecek olan siparişin stok işlemlerine dair de Stock.Service‘de bir altyapı oluşturmamız gerekmektedir. Bu altyapı içeriği zenginleştirmek maksadıyla MongoDB üzerine kurgulanmış olacaktır. Bunun için öncelikle aşağıdaki ‘MongoDBService’i oluşturalım.

        public class MongoDBService
        {
            readonly IMongoDatabase _database;
            readonly IConfiguration _configuration;
            public MongoDBService(IConfiguration configuration)
            {
                _configuration = configuration;
                MongoClient client = new(_configuration["MongoDB:Server"]);
                _database = client.GetDatabase(_configuration["MongoDB:DatabaseName"]);
            }
            public IMongoCollection<T> GetCollection<T>() => _database.GetCollection<T>(typeof(T).Name.ToLowerInvariant());
        }
    

    Tabi bu servisi oluşturabilmek için MongoDB.Driver kütüphanesinin ilgili servise yüklenmesi gerekmektedir.

    Bağlantı sağlayacağımız MongoDB sunucusu bilgilerini de aşağıdaki gibi appsettings.json dosyasına kaydedelim.

      "MongoDB": {
        "Server": "mongodb://localhost:27017",
        "DatabaseName": "SagaOrchestrationRebusDB"
      }
    

    Ve ardından ilgili servisi, projenin ‘Program.cs’ dosyası üzerinden aşağıdaki gibi yapılandırarak uygulamanın IoC Container’ına ekleyelim.

    var builder = Host.CreateApplicationBuilder(args);
    .
    .
    .
    builder.Services.AddSingleton<MongoDBService>();
    
    var host = builder.Build();
    .
    .
    .
    host.Run();
    

    Devamında ise MongoDB veritabanında stok bilgilerini tutacak aşağıdaki entity’i oluşturalım.

        public class Stock
        {
            [BsonRepresentation(BsonType.ObjectId)]
            [BsonId]
            [BsonElement(Order = 0)]
            public ObjectId Id { get; set; }
            [BsonRepresentation(BsonType.Int64)]
            [BsonElement(Order = 1)]
            public int ProductId { get; set; }
            [BsonRepresentation(BsonType.Int64)]
            [BsonElement(Order = 2)]
            public int Count { get; set; }
        }
    

    Ve bu entity eşliğinde yukarıda yapılandırdığımız servis sayesinde ürünlerin stok bilgilerini barındıran dummy dataları oluşturalım.

    var builder = Host.CreateApplicationBuilder(args);
    .
    .
    .
    builder.Services.AddSingleton<MongoDBService>();
    
    var host = builder.Build();
    
    using var scope = builder.Services.BuildServiceProvider().CreateScope();
    var mongoDbService = scope.ServiceProvider.GetRequiredService<MongoDBService>();
    if (!await (await mongoDbService.GetCollection<Stock.Service.Models.Stock>().FindAsync(x => true)).AnyAsync())
    {
        mongoDbService.GetCollection<Stock.Service.Models.Stock>().InsertOne(new()
        {
            ProductId = 1,
            Count = 200
        });
        mongoDbService.GetCollection<Stock.Service.Models.Stock>().InsertOne(new()
        {
            ProductId = 2,
            Count = 300
        });
        mongoDbService.GetCollection<Stock.Service.Models.Stock>().InsertOne(new()
        {
            ProductId = 3,
            Count = 50
        });
        mongoDbService.GetCollection<Stock.Service.Models.Stock>().InsertOne(new()
        {
            ProductId = 4,
            Count = 10
        });
        mongoDbService.GetCollection<Stock.Service.Models.Stock>().InsertOne(new()
        {
            ProductId = 5,
            Count = 60
        });
    }
    
    host.Run();
    

    Artık bu çalışmalar eşliğinde Stock.Service‘in bir kere ayağa kaldırılması hem veritabanının hem de dummy dataların oluşturulması açısından yeterli olacaktır..NET'te Rebus Kütüphanesiyle Saga Orchestration Yaklaşımının Uygulanması

  • Adım 4 Shared – (BaseCorrelation Sınıfının Oluşturulması)
    İçeriğimizin sonraki adımlarında Rebus ile Saga Orchestration davranışını uygularken geliştireceğimiz merkezi Saga yapılanmasında, servislerden gelecek olan mesajları eşleştirmemizi ve bu mesajların aynı iş akışı içerisinde belirli bir siparişe karşılık ilişkilendirmemizi sağlayacak olan korelasyon yapısını oluşturmamız gerekmektedir. Bu yapı, herhangi bir değer olabilir. Usul gereği mümkün mertebe unique olmasına özen gösterilmesi gerekeceğinden dolayı biz bu ihtiyaca istinaden ‘BaseCorrelation’ sınıfı ile çözüm getiriyor olacağız. Haliyle ilgili sınıfın içeriğini Shared class library’sinde aşağıdaki gibi oluşturalım;

        [Serializable]
        public class BaseCorrelation
        {
            public BaseCorrelation() { }
            public BaseCorrelation(Guid correlationId)
            {
                CorrelationId = correlationId;
            }
            [DataMember]
            public Guid CorrelationId { get; set; }
        }
    

    Görüldüğü üzere ‘BaseCorrelation’ sınıfı içerisinde ‘Guid’ türünden bir değer alınmakta ve bu ‘CorrelationId’ property’sinde saklanmaktadır. Böylece sistemde oluşturulacak tüm message ve event yapılanmalarını bu sınıftan türeterek, aynı iş akışındaki farklı olayları ve mesajları, ortak bir bağlam bilgisi olması amacıyla üretilmiş olan korelasyon değeri üzerinden eşleştirecek ve tek bir siparişe karşın olay zincirinin bağını kurguluyor olacağız.

  • Adım 5 Saga.Orchestration.Service – (Saga Orchestration Altyapısının Kurgulanması)
    Rebus kütüphanesiyle yapılan Orchestration operasyonunda tıpkı MassTransit ile yapmış olduğumuz çalışmada olduğu gibi belli başlı davranışlara sahip nesneler senaryoda sorumluluk üstlenmekte ve haliyle öncelikle bu nesnelerin üretilmesi gerekmektedir. Şimdi gelin bu nesneleri tek tek izah ederek ilerleyelim;

    • Saga Data Nesnesi
      Orchestration operasyonu sürecinde distributed bir şekilde takip edilen akışın veri formatını temsil eden bir nesnedir. Bizim örnek senaryomuzda bu bir sipariş modeline karşılık gelmekte ve bir siparişle ilgili servisler arası iletişim sürecindeki verilerin gelişimini takip etmemizi sağlayacak olan bilgileri barındıracak olan bir nesneyi ifade etmektedir. Böylece bizler saga data nesnesi sayesinde distributed akıştaki olgunun state’ini tutabilmekte ve yönetebilmekteyiz.

      Bir sınıfın saga data sınıfı olabilmesi için ISagaData interface’inden implemente edilmesi gerekmektedir.

          public class OrderSagaData : BaseCorrelation, ISagaData
          {
              public Guid Id { get; set; }
              public int Revision { get; set; }
              public int OrderId { get; set; }
              public int BuyerId { get; set; }
              public decimal TotalPrice { get; set; }
              public List<OrderItem> OrderItems { get; set; }
          }
      

      Yukarıda görüldüğü üzere bir sipariş sürecinin state’ini takip etmemizi sağlayacak olan ‘OrderSagaData’ sınıfını oluşturmuş bulunuyoruz. Burada dikkatinizi bir önceki adımda oluşturduğumuz ‘BaseCorrelation’ sınıfına çekmek isterim. Evet, oluşturduğumuz bu saga data nesnesi her ne kadar sipariş state’ini tutacak olsa da hangi siparişe dair state’i tuttuğunu anlayabilmek için bunu da ‘BaseCorrelation’dan türetiyor ve bu sınıftaki korelasyon yapılanmasını saga data nesnesine aktarıyoruz.

      Ayrıca yukarıda kullanılan ‘OrderItem’ referansını da Shared içerisinde aşağıdaki gibi oluşturalım.

          public class OrderItem
          {
              public int ProductId { get; set; }
              public int Count { get; set; }
              public decimal Price { get; set; }
          }
      
    • Saga Nesnesi
      Orchestration yaklaşımını uygulamamızı sağlayacak olan, bir işin-akışın yani örnek senaryomuzdaki sipariş sürecinin temsil edilmesine ve servisler arasındaki koordinasyonun merkezi bir şekilde yürütülmesine imkan tanıyacak olan sınıfın ta kendisidir.

      Bizler saga sınıfı sayesinde servisler arasındaki mesajlaşmaları takip edebilmekte, durumları yönetebilmekte ve iş sürecine uygun bir şekilde durumlar arası geçişler yapabilmekteyiz.

      Bir sınıfın saga sınıfı olabilmesi için Saga<T> abstract class’ını implemente etmesi gerekmektedir. Şöyle ki;

          public class OrderSaga : Saga<OrderSagaData>
          {
              protected override void CorrelateMessages(ICorrelationConfig<OrderSagaData> config)
              {
      
              }
          }
      

      Evet, yukarıdaki kod bloğunda görüldüğü üzere bizler ‘OrderSaga’ adında bir saga nesnesi oluşturmuş bulunuyoruz. İçerik sürecinde bu nesnenin hacminin farklı implementasyonlarla genişlediğini görüyor olacaksınız. Şimdilik en sade haliyle incelememiz gerekirse eğer <T> parametresine verilen ‘OrderSagaData’ nesnesine dikkatinizi çekerim. Evet, saga nesnesi sayesinde servisler arası iş akış sürecini gelen message ve event’lere göre merkezi bir şekilde yönetecek ve iletişimi gerçekleştiriyor olacağız lakin bunu yaparken de işleyeceğimiz verinin türünün ‘OrderSagaData’ olacağını bildirmiş oluyoruz. Bizler iş akış sürecinin(yani sipariş sürecinin) hangi noktasında ya da hangi servisinde olursak olalım ona göre arkada üretilmiş olan ‘OrderSagaData’ nesnesinin değerleri şekillenecek ve kuracağımız korelasyon ağıyla da tek bir bağlamla içerisinde bir siparişin en nihai halini yine bu nesne üzerinden elde edebiliyor olacağız.

      Ayrıca Saga abstract class’ının implemente ettirdiği CorrelateMessages isimli metotta ise birazdan oluşturacağımız ve süreçte işlemeye çalışacağımız message ve event’lerin, saga data sınıfımız olan ‘OrderSagaData’ nesnesiyle eşleştirilmesini ve bir olguya yani siparişe dair korelasyon ağının kurulmasını sağlıyor olacağız. Oraya geleceğiz, şimdi bunun için öncelikle çalışmada kullanacağımız event ve message’ların oluşturulmasını gerçekleştirelim.

  • Adım 6 Shared – (Event ve Message’ların Oluşturulması)
    Şimdi kurguda kullanacağımız tüm event ve message yapılanmalarını sorumluluklarını izah ederek tek tek oluşturmaya odaklanalım. Burada oluşturacağımız tüm event ve message’ların korelasyon ilişkisini sağlayabilmek için baştan 4. adımda oluşturduğumuz ‘BaseCorrelation’ sınıfından türetildiğini vurgulamakta fayda görmekteyim.

    1. OrderStartedEvent
      Kullanıcı tarafından sipariş verildiği an sürecin başladığını Saga.Orchestration.Service‘e bildirecek olan event’tir. Yani MassTransit ile yaptığımız çalışmadaki Tetikleyici Event‘e karşılık gelmektedir. Bu event yayınlandığı taktirde Saga.Orchestration.Service‘in veritabanında ‘OrderSagaData’ya karşılık bir satır oluşturulacak ve distributed iş akışı sürecindeki tüm ilerlemeler adım adım bu satır üzerinden takip ediliyor olacaktır.

          public class OrderStartedEvent : BaseCorrelation
          {
              public OrderStartedEvent() { }
              public OrderStartedEvent(Guid correlationId) : base(correlationId) { }
              public int OrderId { get; set; }
              public int BuyerId { get; set; }
              public decimal TotalPrice { get; set; }
              public List<OrderItem> OrderItems { get; set; }
          }
      
    2. OrderCreatedEvent
      Sipariş oluşturulduktan ve süreç başlatıldıktan sonra Stock.Service‘i tetikleyebilmek için yayınlanacak event’tir.

          public class OrderCreatedEvent : BaseCorrelation
          {
              public OrderCreatedEvent(Guid correlationId) : base(correlationId) { }
              public List<OrderItem>? OrderItems { get; set; }
          }
      

      MassTransit’te ki Başarılı Event‘e karşılık gelmektedir.

    3. OrderCompletedEvent
      Siparişe dair tüm servislerin işlemlerini başarıyla tamamlandığını ve siparişin başarıyla sonlandığını ifade eden event’tir. Saga.Orchestration.Service, bu event’i yayınladığı taktirde Order.Saga servisi siparişin durumunu Completed‘a çekecektir. MassTransit’te ki Başarılı Event‘e karşılık gelmektedir.

          public class OrderCompletedEvent
          {
              public int OrderId { get; set; }
          }
      

      Ayrıca bu event’in dikkat ederseniz herhangi bir korelasyon değeri bulunmamaktadır. Çünkü bu event, siparişin sonlandığını ifade edeceğinden dolayı bu bilgiyi içerisindeki ‘OrderId’ ile taşıyacak ve artık korelasyonel bir bağlam içerisinde çalışmaya gerek kalmayacaktır. Bu mantıkla içeriğin devamında tecrübe edileceği üzere, OrderCompletedEvent event’inin yayınlanacağı noktada Saga.Orchestration.Service tarafından ilgili siparişe dair saga bilgisi tamamlanacak ve veritabanından kaldırılmış olacaktır. Yani birnevi sipariş saga açısından Finilize edilmiş olacaktır. O noktaya geldiğimizde bunu tekrardan hatırlatacağım.

    4. OrderFailedEvent
      Siparişe dair distributed iş akışı sürecinde servislerden herhangi birinde bir başarısızlığın söz konusu olduğunu yani uzun lafın kısası siparişin başarısız olduğunu ifade eden event’tir.

          public class OrderFailedEvent : BaseCorrelation
          {
              public OrderFailedEvent(Guid correlationId) : base(correlationId)
              {
              }
      
              public int OrderId { get; set; }
              public string Message { get; set; }
          }
      

      Bu event Saga.Orchestration.Service tarafından yayınlandığı taktirde Order.Saga tarafından ilgili siparişe dair state Fail‘a çekilecektir. MassTransit’te ki Başarsızı Event‘e karşılık gelmektedir.

    5. StockReservedEvent
      Siparişe dair Stock.Service‘de ki stok işlemlerinin başarılı olduğunu ifade eden event’tir.

          public class StockReservedEvent : BaseCorrelation
          {
              public StockReservedEvent(Guid correlationId) : base(correlationId) { }
              public List<OrderItem> OrderItems { get; set; }
          }
      

      MassTransit’te ki Başarılı Event‘e karşılık gelmektedir.

    6. StockNotReservedEvent
      Siparişe dair Stock.Service‘de ki stok işlemlerinde başarısızlığın olduğunu ifade eden event’tir.

          public class StockNotReservedEvent : BaseCorrelation
          {
              public StockNotReservedEvent(Guid correlationId) : base(correlationId) { }
              public string Message { get; set; }
          }
      

      MassTransit’te ki Başarısız Event‘e karşılık gelmektedir.

    7. PaymentStartedEvent
      Stock.Service işlemini başarıyla tamamladığı taktirde Payment.Service‘i tetikleyecek ve ödeme sürecini başlatacak olan event’tir.

          public class PaymentStartedEvent : BaseCorrelation
          {
              public PaymentStartedEvent(Guid correlationId) : base(correlationId)
              {
              }
              public decimal TotalPrice { get; set; }
              public List<OrderItem> OrderItems { get; set; }
          }
      

      MassTransit’te ki Başarılı Event‘e karşılık gelmektedir.

    8. PaymentCompletedEvent
      Siparişe dair Payment.Service‘de ödeme işlemlerinin başarılı olduğunu ifade eden event’tir.

          public class PaymentCompletedEvent : BaseCorrelation
          {
              public PaymentCompletedEvent(Guid correlationId) : base(correlationId)
              {
              }
          }
      

      MassTransit’te ki Başarılı Event‘e karşılık gelmektedir.

    9. PaymentFailedEvent
      Siparişe dair Payment.Service‘de ki ödeme işlemlerinde başarısızlığın olduğunu ifade eden event’tir.

          public class PaymentFailedEvent : BaseCorrelation
          {
              public List<OrderItem> OrderItems { get; set; }
              public string Messages { get; set; }
          }
      

      MassTransit’te ki Başarısız Event‘e karşılık gelmektedir.

    10. StockRollbackMessage
      Payment.Service‘te PaymentFailedEvent event’i yayınlanırsa eğer Stock.Service‘de ki yapılan işlemlerin geri alınmasını sağlayacak olan message’dır.

          public class StockRollbackMessage : BaseCorrelation
          {
              public StockRollbackMessage(Guid correlationId) : base(correlationId) { }
              public List<OrderItem> OrderItems { get; set; }
          }
      

      MassTransit’te ki Compensable Event‘e karşılık gelmektedir.

  • Adım 7 Saga.Orchestration.Service – (OrderSaga İçerisinde Korelasyon İlişkisini Gerçekleştirme)
    Event ve message’ları da oluşturduğumuza göre artık 5. adımda bahsi geçen korelasyonel ağı oluşturmaya odaklanabiliriz. Bunun için Saga.Orchestration.Service içerisinde oluşturduğumuz ‘OrderSaga’ sınıfındaki CorrelateMessages metodunda aşağıdaki geliştirmeyi gerçekleştirelim.

        public class OrderSaga : Saga<OrderSagaData>
        {
            protected override void CorrelateMessages(ICorrelationConfig<OrderSagaData> config)
            {
                config.Correlate<OrderStartedEvent>(
                    orderStartedEvent => orderStartedEvent.CorrelationId,
                    orderSagaData => orderSagaData.CorrelationId);
    
                config.Correlate<StockReservedEvent>(
                    stockReservedEvent => stockReservedEvent.CorrelationId,
                    orderSagaData => orderSagaData.CorrelationId);
    
                config.Correlate<StockNotReservedEvent>(
                    stockNotReservedEvent => stockNotReservedEvent.CorrelationId,
                    orderSagaData => orderSagaData.CorrelationId);
    
                config.Correlate<PaymentCompletedEvent>(
                    paymentCompletedEvent => paymentCompletedEvent.CorrelationId,
                    orderSagaData => orderSagaData.CorrelationId);
    
                config.Correlate<PaymentFailedEvent>(
                    paymentFailedEvent => paymentFailedEvent.CorrelationId,
                    orderSagaData => orderSagaData.CorrelationId);
            }
        }
    

    Yukarıdaki çalışmayı incelerseniz eğer servislerden Saga.Orchestration.Service‘e gelecek olan event’lerin ‘CorrelationId’ değerinin, ‘OrderSagaData’ türünden olan saga data’nın ‘CorrelationId’ ile eşleştirilmesini Correlate<T> metoduyla gerçekleştiriyoruz. Böylece bir sipariş sürecinde distributed iş akışına dair takibin hangi bağlam çerçevesinde olduğunu bu korelasyonel ağ sayesinde ilişkilendirerek sınırlamış oluyoruz.

    Artık korelasyonel ağı da kurguladığımıza göre artık saga nesnesindeki operasyonlara odaklanabiliriz.

  • Adım 8 Saga.Orchestration.Service – (OrderSaga’nın IAmInitiatedBy ve IHandleMessages İmplementasyonlarının Gerçekleştirilmesi)
    Sırada sipariş sürecindeki distributed akışın nasıl olacağını handle etmek vardır. Yani saga nesnesine gelen event’lere karşın hangi servise hangi event’in ya da message’ın gönderileceğini ayarlamamız gerekmektedir. Bunun için Saga.Orchestration.Service‘de ki saga nesnesinde aşağıdaki çalışmayı yapalım.

        public class OrderSaga :
            Saga<OrderSagaData>,
            IAmInitiatedBy<OrderStartedEvent>,
            IHandleMessages<StockReservedEvent>,
            IHandleMessages<StockNotReservedEvent>,
            IHandleMessages<PaymentCompletedEvent>,
            IHandleMessages<PaymentFailedEvent>
        {
            public Task Handle(OrderStartedEvent message)
            {
                ...
            }
    
            public Task Handle(StockReservedEvent message)
            {
                ...
            }
    
            public Task Handle(StockNotReservedEvent message)
            {
                ...
            }
    
            public Task Handle(PaymentCompletedEvent message)
            {
                ...
            }
    
            public Task Handle(PaymentFailedEvent message)
            {
                ...
            }
    
            protected override void CorrelateMessages(ICorrelationConfig<OrderSagaData> config)
            {
                .
                .
                .
            }
        }
    

    Burada görüldüğü üzere IAmInitiatedBy ve IHandleMessages interface’leri kullanılmaktadır. Bu interface’ler ile saga nesnesine diğer servislerden gelecek olan event ve message’lar tanımlanarak, gerekli aksiyonlar sağlanabilmektedir. Bu interface’lerin definition’larına göz atarsak eğer;

    public interface IAmInitiatedBy<in TMessage> : IHandleMessages<TMessage> { }
    
    public interface IHandleMessages { }
    
    public interface IHandleMessages<in TMessage> : IHandleMessages
    {
        Task Handle(TMessage message);
    }
    

    şeklinde olduklarını görürüz. Yani bu interface’lerin her ikisinin de implementasyonunun da Task Handle(TMessage message); imzasına karşın bir metodun uygulatıldığını söyleyebiliriz. Dolayısıyla yukarıdaki çalışmada 9 ile 32. satır aralığındaki Handle metot tanımları bu implementasyonlara karşın oluşturulmuştur.

    Burada IAmInitiatedBy interface’i, saga’nın(yani kurgumuz gereği bir siparişin) başlatılmasını sağlayacak event’i temsil etmektedir. 3. satıra göz atarsanız eğer bu event’in OrderStartedEvent olduğu ifade edilmekte ve böylece bu event yayınlandığı taktirde saga nesnesinin yeni bir saga data oluşturacağı bildirilmiş olmaktadır. Ve bu event’e karşın 9 ile 12. satır aralığındaki ilgili türü handle eden metot tetiklenecektir.

    IHandleMessages interface’i ise diğer event yahut message’ları temsil etmektedir. Bu interface ile belirtilen event’lerin yayınlanması durumunda bu saga nesnesi tarafından bunların karşılanabileceğini ifade etmemizi sağlamaktadır. 14 ile 32. satır aralığındaki metotlar ise gelecek event’in türüne uygun olacak şekilde tetiklenecektir.

  • Adım 9 Saga.Orchestration.Service – (HandleMessage Operasyonlarını Gerçekleştirme)
    Şimdi de handle metotlarındaki gerekli çalışmaları gerçekleştirelim.

        public class OrderSaga(IBus bus) :
            Saga<OrderSagaData>,
            IAmInitiatedBy<OrderStartedEvent>,
            IHandleMessages<StockReservedEvent>,
            IHandleMessages<StockNotReservedEvent>,
            IHandleMessages<PaymentCompletedEvent>,
            IHandleMessages<PaymentFailedEvent>
        {
            public async Task Handle(OrderStartedEvent message)
            {
                if (!IsNew)
                    return;
    
                Data.OrderId = message.OrderId;
                Data.BuyerId = message.BuyerId;
                Data.OrderItems = message.OrderItems;
                Data.TotalPrice = message.TotalPrice;
    
                await bus.Send(new OrderCreatedEvent(message.CorrelationId)
                {
                    OrderItems = message.OrderItems,
                });
            }
    
            public async Task Handle(StockReservedEvent message)
            {
                await bus.Send(new PaymentStartedEvent(message.CorrelationId)
                {
                    OrderItems = message.OrderItems,
                    TotalPrice = Data.TotalPrice
                });
            }
    
            public async Task Handle(StockNotReservedEvent message)
            {
                await bus.Send(new OrderFailedEvent(message.CorrelationId)
                {
                    Message = message.Message,
                    OrderId = Data.OrderId
                });
            }
    
            public async Task Handle(PaymentCompletedEvent message)
            {
                await bus.Send(new OrderCompletedEvent()
                {
                    OrderId = Data.OrderId
                });
    
                MarkAsComplete();
            }
    
            public async Task Handle(PaymentFailedEvent message)
            {
                await bus.Send(new OrderFailedEvent(message.CorrelationId)
                {
                    OrderId = Data.OrderId,
                    Message = message.Messages
                });
    
                await bus.Send(new StockRollbackMessage(message.CorrelationId)
                {
                    OrderItems = message.OrderItems
                });
            }
    
            protected override void CorrelateMessages(ICorrelationConfig<OrderSagaData> config)
            {
                .
                .
                .
            }
        }
    

    Bu adımda yapılan çalışmaları incelemeye öncelikle inject edilen IBus referansını vurgulayarak başlayalım. Bizler Rebus ile mesaj gönderim işlemlerini bu referans aracılığıyla kolaylıkla gerçekleştirebiliyoruz.

    9 ile 23. satır aralığında OrderStartedEvent‘e karşın handle işlemleri yürütülmektedir. Bu event geldiği taktirde öncelikle IsNew property’si ile yeni bir saga yani sipariş sürecinin olup olmadığı değerlendirilmekte ve eğer ki bu sipariş daha önce işlendiyse tekrar etmeye gerek görülmemektedir. Yok eğer yeni sipariş durumu söz konusuysa o taktirde ‘OrderSagaData’ türünden saga data’ya Data property’si üzerinden erişilmekte ve başlatıcı event’in değerleri işlenmekte ve ardından OrderCreatedEvent yayınlanmaktadır.

    25 ile 32. satır aralığında ise StockReservedEvent‘e karşılık PaymentStartedEvent, 34 ile 41. satır aralığında StockNotReservedEvent‘e karşılık OrderFailedEvent, 43 ile 51. satır aralığında PaymentCompletedEvent‘e karşılık OrderCompletedEvent ve son olarak da 53 ile 65. satır aralığında ise PaymentFailedEvent‘e karşılık OrderFailedEvent ile birlikte StockRollbackMessage yayınlanmaktadır.

    43 ile 51. satır aralığına tekrar gelirsek eğer burada MarkAsComplete metodunun çağrıldığını görmekteyiz. Bu metot 6. adımda OrderCompletedEvent‘i oluştururken bahsettiğim finalize işlemini gerçekleştirmektedir. Yani, ilgili handle metodunda OrderCompletedEvent yayınlandığından dolayı artık siparişin başarılı olduğu kabul edilmekte ve böylece iş akışının tamamlandığı ifade edilmektedir. Dolayısıyla MarkAsComplete metodu çağrılmakta ve Saga.Orchestration.Service‘in veritabanında siparişe dair oluşturulmuş olan saga data satırı kaldırılmaktadır.

  • Adım 10 Shared – (Queue’ların ve AMQP Url’in Yapılandırılması)
    Şimdi de tüm servislerin iletişim süreçlerinde handle edecekleri mesajları hangi queue’lardan alacaklarını yapılandıralım. Bunun için Shared katmanında aşağıdaki sınıfı oluşturabiliriz.

        public static class RabbitMQQueueSettings
        {
            public const string OrderInputQueueName = "order-input-queue-name";
            public const string StockInputQueueName = "stock-input-queue-name";
            public const string PaymentInputQueueName = "payment-input-queue-name";
            public const string SagaOrchestrationInputQueueName = "saga-orchestration-input-queue-name";
        }
    

    Görüldüğü üzere Saga.Orchestration.Service‘de dahil tüm servislerin queue isimlendirmesini RabbitMQQueueSettings sınıfında ayarlamış bulunuyoruz.

    Queue isimlerinin yanında ayrıca servisler arası iletişim sürecinde kullanacağımız RabbitMQ transporter’ının AMQP Url’ine tüm servislerin erişebileceği şekilde bir tanımlamada bulunmamız gerekmektedir. Evet, bu işlem için her servisin appsettings.json dosyasından istifade edebiliriz ki doğrusu da budur. Çünkü production ortamında bu tarz değerler elbet environment’tan yapılandırılacaktır. Haliyle bunda mutabıkız. Lakin bizler makalemizin hacmini şişirmemek için bu değeri de yine Shared katmanında oluşturduğumuz aşağıdaki RabbitMQAMQPUrl sınıfın da tutuyor olacak ve bu best practice’i bilinçli bir şekilde göz ardı ediyor olacağız.

        public class RabbitMQAMQPUrl
        {
            public const string AMQPUrl = "amqps://irresudw:slx6TGLxuMAthCz4X61dYE6Crr-Vx7lx@chimpanzee.rmq.cloudamqp.com/irresudw";
        }
    

    Evet, artık bundan sonraki adımlarda bu yapılandırma değerlerini kullanarak servislerimizi konfigüre edebiliriz.

  • Adım 11 Saga.Orchestration.Service – (Program.cs Yapılandırması ve Veritabanının Oluşturulması)
    İlk konfigüre edeceğimiz servis Saga.Orchestration.Service‘dir. Bunun için ilgili servisin ‘Program.cs’ dosyasında aşağıdaki çalışmayı yapalım.

    var builder = Host.CreateApplicationBuilder(args);
    
    builder.Services.AddRebus(rebus => rebus
         .Routing(r =>
            r.TypeBased()
            .Map<OrderCreatedEvent>(RabbitMQQueueSettings.StockInputQueueName)
            .Map<PaymentStartedEvent>(RabbitMQQueueSettings.PaymentInputQueueName)
            .Map<OrderFailedEvent>(RabbitMQQueueSettings.OrderInputQueueName)
            .Map<OrderCompletedEvent>(RabbitMQQueueSettings.OrderInputQueueName)
            .Map<StockRollbackMessage>(RabbitMQQueueSettings.StockInputQueueName))
         .Transport(t =>
             t.UseRabbitMq(RabbitMQAMQPUrl.AMQPUrl,
             inputQueueName: RabbitMQQueueSettings.SagaOrchestrationInputQueueName))
         .Sagas(s =>
             s.StoreInSqlServer("Server=localhost, 1433;Database=SagaOrchestrationRebusDB;User ID=SA;Password=1q2w3e4r+!;TrustServerCertificate=True",
             dataTableName: "Sagas",
             indexTableName: "SagaIndexes",
             automaticallyCreateTables: true))
         .Timeouts(t =>
             t.StoreInSqlServer("Server=localhost, 1433;Database=SagaOrchestrationRebusDB;User ID=SA;Password=1q2w3e4r+!;TrustServerCertificate=True",
             tableName: "Timeouts",
             automaticallyCreateTables: true)));
    
    builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>();
    
    var host = builder.Build();
    host.Run();
    

    Yapılan çalışmayı incelersek eğer; 3. satırda AddRebus fonksiyonunu kullanarak Rebus servisini uygulamaya dahil ediyoruz. 4 ile 10. satır aralığında Routing metodu ile türüne göre mesajların hangi kuyruklara yönlendirileceğini belirliyoruz. Burada TypeBased metodu ile yönlendirme stratejisi olarak mesaj türüne dayalı bir davranışı belirleyeceğimizi ifade etmiş oluyoruz. Ayrıca Map fonksiyonu ile de belirtilen mesaj türünü önceden ayarladığımız kuyruk adıyla eşleştiriyoruz ki böylece bu servis tarafından, o türden mesajın hangi kuyruktan geleceğini ve tüketebileceğini yapılandırmış oluyoruz. Tabi burada tüm mesaj türlerine göre ayrı ayrı yapılandırma da bulunmaktansa MapAssemblyOf metoduyla da aşağıdaki gibi tüm türlere karşın assembly bazlı kümülatif bir tanımda bulunabilirdik.

    .
    .
    .
    builder.Services.AddRebus(rebus => rebus
         .Routing(r =>
            r.TypeBased()
            .MapAssemblyOf<BaseCorrelation>("..queue-name..")
    .
    .
    .
    

    Velhasıl, 11. satırdaki Transport metodu ile de bu servisin kullanacağı transporter’ı belirtiyor ve 12. satırda UseRabbitMq metodu ile RabbitMQ’ya göre gerekli yapılandırmayı sağlıyoruz. Ayrıca bu servisin mesaj gönderirken hangi kuyruğa göndereceğini de inputQueueName parametresine verilen queue değeriyle belirtiyoruz. Yani Saga.Orchestration.Service bir message’ı send ya da publish ettiğinde bu, burada belirtilen kuyruğa gönderilmiş olacaktır.

    14. satırda Sagas metodu ile de Rebus kütüphanesinin servisler arasındaki iletişim sürecinde iş akışının durumunu nerede saklayacağını belirtiyoruz.

    Ve son olarak 19. satırda çağırdığımız Timeouts metodu ile de Rebus’ın timeout süreçlerine dair kayıtların nerede tutulacağını yapılandırıyoruz.

    Burada Sagas ve Timeouts işlemleri için gerekli olan veritabanı yapıları automaticallyCreateTables parametresi sayesinde servis ayağa kaldırıldığı an otomatik olarak oluşturulacaktır. Tabi veritabanını bizlerin oluşturması gerekmektedir. Kaynaktaki connection string değerine bakarsanız veritabanı ismine SagaOrchestrationRebusDB verdiğimizi görürsünüz. Haliyle şimdi bu isimde bir veritabanını oluşturalım ve ardından servisimizi ayağa kaldırıp gerekli tablo yapılanmasını generate ettirelim.

    CREATE DATABASE SagaOrchestrationRebusDB
    

    .NET'te Rebus Kütüphanesiyle Saga Orchestration Yaklaşımının UygulanmasıGörüldüğü üzere oluşturduğumuz veritabanı içerisine ilgili tablo yapısı oluşturulmuştur. Şimdi oluşturulan bu tabloları inceleyelim.

    Tablolar
    Sagas SagaIndexes Timeouts
    Distributed iş akışı sürecindeki state’i saklamak ve takip etmek için kullanılan tablodur..NET'te Rebus Kütüphanesiyle Saga Orchestration Yaklaşımının UygulanmasıBurada iş akışındaki her bir state’i bir versiyon numarasıyla temsil edeceği revision kolonunda tutmaktadır. Her bir state değişikliği bu kolonun değerini +1 artıracaktır. Böylece biz ilgili olguya dair iş akışında state’in kaç kez değiştiğini öğrenebilmekteyiz.

    data kolonunda ise her bir duruma karşın kullanılan OrderSagaData türünden saga data verisi binary formatında tutulmaktadır.

    Sagas tablosuyla ilişkilendirilmiş olan bu SagaIndexes tablosu iş akışı state’ini belirleyen index bilgilerini içermektedir..NET'te Rebus Kütüphanesiyle Saga Orchestration Yaklaşımının Uygulanmasısaga_type kolonunda state’i takip edilen saga data’nın türü, key kolonunda korelasyon değerini ifade eden property adı ve value kolonunda ise iş akışındaki bütünselliği sağlayacak olan korelasyonel değer tutulmaktadır.

    Timeout süreçlerini yönetmek için kullanılan tablodur. Bu tablo sayesinde, Rebus ile yapılan işlemlerin yahut olayların belirli bir süre içerisinde tamamlanmasını beklediği durumları takip edebiliriz..NET'te Rebus Kütüphanesiyle Saga Orchestration Yaklaşımının Uygulanması
  • Adım 12 Order.API – (Program.cs Yapılandırması)
    Sıradaki konfigüre edeceğimiz servisimiz Order.API olacaktır. Bunun için ilgili servisin ‘Program.cs’ dosyasında aşağıdaki çalışmayı yapalım.

    
    .
    .
    .
    builder.Services.AddRebus(rebus => rebus
         .Routing(r => r.TypeBased().MapAssemblyOf<BaseCorrelation>(RabbitMQQueueSettings.SagaOrchestrationInputQueueName))
         .Transport(t => t.UseRabbitMq(RabbitMQAMQPUrl.AMQPUrl, inputQueueName: RabbitMQQueueSettings.OrderInputQueueName)));
    
    builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>();
    .
    .
    .
    

    Görüldüğü üzere Order.API‘a tüm mesajların Saga.Orchestration.Service‘in mesaj göndereceği kuyruktan yani kısaca Saga.Orchestration.Service‘ten geleceğini bildirmiş bulunmaktayız. Çünkü Order.API ve diğer tüm servisler sade ve sadece Saga.Orchestration.Service‘ten mesaj alacak ya da ona mesaj gönderecektir. Ayrıca bu servisin göndereceği mesajları da, inputQueueName parametresine bakarsak RabbitMQQueueSettings.OrderInputQueueName değerine karşılık gelen kuyruğa göndereceğini ifade etmiş bulunmaktayız. Haliyle buradaki yapılandırma mantığını diğer servislerde de birebir görüyor olacağız.

    AutoRegisterHandlersFromAssemblyOf metodu ise sonraki adımlarda oluşturacağımız handler sınıflarının assembly seviyesinde otomatik olarak yakalayıp, register edilmesini sağlamaktadır. Ha tabi, her ne kadar bu senaryoda Order.API‘da bir handler sınıfı oluşturmayacak olsak da Rebus ile yapılan çalışmalarda handler yapılanmalarının aktivite edilmesi için bu metodun çağrılması elzemdir ve sonraki süreçlerde oluşturma ihtimaline nazaran uygulama seviyesinde önceden çağrılıp hazır bir şekilde tutulması kâfidir.

  • Adım 13 Order.API – (Sipariş Oluşturma Sürecinde OrderStartedEvent’in Yayınlanması)
    Şimdi de 2. adımda Order.API‘da geliştirdiğimiz /create-order minimal api’sinde aşağıdaki gibi OrderStartedEvent‘i yayınlayalım ve distributed iş akışı sürecini başlatalım.

    .
    .
    .
    
    app.MapPost("/create-order", async (IBus bus, ApplicationDbContext context, CreateOrderVM model) =>
    {
        .
        .
        .
    
        OrderStartedEvent orderStartedEvent = new()
        {
            OrderId = order.Id,
            BuyerId = model.BuyerId,
            TotalPrice = order.TotalPrice,
            OrderItems = model.OrderItems.Select(oi => new Shared.Datas.OrderItem
            {
                Count = oi.Count,
                Price = oi.Price,
                ProductId = oi.ProductId
            }).ToList(),
            CorrelationId = Guid.NewGuid(),
        };
    
        await bus.Send(orderStartedEvent);
    
        return Results.Created();
    });
    
    app.Run();
    
  • Adım 14 Stock.Service – (Program.cs Yapılandırması)
    Stock.Service‘de de önceki adımlarda anlatılan mantıkta yapılandırmayı aşağıdaki gibi sağlayalım.

    .
    .
    .
    
    builder.Services.AddRebus(rebus => rebus
         .Routing(r => r.TypeBased().MapAssemblyOf<BaseCorrelation>(RabbitMQQueueSettings.SagaOrchestrationInputQueueName))
         .Transport(t => t.UseRabbitMq(RabbitMQAMQPUrl.AMQPUrl, inputQueueName: RabbitMQQueueSettings.StockInputQueueName)));
    
    builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>();
    .
    .
    .
    
  • Adım 15 Stock.Service – (OrderCreatedEventHandler ve StockRollbackMessageHandler’ın Oluşturulması)
    Stock.Service‘i yapılandırdıktan sonra gerekli handler sınıflarını aşağıdaki gibi oluşturalım.

        public class OrderCreatedEventHandler(MongoDBService mongoDBService, IBus bus) : IHandleMessages<OrderCreatedEvent>
        {
            public async Task Handle(OrderCreatedEvent message)
            {
                List<bool> stockResults = new();
                var stockCollection = mongoDBService.GetCollection<Models.Stock>();
    
                foreach (var orderItem in message.OrderItems)
                    stockResults.Add(await (await stockCollection.FindAsync(s => s.ProductId == orderItem.ProductId && s.Count >=
    (long)orderItem.Count)).AnyAsync());
    
                if (stockResults.TrueForAll(s => s.Equals(true)))
                {
                    foreach (var orderItem in message.OrderItems)
                    {
                        var stock = await (await stockCollection.FindAsync(s => s.ProductId == orderItem.ProductId)).FirstOrDefaultAsync();
    
                        stock.Count -= orderItem.Count;
    
                        await stockCollection.FindOneAndReplaceAsync(x => x.ProductId == orderItem.ProductId, stock);
                    }
    
                    StockReservedEvent stockReservedEvent = new(message.CorrelationId)
                    {
                        OrderItems = message.OrderItems
                    };
    
                    await bus.Send(stockReservedEvent);
                }
                else
                {
                    StockNotReservedEvent stockNotReservedEvent = new(message.CorrelationId)
                    {
                        Message = "Stok yetersiz...."
                    };
    
                    await bus.Send(stockNotReservedEvent);
                }
            }
        }
    
        public class StockRollbackMessageHandler(MongoDBService mongoDBService) : IHandleMessages<StockRollbackMessage>
        {
            public async Task Handle(StockRollbackMessage message)
            {
                var stockCollection = mongoDBService.GetCollection<Models.Stock>();
    
                foreach (var orderItem in message.OrderItems)
                {
                    var stock = await (await stockCollection.FindAsync(x => x.ProductId == orderItem.ProductId)).FirstOrDefaultAsync();
    
                    stock.Count += orderItem.Count;
                    await stockCollection.FindOneAndReplaceAsync(x => x.ProductId == orderItem.ProductId, stock);
                }
            }
        }
    

    Yukarıdaki kodları incelerseniz eğer gerekli stok işlemleri gerçekleştirilmekte yahut compensable transaction operasyonu yürütülmektedir.

  • Adım 16 Payment.Service – (Program.cs Yapılandırması)
    Aynı mantıkta Payment.Service‘de de ‘Program.cs’ yapılandırmasını gerçekleştirelim.

    .
    .
    .
    builder.Services.AddRebus(rebus => rebus
         .Routing(r => r.TypeBased().MapAssemblyOf<BaseCorrelation>(RabbitMQQueueSettings.SagaOrchestrationInputQueueName))
         .Transport(t => t.UseRabbitMq(RabbitMQAMQPUrl.AMQPUrl, inputQueueName: RabbitMQQueueSettings.PaymentInputQueueName)));
    
    builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>();
    .
    .
    .
    
  • Adım 17 Payment.Service – (PaymentStartedEventHandler’ın Oluşturulması)
    Ve ödemeyle ilgili işlemleri başlatacak olan PaymentStartedEvent‘e karşın PaymentStartedEventHandler‘ı oluşturalım.

        public class PaymentStartedEventHandler(IBus bus) : IHandleMessages<PaymentStartedEvent>
        {
            public async Task Handle(PaymentStartedEvent message)
            {
                //Ödeme başarılı...
                await bus.Send(new PaymentCompletedEvent(message.CorrelationId));
            }
        }
    

    Burada farazi bir ödeme işleminin yapıldığını varsayarak yolumuza devam edelim.

  • Adım 19 Order.Saga – (Program.cs Yapılandırması)
    Ve son olarak Order.Saga servisinde de ‘Program.cs’ yapılandırmasını gerçekleştirelim.

    .
    .
    .
    builder.Services.AddRebus(rebus => rebus
         .Routing(r => r.TypeBased().MapAssemblyOf<BaseCorrelation>(RabbitMQQueueSettings.SagaOrchestrationInputQueueName))
         .Transport(t => t.UseRabbitMq(RabbitMQAMQPUrl.AMQPUrl, inputQueueName: RabbitMQQueueSettings.OrderInputQueueName)));
    
    builder.Services.AutoRegisterHandlersFromAssemblyOf<Program>();
    .
    .
    .
    
  • Adım 18 Order.Saga – (OrderCompletedEventHandler ve OrderFailedEventHandler’ın Oluşturulması)
    Ve ardından siparişin nihai durumunu belirleyecek olan handler’ları oluşturalım.

        public class OrderCompletedEventHandler(ApplicationDbContext applicationDbContext) : IHandleMessages<OrderCompletedEvent>
        {
            public async Task Handle(OrderCompletedEvent message)
            {
                Order.Saga.Models.Entities.Order order = await applicationDbContext.Orders.FindAsync(message.OrderId);
                if (order != null)
                {
                    order.OrderStatus = Models.Enums.OrderStatus.Completed;
                    await applicationDbContext.SaveChangesAsync();
                }
            }
        }
    
        public class OrderFailedEventHandler(ApplicationDbContext applicationDbContext) : IHandleMessages<OrderFailedEvent>
        {
            public async Task Handle(OrderFailedEvent message)
            {
                Order.Saga.Models.Entities.Order order = await applicationDbContext.Orders.FindAsync(message.OrderId);
                if (order != null)
                {
                    order.OrderStatus = Order.Saga.Models.Enums.OrderStatus.Fail;
                    await applicationDbContext.SaveChangesAsync();
                }
            }
        }
    

    Burada dikkat edilmesi gereken nokta şudur ki, Order.Saga servisinde yapılan veritabanı çalışmalarında Order.API‘dakine benzer context ve entity yapısı inşa edilmiş ve IoC Container’a provide edilmiştir. Tabi tekrara girmemek ve içeriği haddinden fazla uzatmamak için bu kısımlar es geçilmiştir. Yapılan çalışmaların tam teferruatlı içeriğine makalenin sonunda paylaşılan github linki üzerinden gözlemde bulunabilirsiniz.

  • Adım 19 Test
    Artık yapılan bunca çalışmanın testini gerçekleştirebiliriz. Bunun için servislerimizi derleyip ayağa kaldıralım ve aşağıdaki json data eşliğinde bir sipariş talebinde bulunalım.

    {
      "buyerId": 1,
      "orderItems": [
        {
          "productId": 1,
          "count": 10,
          "price": 5
        },
        {
          "productId": 2,
          "count": 30,
          "price": 50
        },
        {
          "productId": 3,
          "count": 5,
          "price": 5
        }
      ]
    }
    

    .NET'te Rebus Kütüphanesiyle Saga Orchestration Yaklaşımının UygulanmasıBu işlem neticesinde sürecin aşağıdaki görseldeki gibi olduğunu göreceğiz..NET’te Rebus Kütüphanesiyle Saga Orchestration Yaklaşımının UygulanmasıAyrıca yeri gelmişken bahsetmekte fayda görmekteyim ki, distributed iş akışı sürecinde Sagas tablosunun ‘data’ kolonundaki binary veriyi aşağıdaki sorguyla metinsele dönüştürebilir ve içeriğine dair gözlemde bulunabilirsiniz.

    SELECT CONVERT(VARCHAR(MAX), data)
    FROM SagaOrchestrationRebusDB.dbo.Sagas
    
    {“$type”:”Saga.Orchestration.Service.SagaDatas.OrderSagaData, Saga.Orchestration.Service”,”Id”:”7e630713-c610-4c9b-8a38-03f61f3060a8″,”Revision”:1,”OrderId”:5,”BuyerId”:1,”TotalPrice”:1825.0,”OrderItems”:{“$type”:”System.Collections.Generic.List`1[[Shared.Datas.OrderItem, Shared]], System.Private.CoreLib”,”$values”:[{“$type”:”Shared.Datas.OrderItem, Shared”,”ProductId”:1,”Count”:10,”Price”:5.0},{“$type”:”Shared.Datas.OrderItem, Shared”,”ProductId”:2,”Count”:30,”Price”:50.0},{“$type”:”Shared.Datas.OrderItem, Shared”,”ProductId”:4,”Count”:55,”Price”:5.0}]},”CorrelationId”:”c5d0b313-2bf4-4081-935f-72fa0c9895f0″}

İşte bu kadar 🙂

Nihai olarak;
Saga Orchestration implementasyonunu Rebus kütüphanesiyle, test adımı da dahil olmak üzere 19 adımda uygulamış ve MassTransit’e nazaran konuya dair alternatif bir yaklaşımı tecrübe etmiş bulunmaktayız. Nacizane fikrim, Rebus kütüphanesi MassTransit’e karşın yapılandırma açısından düşük bir komplikasyona sahip olsa da onun kadar geniş bir yelpazede entegrasyon imkanı sağlamamakta ve küçük yahut orta ölçekli uygulamalarda daha pratik çözümler sunmaktadır.

İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…

Not : Örnek çalışmaya aşağıdaki github adresinden erişebilirsiniz.
https://github.com/gncyyldz/Saga.Orchestration.Example.With.Rebus

Bunlar da hoşunuza gidebilir...

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir