Derinlemesine yazılım eğitimleri için kanalımı takip edebilirsiniz...

Microservice – Saga – Commands/Orchestration Implemantasyonu İle Transaction Yönetimi

Merhaba,

Önceki içeriklerimizden Microservice – Saga – Events/Choreography Implemantasyonu İle Transaction Yönetimi başlıklı yazımızda Saga pattern’ının iki implemantasyonundan biri olan choreography yöntemini tüm detaylarıyla ve pratik olarak incelemiştik. Bu içeriğimizde ise yine Saga pattern’ının ikinci implemantasyon yöntemi olan Orchestration’ı pratik olarak tam teferruatlı inceliyor olacağız. Yazımız uzun ve meşakkatli olacağından dolayı vakit kaybetmeksizin buyrun hemen başlayalım…

Events/Choreography yöntemi varken Commands/Orchestration implemantasyonunun amacı nedir?
Herşeyden önce bu soruyu sorarak başlamakta fayda görmekteyim. Choreography implementasyonu neyimize yetmedi de Orchestration yönteminden medet umar hale geldik konuşmakta fayda var 🙂
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction Yönetimi
Yandaki şemaya göz atarsak eğer Choreography implementasyonunun stratejik yapılanmasını görmekteyiz. Dikkat ederseniz Choreography’de tüm servisler Event Broker ile iletişim kurmak ve hangi servisten hangi event’in geleceğiyle ilgilenmek mecburiyetindedir. Haliyle Choreography adı üzerinde bütünsel bir koreografiye ayak uydurulabilir bir inşa gerektirmekte ve bazen aynı işi birden fazla farklı servisten gelebilecek şekilde koordine ederek farklı consumer’lar da yapılmasını gerektirebilmektedir.

Bunu daha da somutlaştırabilmek için basit bir e-ticaret senaryosundan örneklendirme yapabiliriz. Misal; bir e-ticaret sisteminde müşteriler tarafından yapılan siparişler sırasıyla Order.API, Stock.API ve Payment.API servisleri üzerinden yürütüldüğünü varsayalım. Bu sistemde Stock.API ya da Payment.API servislerinde oluşabilecek herhangi bir aksi duruma istinaden Order.API gerekli müdahalede bulunabilmek ve siparişi iptal edebilmek için her iki servisten gelecek başarısızlık durumunu ifade eden event’leri consume etmek mecburiyetindedir. Haliyle hangi servisten gelirse gelsin bir siparişin iptal edilmesi aynı işi gerektireceğinden dolayı Choreography implementasyonu burada bu operasyonun çoklatılmasını mecburi kılmakta ve aynı işi hem Stock.API için hem de Payment.API için consume edecek şekilde kodlanılabilir olmayı gerektirmektedir.

Bu yüzden Choreography implementasyonu hem lüzumsuz kod israfı, hem de yönetilebilirlik açısından 2 ile 4 servisin söz konusu olduğu çalışmalarda ideal olarak sınırlandırılmaktadır. Daha fazla servisin söz konusu olduğu durumlarda ise artık servisler arası yönetilebilirlik maliyetini düşürmek için merkezi bir yönetime olan ihtiyaç oldukça artmaktadır. İşte artık bu sınırda yeni implementasyon kabulü olarak Orchestration devreye girmektedir.
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction Yönetimi
Orchestration implementasyonu, servisler arasındaki distributed transaction’ı sağlayabilmek için merkezi bir orchestrator üzerinden tüm trafiği yönetmekte ve böylece servisler açısından eylemlerin kaynağıyla ilgilenmeksizin hangi event’ler de hangi aksiyonların alınması gerektiğinden başka ekstra bir bilginin bilinmesine gerek duyulmaksızın çalışılmasını sağlamaktadır.

Yukarıdaki örnek e-ticaret senaryosundan devam edersek eğer bu sefer Order.API, Stock.API ve Payment.API servislerinin yanında bu servisler arasında iletişimi yani gerekli organizasyonu sağlayacak olan bir Orchestrator servisi olacaktır. Artık Stock.API veya Payment.API servislerinden herhangi birinde olabilecek olası bir aksilik durumunda gerekli event’lerle bu Orchestrator servisi tetiklenecek ve yine bu servis üzerinden gerekli sipariş iptal event’i Order.API‘ya iletilecektir. Haliyle burada Order.API artık Orchesrator kanadını dinleyeceğinden dolayı iptal durumunun hangi servisten geldiğiyle ilgilenmeksizin sadece ilgili event’in consume edilmesine odaklanacak ve böylece servisler arası iletişimde yönetilebilirlik hat safhaya çıkacaktır.

Orchestration implementasyonu nasıl gerçekleştirilmektedir?
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction Yönetimi
Orchestration, yukarıdaki paragrafta da değinildiği gibi merkezi bir orchestrator aracılığıyla distributed transaction’ın yönetilmesini sağlayan bir stratejiye dayanmaktadır. Haliyle bu strateji kabul edildikten sonra geriye yapılması gereken bu orchestrator sorumluluğunu üstlenecek olan servisin bir şekilde geliştirilmesi kalmaktadır. Misal olarak bu servis yukarıdaki şemada olduğu gibi salt bir şekilde message broker üzerinden merkezi haberleşme birimi olabilecekken, bir yandan da State Machine olarak tasarlanmış ayrı bir worker servis olabilir. Ki bizler Saga pattern operasyonlarında orchestrator olarak state machine oluşturarak operasyon sürecindeki her bir adımı ayrı ayrı kontrol edecek ve o anki duruma göre event yönetimini sağlayarak distributed transaction yönetimini state yapılanması üzerinden yönetmeyi tercih edeceğiz. Neden mi? Çünkü bu yöntem best practices olarak kabul görmektedir. Ve bu makalede de bu yöntem üzerinden pratik bir senaryo icra edilecektir.

Peki neden ve nasıl state machine kullanacağız?
Yine bu soruya yukarıdaki örnek e-ticaret senaryosu üzerinden cevap vermeye çalışırsak eğer kullanıcı tarafından yapılan sipariş isteği Order.API tarafından karşılandıktan sonra state machine OrderCreated durumunu tutacaktır. Ardından siparişteki ürünlerin stok bilgileri Stock.API tarafından güncellendikten sonra state machine’de ilgili siparişe dair durum değeri StockReserved olarak güncellenecektir. Benzer mantıkla stok bilgilerinde herhangi bir tutarsızlık söz konusu olursa eğer StockNotReserved durumu tutulacak yok eğer ödeme sürecinde bir hata söz konusuysa PaymentFailed durumu tutulacaktır. Ödeme başarılıysa eğer ilgili siparişe dair state Finalize yapılarak nihayetinde silinecektir. Haliyle bir sipariş sürecindeki tüm adımlar state machine tarafından tutulacağı için ilgili siparişin hangi durumlarda aksadığını(ya da aksi durum yaşadığını) veya tamamlanıp tamamlanmadığını rahatlıkla görebilecek ve yönetebileceğiz.

State machine’i nasıl kullanacağız sorusuna gelirsek eğer, burada State Design Pattern ile kendi state’lerimizi yönetebilecek bir tasarım ortaya koyabileceğimiz gibi daha efektif ve hızlı çözüm getirebilmek için bizlere hazır bir state machine yapılanması sunan open source Automatonymous kütüphanesinden de istifade edebiliriz. Bu kütüphaneyi salt bir şekilde belirtilen adresindeki yönergeler doğrultusunda uygulayabilir ve kullanabiliriz. Lakin bizler genellikle message broker çalışmalarında Enterprise Service Bus(ESB) kullanmayı tercih ettiğimiz ve bundan dolayı genellikle MassTransit kütüphanesini kullandığımızdan dolayı MassTransit geliştiricileri state machine yapılanmasını da destekleyebilmek için ayrıca bir geliştirme yapmak yerine direkt olarak Automatonymous kütüphanesini bünyelerine katmış olduklarından bizler bu Automatonymous kütüphanesini MassTransit kütüphanesi üzerinden kullanıyor olacağız.

Haliyle artık çok uzatmadan örnek senaryomuzun sınırlarını çizelim ve gerekli projelerin oluşturulmasını ve kullanılacak kütüphanelerin kurulumunu gerçekleştirelim.

Servislerin Oluşturulması

Bu senaryomuzda esasında bir önceki Microservice – Saga – Events/Choreography Implemantasyonu İle Transaction Yönetimi başlıklı makalemizde ele aldığımız senaryoyla birebir aynı olacaktır. Lakin implementasyon olarak choreography yerine orchestration kullanacağımızdan dolayı ayrıca bir state machine görevi görecek servisimiz daha olacaktır.

Yine de senaryonun genel hattını tekrar çizmemiz icap ederse; bir e-ticaret sistemindeki sipariş sürecini modelliyor olacağız. Bu süreçte Order.API, Stock.API ve Payment.API isimli servisler sorumlu olacak ve sırasıyla işlevsellik göstereceklerdir. Ayrıca bu servisler arasında Saga orchestration implementasyonuna uygun bir şekilde merkezi distributed transaction yönetimini sağlayacak olan SagaStateMachine.Service adında worker service oluşturulacaktır. Tüm bu servisleri oluşturabilmek için aşağıdaki dotnet cli komutlarından istifade edebilirsiniz;

(Service) dotnet new webapi --name Order.API
(Service) dotnet new webapi --name Stock.API
(Service) dotnet new webapi --name Payment.API
(Worker Service) dotnet new worker --name SagaStateMachine.Service
(Class Library) dotnet new classlib --name Shared : Servisler arası mesajlaşma için event’leri vs. bu class library’de oluşturacağız.

Servislere MassTransit Kurulumu ve Temel Konfigürasyonlar

Saga pattern kullanıyorsak eğer asenkron bir iletişim modeli olmazsa olmaz bir yaklaşımdır. (bknz: önceki makale/choreography) Dolayısıyla bizler bu asenkron iletişim sürecinde message broker olarak RabbitMQ‘yu tercih edeceğiz ve yukarıdaki paragraflarda bahsedildiği üzere bu message broker’ı daha efektif hale getirebilmek için MassTransit Enterprise Service Bus kütüphanesinden istifade edeceğiz. Dolayısıyla yukarıda oluşturulan tüm servislere bahsi geçen ilgili kütüphaneleri aşağıdaki dotnet cli komutları eşliğinde yükleyelim.

Akabinde SagaStateMachine.Service haricindeki tüm servislerin ‘Startup.cs’ dosyasında aşağıdaki konfigürasyonları gerçekleştirerek, RabbitMQ entegrasyonunu sağlayalım.

    public class Startup
    {
        .
        .
        .
        public void ConfigureServices(IServiceCollection services)
        {
            .
            .
            .
            services.AddMassTransit(configure =>
            {
                configure.UsingRabbitMq((context, configurator) =>
                {
                    configurator.Host(Configuration.GetConnectionString("RabbitMQ"));
                });
            });

            services.AddMassTransitHostedService();
            .
            .
            .
        }
        .
        .
        .
    }

İdeal olarak her zaman yaptığımız gibi gerekli connection bilgilerini yapılandırma dosyasından yahut environment’tan talep etmekteyiz. Misal ilgili bilgilerin ‘appsettings.json’ dosyasından geldiğini varsayarsak eğer ilgili dosyanın içeriği aşağıdaki gibi olacaktır.

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "ConnectionStrings": {
    "RabbitMQ": "amqps://l***kqjU9x1bR@cattle.rmq**om/ll***"
  }
}

Burada RabbitMQ için cloudamqp.com adresindeki cloud ortamda yaratılan instance’a uygun bir bağlantı verilmiştir. Önceki makalede de RabbitMQ’nun cloud ortamına dair herhangi bir fikriniz olmadığı taktirde RabbitMQ – Cloud Ortamda(CloudAMQP) Kurulumu başlıklı makaleye göz atmanızı tavsiye etmiştim. Ve hala ediyorum 🙂

SagaStateMachine.Service‘in Yapılandırılması

Şimdi Saga orchestration implementasyonunun merkezi olan ve state machine’e karşılık gelen SagaStateMachine.Service‘in konfigürasyonlarını gerçekleştirelim.

SagaStateMachine.Service, özünde bir worker service’dir. Yani arkaplanda çalışan bir servis. Haliyle içerisinde ‘Startup.cs’ olmayan bu worker servis uygulamasının konfigürasyonlarını yapabilmek için ‘Program.cs’ dosyasındaki ‘CreateHostBuilder’ isimli metot içerisinde çağrılan ‘ConfigureService’ metoduna aşağıdaki müdahalelerde bulunmamız gerekmektedir.

    public class Program
    {
        .
        .
        .
        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    .
                    .
                    .
                    services.AddMassTransit(configure =>
                    {
                        configure.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
                        {
                            cfg.Host(hostContext.Configuration.GetConnectionString("RabbitMQ"));
                        }));
                    });

                    services.AddMassTransitHostedService();
                    .
                    .
                    .
                });
    }

Evet… Aslında yukarıdaki verilen tüm kodlar ilerideki adımlarda mevcut hallerinden daha da gelişerek bizlere eşlik edecektir. Şimdilik en temel konfigürasyonlarla yolculuğumuza başlamaya hazırız.

Servislerin Geliştirilmesi

Şimdi içeriğimizde sınırları çizilen senaryoya uygun bir şekilde Saga orchestration implementasyonu eşliğinde servislerimizi adım adım geliştirebiliriz. Tabi ki de burada pratiksel olarak ele alınan uygulamanın herhangi bir adımının makalenin seyrine göre farklı ihtiyaçlara istinaden değişikliğe uğrayabilme ihtimali vardır. Dolayısıyla iyi takip etmenizi ve hatta gerekirse sürece sizlerin de pratiksel olarak eşlik etmenizi tavsiye ederim… Buyrun başlayalım…

  • Adım 1 Order.API | Temel Altyapının Atılması
    Önceki makalemizde olduğu gibi burada da servislerimizin temel yapılanmasını inşa ederek başlayacağız. Tabi ki de sizler bu kısımları hızlıca geçebilir yahut bir önceki makalede oluşturulan projenin altyapısını burada kullanabilirsiniz.

    Şimdi ilk olarak Order.API servisinde, yapılacak siparişleri temsil edecek olan ‘Order’ ve ‘OrderItem’ nesnelerini modelleyerek başlayalım;

        public class Order
        {
            public int Id { get; set; }
            public int BuyerId { get; set; }
            public List<OrderItem> OrderItems { get; set; }
            public OrderStatus OrderStatus { get; set; }
            public DateTime CreatedDate { get; set; }
            public decimal TotalPrice { get; set; }
        }
    
        public class OrderItem
        {
            public int Id { get; set; }
            public int ProductId { get; set; }
            public int Count { get; set; }
            public decimal Price { get; set; }
        }
    

    Ayrıca bir siparişin durumunu ifade eden ‘OrderStatus’ enum’unu da tasarlayalım;

        public enum OrderStatus
        {
            Suspend,
            Completed,
            Fail
        }
    

    Ardından bu modelleri veritabanına migrate edebilmek için bir aşağıdaki gibi context nesnesi modelleyelim;

        public class ApplicationDbContext : DbContext
        {
            public ApplicationDbContext(DbContextOptions options) : base(options)
            {
            }
            public DbSet<Order> Orders { get; set; }
            public DbSet<OrderItem> OrderItems { get; set; }
        }
    

    Order.API‘da önceki makalede olduğu gibi MS SQL Server kullanacağımızdan dolayı Microsoft.EntityFrameworkCore.SqlServer kütüphanesini yükleyerek akabinde ilgili servisin ‘Startup.cs’ dosyasında aşağıdaki entegrasyonları gerçekleştirelim.

            public void ConfigureServices(IServiceCollection services)
            {
                .
                .
                .
                services.AddDbContext<ApplicationDbContext>(options =>
                    options.UseSqlServer(Configuration.GetConnectionString("SQLServer")));
                .
                .
                .
            }
    

    Yukarıdaki kod bloğuna dikkat ederseniz yapılandırma dosyasından(appsettings.json) ‘SQLServer’ değeri çekilmektedir. Haliyle ilgili dosyaya da aşağıdaki gibi değerlerimizi ekliyoruz;

      "ConnectionStrings": {
        "RabbitMQ": "amqps://llgi****dDpOkqjU9x1bR@ca**e.rmq2.cloudamqp.com/l*****",
        "SQLServer": "Server=***;Database=SagaOrchestrationDB;User Id=sa;Password=***;"
      }
    

    Böylece olması gereken temeli yapısal olarak inşa etmiş bulunmaktayız. Artık sırada inşa edilen bu altyapının migrate edilmesi vardır. Bunun için ilgili servise Microsoft.EntityFrameworkCore.Tools kütüphanesini yükleyerek akabinde ‘Package Manager Console’ üzerinden add-migration mig_1 talimatını vererek bir migration oluşturabilir ve ardından update-database talimatı ile bu migration’ı hedef SQL sunucusuna migrate edebiliriz. Tabi bu işlemleri Microsoft.EntityFrameworkCore.Design kütüphanesi eşliğinde dotnet cli ile de gerçekleştirebiliriz. Bunun içinde powershell üzerinden sırasıyla dotnet ef migrations add mig_1 ve dotnet ef database update talimatlarının verilmesi yeterli olacaktır.

  • Adım 2 Order.API | Sipariş Oluşturma
    Order.API servisinde sipariş talebini karşılayabilmek için OrdersController.cs isminde bir controller sınıfı oluşturalım ve içeriğini aşağıdaki gibi geliştirelim.

        [Route("api/[controller]")]
        [ApiController]
        public class OrdersController : ControllerBase
        {
            readonly ApplicationDbContext _applicationDbContext;
            public OrdersController(ApplicationDbContext applicationDbContext)
            {
                _applicationDbContext = applicationDbContext;
            }
            [HttpPost]
            public async Task<IActionResult> CreateOrder(OrderVM model)
            {
                Order.API.Models.Order order = new()
                {
                    BuyerId = model.BuyerId,
                    OrderItems = model.OrderItems.Select(oi => new OrderItem
                    {
                        Count = oi.Count,
                        Price = oi.Price,
                        ProductId = oi.ProductId
                    }).ToList(),
                    OrderStatus = OrderStatus.Suspend,
                    TotalPrice = model.OrderItems.Sum(oi => oi.Count * oi.Price),
                    CreatedDate = DateTime.Now
                };
    
                await _applicationDbContext.AddAsync<Order.API.Models.Order>(order);
    
                await _applicationDbContext.SaveChangesAsync();
                return Ok(true);
            }
        }
    

    Yukarıdaki kod bloğunu incelerseniz eğer ‘CreateOrder’ action metodunda gelen sipariş bilgilerine göre bir ‘Order’ nesnesi oluşturulmakta ve veritabanına eklenmektedir. Burada oluşturulan bu ‘Order’ nesnesinin durum bilgisine ilk etapta Suspend verildiğine dikkatinizi çekerim. Nihayetinde bir sipariş süreci farklı servislerde yapılan periyodik işlemler sürecinden geçmekte ve nihai olarak distributed transaction sağlanana kadar bu sipariş durumu hakkında kesin karar verilememektedir.

    Yukarıdaki yapılanmada viewmodel olarak kullanılan ‘OrderVM’ nesnesinin içeriğini de merak ederseniz aşağıdaki gibi olacaktır:

        public class OrderVM
        {
            public int BuyerId { get; set; }
            public List<OrderItemVM> OrderItems { get; set; }
        }
    
        public class OrderItemVM
        {
            public int ProductId { get; set; }
            public int Count { get; set; }
            public decimal Price { get; set; }
        }
    
  • Adım 3 Stock.API | Temel Altyapının Atılması
    Şimdi sıra Stock.API‘ın altyapısını oluşturmaya gelmiştir. Burada şimdilik yapılacak işlem oldukça basit olacaktır. Çeşitliliği arttırmak için bu servisimizde önceki choreography implemantasyonunu ele aldığımız makalemizde olduğu gibi MongoDB kullanacağız. Haliyle MongoDB altyapısının atılması gerekmektedir.

    Bunun için ilgili serviste ‘Services’ isimli bir klasör oluşturalım ve içerisine MongoDbService adında bir sınıf oluşturalım ve içeriğini aşağıdaki gibi inşa edelim.

        public class MongoDbService
        {
            readonly IMongoDatabase _database;
            readonly IConfiguration _configuration;
            public MongoDbService(IConfiguration configuration)
            {
                _configuration = configuration;
                MongoClient client = new(_configuration["MongoDB:Server"]);
                _database = client.GetDatabase(_configuration["MongoDB:DBName"]);
            }
            public IMongoCollection<T> GetCollection<T>() => _database.GetCollection<T>(typeof(T).Name.ToLowerInvariant());
        }
    

    Yukarıdaki kodu inşa edebilmek için MongoDB.Driver kütüphanesinin yüklenmesi gerekmektedir. Kod içerisindeki yapılandırma dosyasından(appsettings.json) gelen değerler aşağıdaki gibi olacaktır;

      "MongoDB": {
        "Server": "mongodb://localhost:27017",
        "DBName": "SagaOrchestrationDB"
      }
    

    Bu servisi inşa ettikten sonra tek yapılması gereken istenildiği yerde dependency injection ile talep edebilmek için ‘Startup.cs’ dosyasında aşağıdaki gibi belirtilmesidir;

            public void ConfigureServices(IServiceCollection services)
            {
                .
                .
                .
                services.AddSingleton<MongoDbService>();
                .
                .
                .
            }
    

    Ayrıca Stock.API‘ı ayağa kaldırdığımızda MongoDB’de hazır dummy data bulunması için ‘Program.cs’ dosyasında aşağıdaki gibi çalışma yapılması test sürecinde işimizi oldukça kolaylaştıracaktır.

        public class Program
        {
            public static void Main(string[] args)
            {
                var host = CreateHostBuilder(args).Build();
    
                using IServiceScope scope = host.Services.CreateScope();
                MongoDbService mongoDbService = scope.ServiceProvider.GetRequiredService<MongoDbService>();
                if (!mongoDbService.GetCollection<Models.Stock>().FindSync(x => true).Any())
                {
                    mongoDbService.GetCollection<Stock.API.Models.Stock>().InsertOne(new()
                    {
                        ProductId = 21,
                        Count = 200
                    });
                    mongoDbService.GetCollection<Stock.API.Models.Stock>().InsertOne(new()
                    {
                        ProductId = 22,
                        Count = 100
                    });
                    mongoDbService.GetCollection<Stock.API.Models.Stock>().InsertOne(new()
                    {
                        ProductId = 23,
                        Count = 50
                    });
                    mongoDbService.GetCollection<Stock.API.Models.Stock>().InsertOne(new()
                    {
                        ProductId = 24,
                        Count = 10
                    });
                    mongoDbService.GetCollection<Stock.API.Models.Stock>().InsertOne(new()
                    {
                        ProductId = 25,
                        Count = 30
                    });
                }
    
                host.Run();
            }
    
            public static IHostBuilder CreateHostBuilder(string[] args) =>
                Host.CreateDefaultBuilder(args)
                    .ConfigureWebHostDefaults(webBuilder =>
                    {
                        webBuilder.UseStartup<Startup>();
                    });
        }
    

    Yukarıdaki yapılanmanın inşa edilebilmesi için aşağıdaki ‘Stock’ modeline ihtiyaç vardır;

        public class Stock
        {
            [BsonRepresentation(BsonType.ObjectId)]
            [BsonId]
            [BsonElement(Order = 0)]
            public ObjectId Id { get; set; }
            [BsonRepresentation(BsonType.Int64)]
            [BsonElement(Order = 1)]
            public int ProductId { get; set; }
            [BsonRepresentation(BsonType.Int64)]
            [BsonElement(Order = 2)]
            public int Count { get; set; }
            [BsonRepresentation(BsonType.DateTime)]
            [BsonDateTimeOptions(Kind = DateTimeKind.Utc)]
            [BsonElement(Order = 3)]
            public DateTime CreatedDate { get; set; } = DateTime.Now;
        }
    
  • Adım 4 SagaStateMachine.Service | State Machine Altyapısının Kurulması
    Orchestration implementasyonunda en kritik noktalardan biri State Machine’i iyi kavrayabilmekten geçmektedir. Makalemizin önceki paragraflarında State Machine olarak MassTransit kütüphanesinin dahili olarak getirdiği Automatonymous kütüphanesinden faydalanacağımızdan bahsetmiştik. Bu kütüphane bizlere state machine yapılanması için belli başlı aktörler sunmaktadır. Şimdi gelin öncelikle bu aktörlerin kimler olduğunu tanımlayalım;

    Aktör Açıklama
    State Machine State Machine yapılanmasını bizlere sunan sınıftır. Sorumluluk olarak state’leri, event’leri ve belirli davranışları belirleyen merkezi bir rol oynar. Yani distributed transaction’ı yönetecek olan sınıftır diyebiliriz. Bir sınıfın state machine olabilmesi için MassTransitStateMachine<T> arayüzünü uygulaması gerekmektedir. Generic olarak belirtilen T ise bir State Instance almaktadır.
    State Instance Bir state machine verisini temsil eden sınıftır. Misal olarak aynı anda yapılan iki farklı sipariş isteğinin her biri bir state instance olarak veritabanında tutulacaktır. Haliyle gelen bu istekleri birbirlerinden ayırt edebilmek için CorrelationId değeri kullanılmaktadır. Bu değer tekilleştirici niteliğe sahiptir. Bir sınıfın state instance olabilmesi için SagaStateMachineInstance arayüzünü uygulaması gerekmektedir.

    Orchestration implementasyonunda dört temel event vardır. Bu event’ler;

    • Tetikleyici Event: İlgili service’i tetikleyecek/çalıştıracak/işlevsel hale getirecek olan event’tir.
    • Başarılı Event: İşlevin başarıyla sonuçlandığını ifade eden event’tir.
    • Başarısız Event: İşlevin başarısızlıkla sonuçlandığını ifade eden event’tir.
    • Compensable Event: Yapılan işlemlerin geri alınmasını bildiren event’tir.

    Dolayısıyla gelen her tetikleyici event için yeni bir state instance oluşturulacak ve diğer event’ler de ise önceden oluşturulmuş state instance CorrelationId üzerinden tespit edilip üzerinden işlem gerçekleştirilecektir.

    State DbContext State Machine’in verilerinin tutulacağı veritabanını yönetecek olan context nesnesidir. SagaDbContext abstract class’ından türemektedir.
    State Map Veritabanına kaydedilen State Instance’da ki property’lerin validasyon ayarlarının yapılmasını sağlayan sınıftır. SagaClassMap<T> abstract class’ından türemektedir. Generic olan T parametresi bir State Instance alır.

    Şimdi gelin bu aktörleri tek tek oluşturalım ve SagaStateMachine.Service‘in altyapısını kurmuş olalım. Tabi burada ilgili sınıfların oluşturulması yukarıdaki tablodaki sırayı seyretmeyecek, öncelik dikkate alınarak gerçekleştirilecektir.

    • State Instance
          /// <summary>
          /// OrderStateInstance : Her bir sipariş eklendiğinde(tetikleyici event geldiğinde)
          /// bu siparişe karşılık Saga State Machine'de tutulacak olan satırı Order State Instance
          /// olarak tarif etmekteyiz.
          /// </summary>
          public class OrderStateInstance : SagaStateMachineInstance
          {
              /// <summary>
              /// Her bir State Instance özünde bir siparişe özeldir. Haliyle bu State Instance'ları
              /// birbirinden ayırabilmek için CorrelationId(yani bildiğiniz unique id) kullanılmaktadır
              /// </summary>
              public Guid CorrelationId { get; set; }
              public string CurrentState { get; set; }
              public int OrderId { get; set; }
              public int BuyerId { get; set; }
              public decimal TotalPrice { get; set; }
              public DateTime CreatedDate { get; set; }
          }
      

      Dikkat ederseniz eğer State Machine’in herbir satırına karşılık gelecek olan State Instance’da bizlere lazım olan verileri de tutmaktayız. Bu demek oluyor ki, choreography implementasyonundaki event’lerin taşıyacağı datalarla orchestration implementasyonundaki state machine’de ki event’lerin aşıyacağı datalar farklıdır…

    • State Map
          public class OrderStateMap : SagaClassMap<OrderStateInstance>
          {
              protected override void Configure(EntityTypeBuilder<OrderStateInstance> entity, ModelBuilder model)
              {
                  entity.Property(x => x.BuyerId).IsRequired();
                  entity.Property(x => x.OrderId).IsRequired();
                  entity.Property(x => x.TotalPrice).HasDefaultValue(0);
              }
          }
      

      SagaClassMap<T>‘e erişebilmek için MassTransit.EntityFrameworkCore kütüphanesini uygulamaya yüklemeniz gerekmektedir.

    • State DbContext
          public class OrderStateDbContext : SagaDbContext
          {
              public OrderStateDbContext(DbContextOptions options) : base(options)
              {
              }
      
              /// <summary>
              /// ISagaClassMap : Order State Instance'da ki property'lerin
              /// validasyon ayarlarının yapılmasını sağlar.
              /// </summary>
              protected override IEnumerable<ISagaClassMap> Configurations
              {
                  get
                  {
                      yield return new OrderStateMap();
                  }
              }
          }
      
    • State Machine
          public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
          {
              public OrderStateMachine()
              {
                  //State Instance'da ki hangi property'nin sipariş sürecindeki state'i tutacağı bildiriliyor.
                  //Yani artık tüm event'ler CurrentState property'sin de tutulacaktır!
                  InstanceState(instance => instance.CurrentState);
      
      
                  /*
                      .
                      Event çalışmaları
                      .
                   */
              }
          }
      

      State Machine şimdilik en sade haliyle oluşturulmuştur. Burada dikkat edilmesi gereken öncelikli husus veritabanına durum kaydı yapan State Instance property’lerinden hangisinin gerçek state bilgisini tuttuğu ayarıdır. Bunu yukarıdaki koda dikkat ederseniz ‘InstanceState’ ile gerçekleştirmekteyiz.

    Son olarak yapılan bu inşaların uygulamaya entegre edilmesi gerekmektedir. Bunun için, SagaStateMachine.Service bir worker service olduğundan dolayı ‘Program.cs’ dosyasındaki ‘CreateHostBuilder’ metodunda aşağıdaki gibi çalışmamız gerekmektedir.

            public static IHostBuilder CreateHostBuilder(string[] args) =>
                Host.CreateDefaultBuilder(args)
                    .ConfigureServices((hostContext, services) =>
                    {
                        services.AddMassTransit(configure =>
                        {
                            configure.AddSagaStateMachine<OrderStateMachine, OrderStateInstance>()
                              .EntityFrameworkRepository(options =>
                              {
                                  options.AddDbContext<DbContext, OrderStateDbContext>((provider, builder) =>
                                  {
                                      builder.UseSqlServer(hostContext.Configuration.GetConnectionString("SQLServer"));
                                  });
                              });
    
                            configure.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
                            {
                                cfg.Host(hostContext.Configuration.GetConnectionString("RabbitMQ"));
                            }));
                        });
    
                        services.AddMassTransitHostedService();
                    });
    

    Yukarıdaki kod bloğunda 7 ile 14. satırlara göz atarsanız eğer ‘AddSagaStateMachine’ metodu ile MassTransit servisine state machine entegre edilmekte ve generic olarak hangi state machine nesnesine hangi state instance nesnesinin olacağı bildirilmektedir. Ayrıca ‘EntityFrameworkRepository’ metodu ile state’lerin hangi veritabanında depolanacağı bildirilmektedir. Malum burada da SQL Server kullanacağımızdan dolayı Microsoft.EntityFrameworkCore.SqlServer kütüphanesini yüklemeniz gerekmektedir.

    16 ile 19. satırlar arasında ise bu servisin RabbitMQ ile entegrasyonu gerçekleştirilmiştir. Bunuda makalemizin yukarı paragraflarında -SagaStateMachine.Service‘in Yapılandırılması- başlığı altında önceden gerçekleştirmiştik.

    Şimdi Microsoft.EntityFrameworkCore.Tools kütüphanesi eşliğinde add-migration mig_1 ve update-database talimatlarını vererek gerekli migrate işlemlerini gerçekleştirelim.

    Son olarak bağlantı cümlecikleri ‘appsettings.json’ dosyasından geleceğinden dolayı ilgili dosya içeriğide aşağıdaki gibi olmalıdır;

      "ConnectionStrings": {
        "SQLServer": "Server=***;Database=SagaStateMachineDB;User Id=sa;Password=***;",
        "RabbitMQ": "amqps://*kqjU9x**ttle.rmq2.cloudamqp**.com/ll**ys"
      }
    
  • Adım 5 Shared | Event ve Message’ların Oluşturulması
    Şimdi State Machine için gerekli event ve message’ları işlevsel açıdan kullanım sırasına uygun şekilde oluşturalım.

    Event ile Message arasındaki temel fark; event’ler olmuş olayları, message’lar ise olacak olayları temsil eder!

    Event’ler;

    1. OrderStartedEvent (Tetikleyici Event)
      Tetikleyici event’tir. Bir sipariş sürecinin başladığını ifade eder. Haliyle tetikleyici olduğu için State Machine’de bir Order’a karşılık State Instance satırı ekleyecektir. Dikkat ederseniz içerisinde ‘OrderId’, ‘BuyerId’ gibi State Instance’da belirttiğimiz verilere karşılık property’ler mevcuttur. Haliyle bir sipariş süreci bu veriler eşliğinde başlatılacaktır.

          public class OrderStartedEvent
          {
              public int OrderId { get; set; }
              public int BuyerId { get; set; }
              public decimal TotalPrice { get; set; }
              public List<OrderItemMessage> OrderItems { get; set; }
          }
      
    2. OrderCreatedEvent (Başarılı Event)
      Sipariş oluşturulduktan sonra Stock.API‘ı tetikleyebilmek için yayınlanacak event’tir.

          public class OrderCreatedEvent : CorrelatedBy<Guid>
          {
              public OrderCreatedEvent(Guid correlationId)
              {
                  CorrelationId = correlationId;
              }
              public Guid CorrelationId { get; }
              public List<OrderItemMessage> OrderItems { get; set; }
          }
      

      Burada dikkatinizi çekmek istediğim iki husus mevcuttur. Bunlardan ilki, bu event’in artık hangi kullanıcının hangi siparişi için yayınlandığına dair bir veri taşımasına gerek yoktur! Çünkü bu veriler zaten State Machine’de tutulmaktadır. İkinci husus ise, ilgili siparişin State Machine’de hangi korelasyon değerine sahip olan instance’a karşılık geldiğini ifade edecek olan ‘CorrelationId’ property’sini uygulayacak CorrelatedBy<Guid> arayüzüdür!

      Bu property’de ki korelasyon değeri sayesinde tetikleyici event(OrderStartedEvent) geldikten sonra oluşturulan State Instance hangisiyse onun üzerinde state bilgisi değiştirilecektir.

    3. OrderCompletedEvent (Başarılı Event)
      Siparişin başarıyla sonlandığını ifade eden event’tir. Bu event State Machine tarafından yayıldığında Order.API servisinde ilgili ‘OrderId’ye karşılık gelen siparişin durumu Completed‘a çekilecektir. Ayrıca dikkat ederseniz bu event’te herhangi bir korelasyon değeri bulunmamaktadır. Çünkü bu event, siparişin sonlandığını ifade ettiğinden dolayı State Machine tarafından da bu event geldiğinde ilgili siparişe dair kayıt sonlandırılmış(Finilize) olacaktır.

          public class OrderCompletedEvent
          {
              public int OrderId { get; set; }
          }
      
    4. OrderFailedEvent (Başarısız Event)
      Siparişin başarısız olduğunu ifade eden event’tir. Order.API tarafından ilgili siparişin Fail durumuna çekilmesini sağlayacaktır.

          public class OrderFailedEvent
          {
              public int OrderId { get; set; }
              public string Message { get; set; }
          }
      
    5. StockReservedEvent (Başarılı Event)
      Stock.API‘da ilgili stok işlemlerinin başarıyla gerçekleştirildiğini ifade eden event’tir. CorrelatedBy<Guid> arayüzü sayesinde ‘CorrelationId’ property’si uygulanarak State Machine’de hangi instance’ın mevzu bahis olduğunu taşımaktadır.

          public class StockReservedEvent : CorrelatedBy<Guid>
          {
              public StockReservedEvent(Guid correlationId)
              {
                  CorrelationId = correlationId;
              }
              public Guid CorrelationId { get; }
              public List<OrderItemMessage> OrderItems { get; set; }
          }
      
    6. StockNotReservedEvent (Başarısız Event)
      Stok işlemlerinin başarısız olduğunu ifade eden event’tir.

          public class StockNotReservedEvent : CorrelatedBy<Guid>
          {
              public StockNotReservedEvent(Guid correlationId)
              {
                  CorrelationId = correlationId;
              }
              public Guid CorrelationId { get; }
              public string Message { get; set; }
          }
      
    7. PaymentStartedEvent (Başarılı Event)
      Stock.API‘da işlemler başarılıysa eğer Payment.API‘ı tetikletecek olan event’tir.

          public class PaymentStartedEvent : CorrelatedBy<Guid>
          {
              public PaymentStartedEvent(Guid correlationId)
              {
                  CorrelationId = correlationId;
              }
              public Guid CorrelationId { get; }
              public decimal TotalPrice { get; set; }
              public List<OrderItemMessage> OrderItems { get; set; }
          }
      
    8. PaymentCompletedEvent (Başarılı Event)
      Ödeme işleminin başarılı olduğunu ifade eden event’tir.

          public class PaymentCompletedEvent : CorrelatedBy<Guid>
          {
              public PaymentCompletedEvent(Guid correlationId)
              {
                  CorrelationId = correlationId;
              }
              public Guid CorrelationId { get; }
          }
      
    9. PaymentFailedEvent (Başarısız Event)
      Ödeme işleminin başarısız olduğunu ifade eden event’tir.

          public class PaymentFailedEvent : CorrelatedBy<Guid>
          {
              public PaymentFailedEvent(Guid correlationId)
              {
                  CorrelationId = correlationId;
              }
              public Guid CorrelationId { get; }
              public List<OrderItemMessage> OrderItems { get; set; }
              public string Message { get; set; }
          }
      

    Message’lar;

    1. StockRollBackMessage (Compensable Event)
      ‘PaymentFailedEvent’ event’i yayınlanırsa eğer Stock.API‘da yapılan işlemlerin geri alınmasını sağlayacak olan message’dır.

          public class StockRollBackMessage
          {
              public List<OrderItemMessage> OrderItems { get; set; }
          }
      

    Bu arada yukarıda kod aralarında kullanılan ‘OrderItemMessage’ nesnesinin içeriği ise aşağıdaki gibi olacaktır.

        public class OrderItemMessage
        {
            public int ProductId { get; set; }
            public int Count { get; set; }
            public decimal Price { get; set; }
        }
    
  • Adım 6 Shared | Kuyruk İsimlerinin Ayarlanması
    Şimdi sırada servislerimizle State Machine arasındaki iletişimleri sağlayacak olan kuyruk isimlerinin ayarlanması var. Bunun için Shared class library’sinde ‘RabbitMQSettings.cs’ isminde bir static class oluşturalım ve içerisini aşağıdaki gibi dolduralım.

        public static class RabbitMQSettings
        {
            public const string StateMachine = "state-machine-queue";
            public const string Stock_OrderCreatedEventQueue = "stock-order-created-queue";
            public const string Payment_StartedEventQueue = "payment-started-queue";
            public const string Order_OrderCompletedEventQueue = "order-order-completed-queue";
            public const string Order_OrderFailedEventQueue = "order-order-failed-queue";
            public const string Stock_RollbackMessageQueue = "stock-roolback-queue";
        }
    

    Yukarıdaki kuyrukları tanımladığımız kodlara göz atarsanız eğer hususi bir standart belirlediğimizi görebileceksiniz. Burada standardımız gönderilecek event ya da message’ın hangi servis tarafından tüketileceği ile ilişkilendirilmektedir! Şöyle ki; [Consumer]_[Event/MessageName]Queue

  • Adım 7 Order.API | Siparişin Kuyruğa Atılması ve Böylece Tetikleyici Event’in Yayılması
    Hatırlarsanız eğer 2. adımda Order.API servisinde ‘OrdersController.cs’ controller’ı üzerinde bir sipariş oluşturmuştuk. İşte şimdi oluşturulan o siparişi State Machine kuyruğuna göndermemiz gerekmektedir. Bunun için ilgili controller içerisindeki ‘CreateOrder’ action metodu içerisinde aşağıdaki eklentilerin yapılması gerekmektedir;

        [Route("api/[controller]")]
        [ApiController]
        public class OrdersController : ControllerBase
        {
            readonly ApplicationDbContext _applicationDbContext;
            readonly ISendEndpointProvider _sendEndpointProvider;
            public OrdersController(ApplicationDbContext applicationDbContext,
                ISendEndpointProvider sendEndpointProvider)
            {
                _applicationDbContext = applicationDbContext;
                _sendEndpointProvider = sendEndpointProvider;
            }
            [HttpPost]
            public async Task<IActionResult> CreateOrder(OrderVM model)
            {
                Order.API.Models.Order order = new()
                {
                    BuyerId = model.BuyerId,
                    OrderItems = model.OrderItems.Select(oi => new OrderItem
                    {
                        Count = oi.Count,
                        Price = oi.Price,
                        ProductId = oi.ProductId
                    }).ToList(),
                    OrderStatus = OrderStatus.Suspend,
                    TotalPrice = model.OrderItems.Sum(oi => oi.Count * oi.Price),
                    CreatedDate = DateTime.Now
                };
    
                await _applicationDbContext.AddAsync<Order.API.Models.Order>(order);
    
                await _applicationDbContext.SaveChangesAsync();
    
                OrderStartedEvent orderStartedEvent = new()
                {
                    BuyerId = model.BuyerId,
                    OrderId = order.Id,
                    TotalPrice = model.OrderItems.Sum(oi => oi.Count * oi.Price),
                    OrderItems = model.OrderItems.Select(oi => new Shared.OrderItemMessage
                    {
                        Price = oi.Price,
                        Count = oi.Count,
                        ProductId = oi.ProductId
                    }).ToList()
                };
    
                ISendEndpoint sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new($"queue:{RabbitMQSettings.StateMachine}"));
                await sendEndpoint.Send<OrderStartedEvent>(orderStartedEvent);
                return Ok(true);
            }
        }
    

    34 ile 48. satır aralığına göz atarsanız eğer sonradan inşa ettiğimiz ‘OrderStartedEvent’ nesnesi sayesinde bir tetikleyici event oluşturulmakta ve bu state machine kuyruğuna gönderilmektedir. Artık state machine ile bu ve diğer event’lerin kontrolünü sağlayarak siparişin durumu hakkında distributed transaction’ı sağlayabiliriz.

  • Adım 8 SagaStateMachine.Service | State Machine’de Event’lerin Tanımlanması
    Evet… Şimdi en zor ve kritik noktaya gelmiş bulunmaktayız. Artık State Machine’imiz üzerinde gerekli geliştirmeyi sağlayacak ve uygulama state’ini bütünsel olarak yönetiyor olacağız. Bunun için ‘OrderStateMachine’ sınıfını açalım ve ilk olarak tetikleyici event geldiğinde yeni bir State Instance nesnesi oluşturması gerektiğini ifade edelim.

        public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
        {
            public Event<OrderStartedEvent> OrderStartedEvent { get; set; }
    
            public OrderStateMachine()
            {
                //State Instance'da ki hangi property'nin sipariş sürecindeki state'i tutacağı bildiriliyor.
                //Yani artık tüm event'ler CurrentState property'sin de tutulacaktır!
                InstanceState(instance => instance.CurrentState);
    
                //Eğer gelen event OrderStartedEvent ise CorrelateBy metodu ile veritabanında(database)
                //tutulan Order State Instance'da ki OrderId'si ile gelen event'te ki(@event) OrderId'yi
                //kıyasla. Bu kıyas neticesinde eğer ilgili instance varsa kaydetme. Yani yeni bir korelasyon
                //üretme! Yok eğer yoksa yeni bir korelasyon üret(SelectId)
                Event(() => OrderStartedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateBy<int>(database => database.OrderId, @event => @event.Message.OrderId)
                    .SelectId(e => Guid.NewGuid()));
            }
        }
    

    State Machine üzerinde bir event’i temsil edebilmek için 3. satırdaki gibi Event<T> türünden bir property oluşturulmalıdır. Bunu tüm event’ler için yapacağız. Şimdilik sembolik olarak ‘OrderStartedEvent’ için yapmış olduk. Aceleye gerek yok 🙂 Adım adım hepsi gelecek…

    Ardından 11. satıra göz atarsanız eğer ‘Event’ metodu ile gelen event’in ‘OrderStartedEvent’ olması durumunda ne yapacağını belirliyoruz. Evet, ‘Event’ metodu ise gelen event’e göre aksiyon almamızı sağlayan bir fonksiyondur. Burada gelen event ‘OrderStartedEvent’ ise State Machine veritabanındaki(database) ‘OrderId’ değeri ile event’te(OrderStartedEvent) gelen ‘OrderId’ değerini kıyaslıyoruz. Eğer bu kıyaslama neticesinde herhangi bir State Instance geliyorsa bu yeni bir sipariş değildir! Haliyle kaydetmeyeceğiz. Amma velakin bu gelen event bir tetikleyici yani başlatıcı bir event olduğu için yeni bir State Instance üretilecektir. Haliyle yeni bir korelasyon değeri olan ve bundan sonraki aynı korelasyon değerine sahip olan event’lerin durum bilgisini tutacak olan state instance eklenecektir. Bu yeni state instance ekleme işleminden de ‘SelectId’ metodu sorumludur.

    Şimdi diğer event’lerin hepsini ekleyelim;

        public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
        {
            public Event<OrderStartedEvent> OrderStartedEvent { get; set; }
            public Event<StockReservedEvent> StockReservedEvent { get; set; }
            public Event<PaymentCompletedEvent> PaymentCompletedEvent { get; set; }
            public Event<PaymentFailedEvent> PaymentFailedEvent { get; set; }
            public Event<StockNotReservedEvent> StockNotReservedEvent { get; set; }
    
            public OrderStateMachine()
            {
                //State Instance'da ki hangi property'nin sipariş sürecindeki state'i tutacağı bildiriliyor.
                //Yani artık tüm event'ler CurrentState property'sin de tutulacaktır!
                InstanceState(instance => instance.CurrentState);
    
                //Eğer gelen event OrderStartedEvent ise CorrelateBy metodu ile veritabanında(database)
                //tutulan Order State Instance'da ki OrderId'si ile gelen event'te ki(@event) OrderId'yi
                //kıyasla. Bu kıyas neticesinde eğer ilgili instance varsa kaydetme. Yani yeni bir korelasyon
                //üretme! Yok eğer yoksa yeni bir korelasyon üret(SelectId)
                Event(() => OrderStartedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateBy<int>(database => database.OrderId, @event => @event.Message.OrderId)
                    .SelectId(e => Guid.NewGuid()));
    
                //StockReservedEvent fırlatıldığında veritabanındaki hangi correlationid değerine sahip state
                //instance'ın state'ini değiştirecek bunu belirtmiş olduk!
                Event(() => StockReservedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateById(@event => @event.Message.CorrelationId));
    
                //StockNotReservedEvent fırlatıldığında veritabanındaki hangi correlationid değerine sahip state
                //instance'ın state'ini değiştirecek bunu belirtmiş olduk!
                Event(() => StockNotReservedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateById(@event => @event.Message.CorrelationId));
    
                //PaymentCompletedEvent fırlatıldığında veritabanındaki hangi correlationid değerine sahip state
                //instance'ın state'ini değiştirecek bunu belirtmiş olduk!
                Event(() => PaymentCompletedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateById(@event => @event.Message.CorrelationId));
    
                //PaymentFailedEvent fırlatıldığında veritabanındaki hangi correlationid değerine sahip state
                //instance'ın state'ini değiştirecek bunu belirtmiş olduk!
                Event(() => PaymentFailedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateById(@event => @event.Message.CorrelationId));
            }
        }
    

    Dikkat ederseniz tetikleyici event dışındaki tüm event’ler taşıdıkları korelasyon değeri ile eşleşen veritabanındaki State Instance satırı üzerinde işlem gerçekleştirmektedir/gerçekleştirecektir. Çünkü State Instance oluşturmak sade ve sadece tetikleyici event’in sorumluluğundadır. Diğer event’ler artık bu oluşturulmuş state instance üzerinde durum bilgisinin değişmesini sağlamaktadırlar.

    Ayrıca 5. adımda oluşturulan tüm event’lerin tanımlanmadığına dikkatinizi çekerim. Nihayetinde State Machine’e gelecek olan event’ler sadece bunlar olacaktır. Diğerleri ise State Machine tarafından Publish yahut Send edilecek event’lerdir. Buradaki temel ayrım budur. State Machine tarafından consume edilecek event’ler yukarıdaki gibi tanımlanırken, gönderilecek event’ler tanımlanmazlar!

  • Adım 9 SagaStateMachine.Service | State Machine’de State’lerin Tanımlanması
    Şimdi ise State Machine’de durumların(state) tanımlanmasını sağlayalım. Bunun için yine SagaStateMachine.Service içerisindeki ‘OrderStateMachine’ sınıfını açalım ve aşağıdaki eklentilerde bulunalım.

        public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
        {
            .
            .
            .
            public State OrderCreated { get; set; }
            public State StockReserved { get; set; }
            public State PaymentCompleted { get; set; }
            public State PaymentFailed { get; set; }
            public State StockNotReserved { get; set; }
    
            public OrderStateMachine()
            {
                InstanceState(instance => instance.CurrentState);
    
                .
                . Event(..)
                .
            }
        }
    

    Yukarıdaki kod bloğuna bakarsanız önceden yapılmış ‘Event’ tanımlamalarını her ne kadar yansıtmasakta onlarla birlikte o anki state’lerin durumunu ifade edecek olan ‘State’ property’leri tanımlanmıştır. Bu property’ler State Machine tarafından kullanılacak durumları ifade etmektedir.

    Bu state’leri aşağıdaki gibi kullanmakta ve gerekli kontrolleri sağlayabilmekteyiz.

        public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
        {
            .
            .
            .
    
            public State OrderCreated { get; set; }
            public State StockReserved { get; set; }
            public State PaymentCompleted { get; set; }
            public State PaymentFailed { get; set; }
            public State StockNotReserved { get; set; }
            public OrderStateMachine()
            {
                InstanceState(instance => instance.CurrentState);
    
                .
                . Event(..)
                .
    
                //İlgili instance'ın state'i initial/başlangıç aşamasındayken(Initially) 'OrderStartedEvent'
                //tetikleyici event'i geldiyse(When) şu işlemleri yap(Then). Ardından bu işlemler yapıldıktan
                //sonra ilgili instance'ı 'OrderCreated' state'ine geçir(TransitionTo). Ardından 'Stock.API'ı
                //tetikleyebilmek/haberdar edebilmek için 'OrderCreatedEvent' event'ini gönder(Publish/Send)
                Initially(When(OrderStartedEvent)
                    .Then(context =>
                    {
                        context.Instance.BuyerId = context.Data.BuyerId;
                        context.Instance.OrderId = context.Data.OrderId;
                        context.Instance.TotalPrice = context.Data.TotalPrice;
                        context.Instance.CreatedDate = DateTime.Now;
                    })
                    .Then(context => Console.WriteLine("Ara işlem 1"))
                    .Then(context => Console.WriteLine("Ara işlem 2"))
                    .TransitionTo(OrderCreated)
                    .Then(context => Console.WriteLine("Ara işlem 3"))
                    .Send(new Uri($"queue:{RabbitMQSettings.Stock_OrderCreatedEventQueue}"), context => new OrderCreatedEvent(context.Instance.CorrelationId)
                    {
                        OrderItems = context.Data.OrderItems
                    }));
    
                //Eğer state 'OrderCreated' ise(During) ve o anda 'StockReservedEvent' event'i geldiyse(When)
                //o zaman state'i 'StockReserved' olarak değiştir(TransitionTo) ve belirtilen kuyruğa 
                //'PaymentStartedEvent' event'ini gönder(Send)
                During(OrderCreated,
                    When(StockReservedEvent)
                    .TransitionTo(StockReserved)
                    .Send(new Uri($"queue:{RabbitMQSettings.Payment_StartedEventQueue}"), context => new PaymentStartedEvent(context.Instance.CorrelationId)
                    {
                        OrderItems = context.Data.OrderItems,
                        TotalPrice = context.Instance.TotalPrice
                    }),
    
                    //Yok eğer State 'OrderCreated' iken(During) 'StockNotReservedEvent' event'i geldiyse(When)
                    //o zaman state'i 'StockNotReserved' olarak değiştir(TransitionTo) ve belirtilen
                    //kuyruğa 'OrderFailedEvent' event'ini gönder.
                    When(StockNotReservedEvent)
                    .TransitionTo(StockNotReserved)
                    .Send(new Uri($"queue:{RabbitMQSettings.Order_OrderFailedEventQueue}"), context => new OrderFailedEvent()
                    {
                        OrderId = context.Instance.OrderId,
                        Message = context.Data.Message
                    }));
    
                //Eğer ilgili sipariş 'StockReserved' durumunda iken(During) 'PaymentCompletedEvent' event'i geldiyse(When)
                //'PaymentCompleted' state'i olarak değiştir(TransitionTo) ve ardından belirtilen kuyruğa 
                //'OrderCompletedEvent' event'ini gönder. Ayrıca artık bu sipariş başarılı olacağından dolayı
                //State Machine tarafından bu State Instance'ı başarıyla sonlandır(Finalize) Haliyle böylece sonuç olarak
                //ilgili instance'ın state'inde 'Final' yazacaktır!
    
                During(StockReserved,
                    When(PaymentCompletedEvent)
                    .TransitionTo(PaymentCompleted)
                    .Send(new Uri($"queue:{RabbitMQSettings.Order_OrderCompletedEventQueue}"), context => new OrderCompletedEvent
                    {
                        OrderId = context.Instance.OrderId
                    })
                    .Finalize(),
    
                    //Yok eğer mevcut state 'StockReserved' iken(During) 'PaymentFailedEvent' event'i gelirse(When)
                    //o zaman state'i 'PaymentFailed' olarak değiştir(TransitionTo) ve belirtilen kuyruklara 
                    //'OrderFailedEvent' ve 'StockRollBackMessage' event'lerini gönder(Send).
                    When(PaymentFailedEvent)
                    .TransitionTo(PaymentFailed)
                    .Send(new Uri($"queue:{RabbitMQSettings.Order_OrderFailedEventQueue}"), context => new OrderFailedEvent()
                    {
                        OrderId = context.Instance.OrderId,
                        Message = context.Data.Message
                    })
                    .Send(new Uri($"queue:{RabbitMQSettings.Stock_RollbackMessageQueue}"), context => new StockRollBackMessage
                    {
                        OrderItems = context.Data.OrderItems
                    }));
    
                //Finalize olan instance'ları veritabanından kaldırıyoruz!
                SetCompletedWhenFinalized();
            }
        }
    

    Şimdi yukarıdaki kod bloğunu satır satır izah etmemiz gerekirse eğer ilk olarak 24 ile 39. satır aralığına göz atmamız gerekmektedir. Tetikleyici event geldiğinde State Machine’de ilk karşılayıcı state Initially fonksiyonu tarafından tanımlanmış olan ‘Initial’ olacaktır. Burada When fonksiyonu ile o anki gelen event ‘OrderStartedEvent’ ise Then fonksiyonu içerisindeki işlemleri yaptırıyoruz. Then fonksiyonuna bakarsanız eğer oluşturulacak olan State Instance’ın hangi property’sine tetikleyici event’te gelen hangi property’lerin atanacağı belirtilmektedir. Zaten örnek mahiyetinde olması için ara işlem olarak ifade edilen Then fonksiyonları ile araya girip farklı işlemler yapabildiğimizi göstermekteyim. Devamında ise bu işlemlerden sonra TransitionTo fonksiyonu aracılığıyla ilgili siparişe ait instance’ın durumunu ‘OrderCreated’a çekiyoruz ve ardından Send fonksiyonu ile Stock.API‘a ‘OrderCreatedEvent’ini göndererek haber veriyoruz.

    Diğer satırlara bakmadan önce burada aydınlatmamız gereken bir kod mantığı vardır. O da context.Instance ile context.Data arasındaki farktır. context.Instance, veritabanındaki ilgili siparişe karşılık gelen instance satırını temsil etmektedir. context.Data ise o anki ilgili event’tan gelen datayı temsil eder.

    Devam edersek eğer 44 ile 62. satır aralığını ele alalım. Burada ise artık tetikleyici event’ten sonraki durumları kontrol ettiğimiz During fonksiyonu ile o anki durumun ‘OrderCreated’ olup olmadığını denetliyoruz. Eğer ‘OrderCreated’ ise ve gelen event ‘StockReservedEvent’ ise(When) durumu ‘StockReserved’ olarak güncelliyoruz(TransitionTo) ve Payment.API‘a ‘PaymentStartedEvent’i gönderiyoruz(Send).

    56. satırda ise gelen event’in ‘StockNotReservedEvent’ olma durumuna istinaden(When) state’i ‘StockNotReserved’ olarak güncelliyoruz(TransitionTo) ve bu sefer de bu siparişin başarısız olduğunu haber edebilmek için Order.API‘ı ‘OrderFailedEvent’ ile uyarıyoruz(Send).

    70 ile 92. satır aralığına bakarsak eğer yine benzer mantıkta ‘StockReserverd’ durumunda iken(During) ‘PaymentCompletedEvent’ geliyorsa eğer(When) durumu ‘PaymentCompleted’e çekiyoruz(TransitionTo) ve ardından siparişin başarıyla tamamlandığına dair Order.API‘a ‘OrderCompletedEvent’ ile haber veriyoruz. Tabi burada önemli olan husus sipariş tamamlandıysa bunu Finalize ile bildiriyor olmamızdır.

    82. satırda ise yine aynı durumda ‘PaymentFailedEvent’ geliyor ise(When) durumu ‘PaymentFailed’e çekiyoruz(TransitionTo) ve siparişte bir hata olduğuna dair bir yandan Order.API‘a ‘OrderFailedEvent’ bir yandan da Stock.API‘a ise ‘StockRollBackMessage’ event’leri gönderiliyor.

    Son olarak finalize olan instance’ları veritabanında tutmanın lüzumu olmadığı için SetCompletedWhenFinalized fonksiyonu ile instance’ı fiziksel olarak veritabanından kaldırıyoruz.

    Tabi burada yapılan çalışmalar neticesinde State Machine’in en son halini paylaşmamız gerekirse eğer aşağıdaki gibi olacaktır.

        public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
        {
            public Event<OrderStartedEvent> OrderStartedEvent { get; set; }
            public Event<StockReservedEvent> StockReservedEvent { get; set; }
            public Event<PaymentCompletedEvent> PaymentCompletedEvent { get; set; }
            public Event<PaymentFailedEvent> PaymentFailedEvent { get; set; }
            public Event<StockNotReservedEvent> StockNotReservedEvent { get; set; }
    
    
            public State OrderCreated { get; set; }
            public State StockReserved { get; set; }
            public State PaymentCompleted { get; set; }
            public State PaymentFailed { get; set; }
            public State StockNotReserved { get; set; }
            public OrderStateMachine()
            {
                //State Instance'da ki hangi property'nin sipariş sürecindeki state'i tutacağı bildiriliyor.
                //Yani artık tüm event'ler CurrentState property'sin de tutulacaktır!
                InstanceState(instance => instance.CurrentState);
    
                //Eğer gelen event OrderStartedEvent ise CorrelateBy metodu ile veritabanında(database)
                //tutulan Order State Instance'da ki OrderId'si ile gelen event'te ki(@event) OrderId'yi
                //kıyasla. Bu kıyas neticesinde eğer ilgili instance varsa kaydetme. Yani yeni bir korelasyon
                //üretme! Yok eğer yoksa yeni bir korelasyon üret(SelectId)
                Event(() => OrderStartedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateBy<int>(database => database.OrderId, @event => @event.Message.OrderId)
                    .SelectId(e => Guid.NewGuid()));
    
                //StockReservedEvent fırlatıldığında veritabanındaki hangi correlationid değerine sahip state
                //instance'ın state'ini değiştirecek bunu belirtmiş olduk!
                Event(() => StockReservedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateById(@event => @event.Message.CorrelationId));
    
                //StockNotReservedEvent fırlatıldığında veritabanındaki hangi correlationid değerine sahip state
                //instance'ın state'ini değiştirecek bunu belirtmiş olduk!
                Event(() => StockNotReservedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateById(@event => @event.Message.CorrelationId));
    
                //PaymentCompletedEvent fırlatıldığında veritabanındaki hangi correlationid değerine sahip state
                //instance'ın state'ini değiştirecek bunu belirtmiş olduk!
                Event(() => PaymentCompletedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateById(@event => @event.Message.CorrelationId));
    
                //PaymentFailedEvent fırlatıldığında veritabanındaki hangi correlationid değerine sahip state
                //instance'ın state'ini değiştirecek bunu belirtmiş olduk!
                Event(() => PaymentFailedEvent,
                    orderStateInstance =>
                    orderStateInstance.CorrelateById(@event => @event.Message.CorrelationId));
    
                //İlgili instance'ın state'i initial/başlangıç aşamasındayken(Initially) 'OrderStartedEvent'
                //tetikleyici event'i geldiyse(When) şu işlemleri yap(Then). Ardından bu işlemler yapıldıktan
                //sonra ilgili instance'ı 'OrderCreated' state'ine geçir(TransitionTo). Ardından 'Stock.API'ı
                //tetikleyebilmek/haberdar edebilmek için 'OrderCreatedEvent' event'ini gönder(Publish/Send)
                Initially(When(OrderStartedEvent)
                    .Then(context =>
                    {
                        context.Instance.BuyerId = context.Data.BuyerId;
                        context.Instance.OrderId = context.Data.OrderId;
                        context.Instance.TotalPrice = context.Data.TotalPrice;
                        context.Instance.CreatedDate = DateTime.Now;
                    })
                    .Then(context => Console.WriteLine("Ara işlem 1"))
                    .Then(context => Console.WriteLine("Ara işlem 2"))
                    .TransitionTo(OrderCreated)
                    .Then(context => Console.WriteLine("Ara işlem 3"))
                    .Send(new Uri($"queue:{RabbitMQSettings.Stock_OrderCreatedEventQueue}"), context => new OrderCreatedEvent(context.Instance.CorrelationId)
                    {
                        OrderItems = context.Data.OrderItems
                    }));
    
                //Eğer state 'OrderCreated' ise(During) ve o anda 'StockReservedEvent' event'i geldiyse(When)
                //o zaman state'i 'StockReserved' olarak değiştir(TransitionTo) ve belirtilen kuyruğa 
                //'PaymentStartedEvent' event'ini gönder(Send)
                During(OrderCreated,
                    When(StockReservedEvent)
                    .TransitionTo(StockReserved)
                    .Send(new Uri($"queue:{RabbitMQSettings.Payment_StartedEventQueue}"), context => new PaymentStartedEvent(context.Instance.CorrelationId)
                    {
                        OrderItems = context.Data.OrderItems,
                        TotalPrice = context.Instance.TotalPrice
                    }),
    
                    //Yok eğer State 'OrderCreated' iken(During) 'StockNotReservedEvent' event'i geldiyse(When)
                    //o zaman state'i 'StockNotReserved' olarak değiştir(TransitionTo) ve belirtilen
                    //kuyruğa 'OrderFailedEvent' event'ini gönder.
                    When(StockNotReservedEvent)
                    .TransitionTo(StockNotReserved)
                    .Send(new Uri($"queue:{RabbitMQSettings.Order_OrderFailedEventQueue}"), context => new OrderFailedEvent()
                    {
                        OrderId = context.Instance.OrderId,
                        Message = context.Data.Message
                    }));
    
                //Eğer ilgili sipariş 'StockReserved' durumunda iken(During) 'PaymentCompletedEvent' event'i geldiyse(When)
                //'PaymentCompleted' state'i olarak değiştir(TransitionTo) ve ardından belirtilen kuyruğa 
                //'OrderCompletedEvent' event'ini gönder. Ayrıca artık bu sipariş başarılı olacağından dolayı
                //State Machine tarafından bu State Instance'ı başarıyla sonlandır(Finalize) Haliyle böylece sonuç olarak
                //ilgili instance'ın state'inde 'Final' yazacaktır!
    
                During(StockReserved,
                    When(PaymentCompletedEvent)
                    .TransitionTo(PaymentCompleted)
                    .Send(new Uri($"queue:{RabbitMQSettings.Order_OrderCompletedEventQueue}"), context => new OrderCompletedEvent
                    {
                        OrderId = context.Instance.OrderId
                    })
                    .Finalize(),
    
                    //Yok eğer mevcut state 'StockReserved' iken(During) 'PaymentFailedEvent' event'i gelirse(When)
                    //o zaman state'i 'PaymentFailed' olarak değiştir(TransitionTo) ve belirtilen kuyruklara 
                    //'OrderFailedEvent' ve 'StockRollBackMessage' event'lerini gönder(Send).
                    When(PaymentFailedEvent)
                    .TransitionTo(PaymentFailed)
                    .Send(new Uri($"queue:{RabbitMQSettings.Order_OrderFailedEventQueue}"), context => new OrderFailedEvent()
                    {
                        OrderId = context.Instance.OrderId,
                        Message = context.Data.Message
                    })
                    .Send(new Uri($"queue:{RabbitMQSettings.Stock_RollbackMessageQueue}"), context => new StockRollBackMessage
                    {
                        OrderItems = context.Data.OrderItems
                    }));
    
                //Finalize olan instance'ları veritabanından kaldırıyoruz!
                SetCompletedWhenFinalized();
            }
        }
    
  • Adım 10 Order.API | OrderCompletedEventConsumer ve OrderFailedEventConsumer İsimli Consumer’ların Oluşturulması
    Şimdi Order.API için consume edilmesi gereken event’leri tüketecek olan consumer’ları oluşturalım.

    OrderCompletedEventConsumer, ‘OrderCompletedEvent’ neticesinde siparişi başarıyla sonlandıran consumer’dır.

        public class OrderCompletedEventConsumer : IConsumer<OrderCompletedEvent>
        {
            readonly ApplicationDbContext _applicationDbContext;
            public OrderCompletedEventConsumer(ApplicationDbContext applicationDbContext)
            {
                _applicationDbContext = applicationDbContext;
            }
            public async Task Consume(ConsumeContext<OrderCompletedEvent> context)
            {
                Models.Order order = await _applicationDbContext.Orders.FindAsync(context.Message.OrderId);
                if (order != null)
                {
                    order.OrderStatus = OrderStatus.Completed;
                    await _applicationDbContext.SaveChangesAsync();
                }
            }
        }
    

    OrderFailedEventConsumer, ‘OrderFailedEvent’ neticesinde siparişi başarısız olarak sonlandıran consumer’dır.

        public class OrderFailedEventConsumer : IConsumer<OrderFailedEvent>
        {
            readonly ApplicationDbContext _context;
            public OrderFailedEventConsumer(ApplicationDbContext context)
            {
                _context = context;
            }
            public async Task Consume(ConsumeContext<OrderFailedEvent> context)
            {
                Models.Order order = await _context.FindAsync<Models.Order>(context.Message.OrderId);
                if (order != null)
                {
                    order.OrderStatus = OrderStatus.Fail;
                    await _context.SaveChangesAsync();
                    Console.WriteLine(context.Message.Message);
                }
            }
        }
    
  • Adım 11 Stock.API | OrderCreatedEventConsumer ve StockRollbackMessageConsumer İsimli Consumer’ların Oluşturulması
    Aynı şekilde Stock.API için de consumer’ları oluşturalım.
    OrderCreatedEventConsumer, ‘OrderCreatedEvent’e göre stok bilgilerini güncelleyen consumer’dır.

        public class OrderCreatedEventConsumer : IConsumer<OrderCreatedEvent>
        {
            readonly MongoDbService _mongoDbService;
            readonly ISendEndpointProvider _sendEndpointProvider;
            readonly IPublishEndpoint _publishEndpoint;
    
            public OrderCreatedEventConsumer(
                MongoDbService mongoDbService,
                ISendEndpointProvider sendEndpointProvider,
                IPublishEndpoint publishEndpoint)
            {
                _mongoDbService = mongoDbService;
                _sendEndpointProvider = sendEndpointProvider;
                _publishEndpoint = publishEndpoint;
            }
    
            public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
            {
                List<bool> stockResult = new();
                IMongoCollection<Models.Stock> collection = _mongoDbService.GetCollection<Models.Stock>();
    
                //Sipariş edilen ürünlerin stok miktarı sipariş adedinden fazla mı? değil mi?
                foreach (OrderItemMessage orderItem in context.Message.OrderItems)
                    stockResult.Add((await collection.FindAsync(s => s.ProductId == orderItem.ProductId && s.Count > orderItem.Count)).Any());
    
                //Eğer fazlaysa sipariş edilen ürünlerin stok miktarı güncelleniyor.
                ISendEndpoint sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{RabbitMQSettings.StateMachine}"));
                if (stockResult.TrueForAll(sr => sr.Equals(true)))
                {
                    foreach (OrderItemMessage orderItem in context.Message.OrderItems)
                    {
                        Models.Stock stock = await (await collection.FindAsync(s => s.ProductId == orderItem.ProductId)).FirstOrDefaultAsync();
                        stock.Count -= orderItem.Count;
                        await collection.FindOneAndReplaceAsync(x => x.ProductId == orderItem.ProductId, stock);
                    }
    
                    StockReservedEvent stockReservedEvent = new(context.Message.CorrelationId)
                    {
                        OrderItems = context.Message.OrderItems
                    };
                    await sendEndpoint.Send(stockReservedEvent);
                }
                //Eğer az ise siparişin iptal edilmesi için gerekli event gönderiliyor.
                else
                {
                    StockNotReservedEvent stockNotReservedEvent = new(context.Message.CorrelationId)
                    {
                        Message = "Stok yetersiz..."
                    };
    
                    await sendEndpoint.Send(stockNotReservedEvent);
                }
            }
        }
    

    StockRollbackMessageConsumer, ‘StockRollBackMessage’a göre güncellenmiş stok bilgilerini tekrar geri alan consumer’dır. Bir başka deyişle compensable işlemini gerçekleştirmektedir.

        public class StockRollbackMessageConsumer : IConsumer<StockRollBackMessage>
        {
            readonly MongoDbService _mongoDbService;
            public StockRollbackMessageConsumer(MongoDbService mongoDbService)
            {
                _mongoDbService = mongoDbService;
            }
            public async Task Consume(ConsumeContext<StockRollBackMessage> context)
            {
                var collection = _mongoDbService.GetCollection<Models.Stock>();
    
                foreach (var item in context.Message.OrderItems)
                {
                    Models.Stock stock = await (await collection.FindAsync(s => s.ProductId == item.ProductId)).FirstOrDefaultAsync();
                    if (stock != null)
                    {
                        stock.Count += item.Count;
                        await collection.FindOneAndReplaceAsync(s => s.ProductId == item.ProductId, stock);
                    }
                }
            }
        }
    
  • Adım 12 Payment.API | PaymentStartedEventConsumer Consumer’ının Oluşturulması
    Payment.API‘da da ‘PaymentStartedEvent’i dinleyip tetiklenecek olan consumer’ı oluşturalım. Bu consumer’da gerekli ödeme işlemlerini farazi olarak gerçekleştiriyor olacağız.

        public class PaymentStartedEventConsumer : IConsumer<PaymentStartedEvent>
        {
            readonly ISendEndpointProvider _sendEndpointProvider;
            readonly IPublishEndpoint _publishEndpoint;
            public PaymentStartedEventConsumer(ISendEndpointProvider sendEndpointProvider,
                IPublishEndpoint publishEndpoint)
            {
                this._sendEndpointProvider = sendEndpointProvider;
                _publishEndpoint = publishEndpoint;
            }
    
            public async Task Consume(ConsumeContext<PaymentStartedEvent> context)
            {
                ISendEndpoint sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{RabbitMQSettings.StateMachine}"));
                if (context.Message.TotalPrice <= 100)
                    await sendEndpoint.Send(new PaymentCompletedEvent(context.Message.CorrelationId));
                else
                    await sendEndpoint.Send(new PaymentFailedEvent(context.Message.CorrelationId)
                    {
                        Message = "Bakiye yetersiz!",
                        OrderItems = context.Message.OrderItems
                    });
            }
        }
    
  • Adım 13 Tüm Servisler | Consumer’ların Startup.cs’de Tanımlanması
    Şu ana kadar oluşturulan tüm consumer’lar ilgili servislerin ‘Startup.cs’ dosyasında aşağıdaki gibi tanımlanmalıdırlar;
    Order.API;

            public void ConfigureServices(IServiceCollection services)
            {
                .
                .
                .
                services.AddMassTransit(configure =>
                {
                    configure.AddConsumer<OrderCompletedEventConsumer>();
                    configure.AddConsumer<OrderFailedEventConsumer>();
    
                    configure.UsingRabbitMq((context, configurator) =>
                    {
                        configurator.Host(Configuration.GetConnectionString("RabbitMQ"));
    
                        configurator.ReceiveEndpoint(RabbitMQSettings.Order_OrderCompletedEventQueue, e =>
                        e.ConfigureConsumer<OrderCompletedEventConsumer>(context));
                        configurator.ReceiveEndpoint(RabbitMQSettings.Order_OrderFailedEventQueue, e =>
                        e.ConfigureConsumer<OrderFailedEventConsumer>(context));
                    });
                });
                .
                .
                .
            }
    

    Stock.API;

            public void ConfigureServices(IServiceCollection services)
            {
                .
                .
                .
                services.AddMassTransit(configure =>
                {
                    configure.AddConsumer<OrderCreatedEventConsumer>();
                    configure.AddConsumer<StockRollbackMessageConsumer>();
    
                    configure.UsingRabbitMq((context, configurator) =>
                    {
                        configurator.Host(Configuration.GetConnectionString("RabbitMQ"));
                        configurator.ReceiveEndpoint(RabbitMQSettings.Stock_OrderCreatedEventQueue, e => e.ConfigureConsumer<OrderCreatedEventConsumer>(context));
                        configurator.ReceiveEndpoint(RabbitMQSettings.Stock_RollbackMessageQueue, e => e.ConfigureConsumer<StockRollbackMessageConsumer>(context));
                    });
                });
                .
                .
                .
            }
    

    Payment.API;

            public void ConfigureServices(IServiceCollection services)
            {
                .
                .
                .
                services.AddMassTransit(configure =>
                {
                    configure.AddConsumer<PaymentStartedEventConsumer>();
    
                    configure.UsingRabbitMq((context, configurator) =>
                    {
                        configurator.Host(Configuration.GetConnectionString("RabbitMQ"));
    
                        configurator.ReceiveEndpoint(RabbitMQSettings.Payment_StartedEventQueue, e =>
                        e.ConfigureConsumer<PaymentStartedEventConsumer>(context));
                    });
                });
                .
                .
                .
            }
    

    Ayrıca SagaStateMachine.Service‘de aşağıdaki gibi ‘OrderStateInstance’ı receive ederek State Machine’in hangi kuyruğu dinleyeceğini bildirmek gerekmektedir;

        public class Program
        {
            .
            .
            .
            public static IHostBuilder CreateHostBuilder(string[] args) =>
                Host.CreateDefaultBuilder(args)
                    .ConfigureServices((hostContext, services) =>
                    {
                        services.AddMassTransit(configure =>
                        {
                            .
                            .
                            .
    
                            configure.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
                            {
                                cfg.Host(hostContext.Configuration.GetConnectionString("RabbitMQ"));
    
                                cfg.ReceiveEndpoint(RabbitMQSettings.StateMachine, e =>
                                e.ConfigureSaga<OrderStateInstance>(provider));
                            }));
                        });
    
                        services.AddMassTransitHostedService();
    
                    });
        }
    

    İşte bu kadar 🙂

Vee tüm işlemlerimiz bitti. Evet, biliyorum… İnşa süreci biraz zahmetli ve meşakkatli geçmiş olabilir. ☺ Ama şimdi sıra, yapılan tüm bu çalışmaların meyvesini toplamaya gelmiş bulunmaktadır. Ha gayret son kez test edelim şu yapılanmayı…

Test Edelim

İlk olarak tüm servisleri ayağa kaldırdığımızda RabbitMQ ile connection sağlayıp sağlamadıklarını gözlemleyelim.
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction YönetimiGörüldüğü üzere tüm servislerimiz başarıyla bağlantı sağlamış bulunmaktadır. Şimdi aşağıdaki gibi tutarlı verilerle bir sipariş isteğinde bulunalım ve süreci adım adım takip edelim.
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction YönetimiBu verilerle istek yapıldıktan sonra sipariş oluşturuluyor ve State Machine veritabanında ilgili siparişin durumu ‘OrderCreated’ olarak ayarlanıyor.
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction YönetimiArdından stok bilgileri MongoDB’de aşağıdaki gibi güncelleniyor.
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction YönetimiTabi bu durumda State Machine veritabanında ilgili siparişe dair durumumuz ‘StockReserved’ olarak güncelleniyor.
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction YönetimiVe ardından ilgili sipariş durumu ‘Orders’ tablosunda Completed olarak güncelleniyor ve ardından State Machine’den siparişe dair veri kaldırılıyor!
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction YönetimiSipariş isteğini tutarsız verilerle yaparsak eğer burada hem sipariş hatalı bir şekilde sonlandırılmış olacak hemde State Machine veritabanında bu siparişe dair sonlanışın neden olduğu state bilgisini rahatlıkla görebiliyor olacağız.

Misal; tutarsız stok bilgileriyle bir istek yaparsak eğer son görüntü aşağıdaki gibi olacaktır;
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction YönetimiYahut ödeme durumunda bir tutarsızlık oluşursa sistemin son hali aşağıdaki gibi olacaktır;
Microservice - Saga - Commands-Orchestration Implemantasyonu İle Transaction Yönetimi

Nihai olarak;
Oldukça uzun ve zahmetli bir içerik ile Saga pattern’ı Orchestration implementasyonu eşliğinde ele almış bulunmaktayız. İçeriğimiz okunurken elbette yer yer zor yahut karışık geliyor olabilir. Lakin bu kadar kompleks ve bol kritik gerektiren bir konuyu yazıya aktarabilmek sizlerin okurken sarfettiğiniz enerjiye eşdeğer bir zahmet gerektirdiğine emin olabilirsiniz. Elbet bir niyet ile başlayan bu uğraşın neticesini iyi kötü getirebilmenin sevinciyle hepinize iyi okumalar diliyor ve bu noktaya kadar sabredip eşlik ettiyseniz tüm samimiyetimle teşekkür ediyorum…

İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…

Not : Örnek projeyi indirebilmek için buraya tıklayınız.

Bunlar da hoşunuza gidebilir...

5 Cevaplar

  1. mnsr dedi ki:

    Aydınlatıcı içerik için emeğinize sağlık Teşekkür ederim.

  2. demet dedi ki:

    cok iyi bir içerik olmus tesekkurler

  3. Yakup dedi ki:

    Merhaba, başarılı olmayan işlemlerin tekrardan tetiklenmesi için nasıl bir yol izlemeliyiz?

  4. Gençay dedi ki:

    Değerli okuyucularım,

    Yukarıdaki içerikte oluşturduğumuz örnek uygulamaya aşağıdaki github adresinden erişebilirsiniz :

    https://github.com/gncyyldz/Saga-Orchestration

    İyi okumalar, bol faydalar…

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir