Derinlemesine yazılım eğitimleri için kanalımı takip edebilirsiniz...

Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)

Merhaba,

Önceki, .NET Core Ortamında ‘Event Store’ İle Event Sourcing Yapılanması başlıklı makalemde Event Sourcing yapılanması için Event Store ile yapılması gereken temel ayarlardan ve bağlantı konfigürasyonlarından bahsetmiştik. Bu içeriğimizde ise bir Asp.NET Core uygulamasında belirlediğimiz bir senaryoya istinaden Event Store tool’unun eşliğinde Event Sourcing örneklendirmesi gerçekleştireceğiz. Haliyle hiç vakit kaybetmeksizin direkt olarak içeriğimizin temeli olan senaryoyu ele alarak konuya giriş yapalım.

Senaryo

Örneklendirmede ele alacağımız senaryo bir sistemdeki kullanıcı(Users) davranışları üzerine olacaktır. Bir kullanıcı datası üzerinde varsayım olarak oluşabilecek tüm event’ler aşağıdaki tabloda listelenmiştir.

Açıklama Event
Kullanıcı eklendiğinde UserCreated
Kullanıcı adı değiştirildiğinde UserNameChanged
Kullanıcı email’i onaylandığında UserEmailApproved

Başlarken

Yapısal olarak Event Sourcing için bir data üzerindeki tüm etkinliklerin kayıt altında tutulduğu bir pattern olduğunu önceki makalelerimizde uzun uzadıya konuşmuştuk. Haliyle şimdi bu pattern’ı uygularken yaptığımız girizgahtan da belli olduğu üzere data üzerindeki etkinliklerin kaydını Event Store tool’un da tutacak ve böylece ‘Write Data Store’ olarak ilgili veritabanını kullanacağız. ‘Read Data Store’ olarak kullanılacak olan veritabanındaki güncellemeyi ise Messaging ile gerçekleştireceğiz ve bu işlemi de muhtemelen bir sonraki makalemizde ele alıyor olacağız.

Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)

Görsel Kaynağı : https://subscription.packtpub.com/book/application_development/9781786469342/7/07lvl1sec43/event-sourcing-and-cqrs
Esasında görüldüğü üzere Event Sourcing, CQRS pattern ile birlikte tam bir uyumluluk sergilemekte olan bir yapılandır. ‘Write Data Store’ olarak herhangi bir veritabanı yahut sağlayıcı kullanılabilir. Biz burada oldukça etkili olan Event Store tool’unu tercih ediyoruz. Amma velakin bir event geldiği taktirde bu yeniliği anlık olarak ‘Read Data Store’ veritabanına yansıtılması elzemdir. Aksi taktirde büyük bir veri tutarsızlığı söz konusu olabilir. Bu konuya da yukarıda da bahsedildiği üzere sonraki makalemizde değinmiş olacak olsakta, bu makalemizin ileriki satırlarında ufaktan temas etmiş olacağız…


Velhasıl, örneklendirmede eşlik edebilmeniz için hali hazırda oluşturulmuş bir Asp.NET Core – API uygulaması açınız ve içerisine EventStore.Client kütüphanesini yükleyiniz. Ardından geliştirilmeye hazır bir adet ‘UsersController’ isminde controller oluşturunuz. Evet, artık bu kadarı kafidir. Yavaştan event sourcing pattern’ını geliştirmeye başlayabiliriz.

Uygulama Geliştirme

Bu makalede belli başlı bir event sourcing stratejisi gösterilecektir. Tabi ki de bu strateji özelleştirilebilir yahut daha farklı stratejilerde bir tasarım ortaya koyulabilir. Ayrıca adım adım geliştirme süreçlerinde isteğiniz ya da ihtiyacınız doğrultusunda öncelik sıralamasını değiştirebilirsiniz. Misal, bizler aşağıda öncelikle event’lerin geliştirilmesinden başlayacağız. Sizler isterseniz öncelikle exception sınıflarını geliştirebilir yahut ‘Aggregate’(yazımızın devamında ne işe yaradığı anlatılacaktır) sınıfını ele alabilirsiniz. Yeter ki olağan olsun 🙂

  • Adım 1 Entity’nin Oluşturulması
    İlk olarak hangi model üzerinde event sourcing operasyonu gerçekleştirdiğimizi bilmek için öncelikle entity model’imizi oluşturalım.

        public class User
        {
            public int Id { get; set; }
            public string Name { get; set; }
            public string UserName { get; set; }
            public string Email { get; set; }
            public bool EmailApprove { get; set; }
        }
    
  • Adım 2 Event’lerin Oluşturulması
    Bir event sourcing tasarımında en önemli aktörler event’lerdir. Dolayısıyla bu event’leri tarif eden, onlara karşılık gelen sınıfların üretilmesi gerekmektedir. Tabi ki de bu sınıfların sistemde bir event olduğuna dair ayırt edici bir özellikle işaretlenmesi için öncelikle bir ‘IEvent’ adında interface oluşturabiliriz.

        public interface IEvent
        {
        }
    

    Bu interface ile uygulamada ki tüm event’lerimizi işaretleyeceğiz.

    UserCreated, kullanıcı eklendiğini/oluşturulduğunu tarif eden event’tir. İçeriksel olarak bir kullanıcıyı oluşturmak için gerekli dataları tutacak property’ler tanımlanmıştır.

        public class UserCreated : IEvent
        {
            public int UserId { get; set; }
            public string Name { get; set; }
            public string UserName { get; set; }
            public string Email { get; set; }
            public bool EmailApprove { get; set; }
        }
    

    UserNameChanged, kullanıcının adının güncellendiğini tarif eden event’tir. İçerisinde hangi kullanıcıya dair(UserId) hangi yeni isim bilgilerini(NewName) tutacak olan property’ler tanımlanmıştır.

        public class UserNameChanged : IEvent
        {
            public int UserId { get; set; }
            public string NewName { get; set; }
        }
    

    UserEmailApproved, kullanıcı email’inin doğrulandığını tarif eden event’tir. Bu işlemin hangi kullanıcı için yapıldığını bilmemiz yeterli olacağından dolayı sadece UserId bilgisi tutulmaktadır.

        public class UserEmailApproved : IEvent
        {
            public int UserId { get; set; }
        }
    
  • Adım 3 Exception Sınıflarının Oluşturulması
    Uygulamadaki event sourcing sürecinde oluşabilecek istisnai durumları tarif edebilmek ve daha dinamik yönetebilmek için gerekli exception sınıfları oluşturulmalıdır. Misal olarak, oluşan herhangi bir event’i gerekli kümeye/aggregate’e/stream’e atayabilmek için öncelikle ilgili stream’in var olması gerekmektedir. Aksi taktirde aşağıdaki StreamNotFoundException isimli exception sınıfı fırlatılarak, sistem uyarılabilir.

        public class StreamNotFoundException : Exception
        {
            public StreamNotFoundException() : base("Stream not found")
            { }
            public StreamNotFoundException(string message) : base(message)
            { }
        }
    
  • Adım 4 Aggregate Abstract Class’ının Oluşturulması
    Bir kullanıcı verisine karşılık oluşabilecek tüm event’lerin toplamı olan ‘Aggregate’i temsil edecek olan abstract class’ın aşağıdaki gibi tasarlanması oldukça idealdir. Gerekli açıklamalar kodun içerisinde yapılmıştır.

        public abstract class Aggregate
        {
            //Oluşan tüm event'leri tutacak koleksiyon.
            protected readonly List<IEvent> events = new();
            public List<IEvent> GetEvents => events;
            //Event'lerin tutulacağı Aggregate/Stream adı.
            public string StreamName { get; private set; }
            public void SetStreamName(string streamName)
                => StreamName = streamName;
            //Stream adının atanıp atanmadığını kontrol eden fonksiyon.
            protected bool CheckStreamName()
                => string.IsNullOrEmpty(StreamName) || string.IsNullOrWhiteSpace(StreamName);
        }
    
  • Adım 5 ‘UserAggregate’ Sınıfının Tasarlanması
    ‘Aggregate’ abstract class’ını tasarladıktan sonra artık sırada concrete aggregate sınıflarının tasarlanması vardır. Bizler bu makalemizde sadece kullanıcı üzerinde bir event sourcing çalışması yaptığımız için aşağıdaki gibi UserAggregate isimli bir sınıfın tasarlanması yeterli olacaktır. Sizler ise projenizde bulunan farklı aggregate’lere göre concrete’leri oluşturmalı ve ihtiyaca binaen gerekli çalışmaları sergilemelisiniz.

        public class UserAggregate : Aggregate
        {
            //Kullanıcı oluşturulduğunda
            public void Created(User model)
            {
                if (CheckStreamName())
                    throw new StreamNotFoundException();
    
                UserCreated userCreated = new()
                {
                    UserId = model.Id,
                    Email = model.Email,
                    EmailApprove = model.EmailApprove,
                    Name = model.Name,
                    UserName = model.UserName
                };
                events.Add(userCreated);
            }
            //Kullanıcı adı değiştirildiğinde
            public void NameChanged(string newName, int userId)
            {
                if (CheckStreamName())
                    throw new StreamNotFoundException();
    
                UserNameChanged userNameChanged = new()
                {
                    NewName = newName,
                    UserId = userId
                };
                events.Add(userNameChanged);
            }
            //Kullanıcı email onaylandığında
            public void EmailApproved(int userId)
            {
                if (CheckStreamName())
                    throw new StreamNotFoundException();
    
                UserEmailApproved userEmailApproved = new()
                {
                    UserId = userId
                };
                events.Add(userEmailApproved);
            }
        }
    

    Yukarıdaki concrete aggregate sınıfını incelerseniz eğer bir kullanıcı verisi üzerinde oluşabilecek tüm olayların fonksiyonlarını barındırmakta ve bu fonksiyonlar içerisinde ilgili olaya dair instance üretilerek base class’ta ki ‘events’ isimli koleksiyona atılmaktadır. Artık bu concrete aggregate sınıfı sayesinde üretilmiş olan event’leri Event Store’a gönderecek ve ihtiyaç doğrultusunda tekrar elde etmemizi sağlayacak repository sınıfının tasarlanması gerekmektedir.

  • Adım 6 AggregateRepository Sınıfının Tasarlanması
    Aşağıda oluşturulmuş olan aggregate repository sınıfı ile yukarıdaki adımda oluşturulan concrete aggregate sınıfındaki event’leri Event Store’da depolayabilmekte yahut depolanmış olan event’leri tekrar elde edebilmekteyiz.

        public class AggregateRepository
        {
            readonly IEventStoreConnection _connection;
            public AggregateRepository(IEventStoreConnection connection)
                => _connection = connection;
            //Oluşturulan event'leri Event Store'a kaydeder.
            public async Task SaveAsync<T>(T aggregate) where T : Aggregate, new()
            {
                List<EventData> events = aggregate.GetEvents
                    .Select(@event => new EventData(
                        eventId: Guid.NewGuid(),
                        type: @event.GetType().Name,//type : Event Store'a kaydedilecek olan event'in türünü sınıf olarak bildiriyoruz.
                        isJson: true,
                        data: Encoding.UTF8.GetBytes(JsonSerializer.Serialize( //Event json türüne serialize ediliyor.
                            value: @event,
                            inputType: @event.GetType(),
                            new() { WriteIndented = true }
                            )),
                        metadata: Encoding.UTF8.GetBytes(@event.GetType().FullName))/*metadata : Metadata olarak binary formatta ilgili
     event'in FullName bilgisini yani namespace ile birlikte full class adını tutmaktayız. Bu bilgiyi, event'leri 'Read Data Store'da
     güncelleme yaparken hangi event'in gerçekleştiğini ayırt edebilmek için kullanacağız. */
                    )
                    .ToList();
    
                if (!events.Any())
                    return;
    
                //Event'ler gönderiliyor...
                await _connection.AppendToStreamAsync(aggregate.StreamName, ExpectedVersion.Any, events);
                aggregate.GetEvents.Clear();
            }
            //Event Store'dan belirtilen Stream'de ki event'leri getirir.
            public async Task<dynamic> GetEvents(string streamName)
            {
                long nextSliceStart = 0L;
                List<ResolvedEvent> events = new();
                StreamEventsSlice readEvents = null;
                do
                {
                    readEvents = await _connection.ReadStreamEventsForwardAsync(
                        stream: streamName,
                        start: nextSliceStart,
                        count: 4096,
                        resolveLinkTos: true
                        );
    
                    if (readEvents.Events.Length > 0)
                        events.AddRange(readEvents.Events);
    
                    nextSliceStart = readEvents.NextEventNumber;
                } while (!readEvents.IsEndOfStream);
                return events.Select(@event => new
                {
                    @event.Event.EventNumber,
                    @event.Event.EventType,
                    @event.Event.Created,
                    @event.Event.EventId,
                    @event.Event.EventStreamId,
                    Data = JsonSerializer.Deserialize(
                        json: Encoding.UTF8.GetString(@event.Event.Data),
                        returnType: Type.GetType(Encoding.UTF8.GetString(@event.Event.Metadata)) /*returnType : Yukarıda 'SaveAsync'
     metodunda metadata olarak tutulan event class'ının tam adı, burada ilgili event'in özgün sınıfına dönüştürülürken kullanılmaktadır.*/
                        ),
                    Metadata = Encoding.UTF8.GetString(@event.Event.Metadata)
                });
            }
            //Event'lerin uygulandığı User datasının son halini getirir.
            public async Task<User> GetData(string streamName)
            {
                dynamic events = await GetEvents(streamName);
                User user = new();
                foreach (var @event in events)
                {
                    switch (@event.Data)
                    {
                        case UserCreated o:
                            user.Id = o.UserId;
                            user.Name = o.Name;
                            user.UserName = o.UserName;
                            user.Email = o.Email;
                            user.EmailApprove = o.EmailApprove;
                            break;
                        case UserNameChanged o:
                            user.Name = o.NewName;
                            break;
                        case UserEmailApproved o:
                            user.EmailApprove = true;
                            break;
                    }
                }
                return user;
            }
        }
    

    Yukarıdaki kod bloğuna göz atarsanız eğer satır aralıklarında açıklama yapılmış olsa da iki hasbihalde bulunmadan geçmek çokta makale adabına uygun olmayacağı için özet mahiyetinde bir kaç cümle karalamakta fayda vardır. Görüldüğü üzere bu aggregate repository sınıfı önceden de bahsedildiği gibi concrete aggregate nesnesindeki event’leri Event Store’a göndermekte ve ihtiyaç doğrultusunda tekrar okuma işlemini gerçekleştirmektedir. Burada en önemli noktalardan birisi ‘SaveAsync’ fonksiyonundaki ‘metadata’ bilgisidir. Bu, eklenecek olan event’in class türünü tam olarak tutmakta ve ihtiyaç doğrultusunda bu türe ilgili event’i dönüştürebilmek için tekrar kullanılmaktadır. Bu dönüşümü de ‘GetEvents’ metodu içerisindeki 60. satırda açıkça görebilmekteyiz.

    ‘GetData’ metodu ise ilgili event’lerin mevcudiyetteki dataya olan etkisini instance olarak gösterebilmek için ekstradan oluşturulmuş örnek bir fonksiyondur. Haliyle bu fonksiyondan yola çıkarak ‘Read Data Store’ tarafında hangi yolla bir güncelleme işlemi yapılacağının ipucunu vermiş bulunmaktayız. Bu konuya dair sonraki makalemizde daha geniş kapsamlı bir incelemede bulunuyor olacağımızdan dolayı şimdilik teknik açıdan bu kadar bir numune yeterlidir kanaatindeyim.

  • Adım 7 Startup.cs Dosyasında Gerekli Konfigürasyonlar
    Yapılan bunca çalışmadan sonra Asp.NET Core API uygulamasında concrete aggregate ve aggregate repository sınıflarının servis olarak eklenebilmesi ve temel Event Store konfigürasyonlarının yapılabilmesi için ‘Startup.cs’ dosyasında aşağıdaki çalışma yapılmalıdır.

        public class Startup
        {
            public Startup(IConfiguration configuration)
            {
                Configuration = configuration;
            }
    
            public IConfiguration Configuration { get; }
    
            public void ConfigureServices(IServiceCollection services)
            {
                IEventStoreConnection connection = EventStoreConnection.Create(
                    connectionString: "ConnectTo=tcp://localhost:1115;DefaultUserCredentials=admin:changeit;UseSslConnection=true;TargetHost=eventstore.org;ValidateServer=false",
                    connectionName: "API_Application",
                    builder: ConnectionSettings.Create().KeepReconnecting()
                );
    
                connection.ConnectAsync().GetAwaiter().GetResult();
                services.AddSingleton(connection);
                services.AddSingleton<AggregateRepository>();
                services.AddSingleton<UserAggregate>();
    
                services.AddControllers();
                .
                .
                .
            }
    
            public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
            {
                .
                .
                .
            }
        }
    
  • Adım 8 Controller’ın Geliştirilmesi
    Son olarak önceden oluşturduğumuz ‘UsersController’ sınıfının içeriğini aşağıdaki gibi tasarlayınız.

        [Route("api/[controller]")]
        [ApiController]
        public class UsersController : ControllerBase
        {
            readonly AggregateRepository _aggregateRepository;
            readonly UserAggregate _userAggregate;
            public UsersController(AggregateRepository aggregateRepository, UserAggregate userAggregate)
            {
                _aggregateRepository = aggregateRepository;
                _userAggregate = userAggregate;
            }
            [HttpPost("[action]")]
            public async Task<IActionResult> Create(CreateUserVM model)
            {
                int userId = 100;
                _userAggregate.SetStreamName($"user-{userId}");
                _userAggregate.Created(new()
                {
                    Id = userId,
                    Email = model.Email,
                    Name = model.Name,
                    UserName = model.UserName
                });
    
                await _aggregateRepository.SaveAsync(_userAggregate);
                return StatusCode((int)HttpStatusCode.Created);
            }
            [HttpPut("[action]")]
            public async Task<IActionResult> UpdateName(UpdateNameUserVM model)
            {
                _userAggregate.NameChanged(model.Name, model.Id);
    
                await _aggregateRepository.SaveAsync(_userAggregate);
                return StatusCode((int)HttpStatusCode.OK);
    
            }
    
            [HttpPut("[action]")]
            public async Task<IActionResult> EmailApprove(EmailApproveUserVM model)
            {
                _userAggregate.EmailApproved(model.Id);
    
                await _aggregateRepository.SaveAsync(_userAggregate);
                return StatusCode((int)HttpStatusCode.OK);
    
            }
            [HttpGet("[action]/{streamName}")]
            public async Task<IActionResult> GetEvents(string streamName)
            {
                dynamic events = await _aggregateRepository.GetEvents($"user-{streamName}");
                return Ok(events);
            }
    
            [HttpGet("[action]/{streamName}")]
            public async Task<IActionResult> GetData(string streamName)
            {
                User user = await _aggregateRepository.GetData($"user-{streamName}");
                return Ok(user);
            }
        }
    

    Yukarıda kullanılan viewmodel sınıflarının içeriği aşağıdaki gibi olacaktır:

        public class CreateUserVM
        {
            public string Name { get; set; }
            public string UserName { get; set; }
            public string Email { get; set; }
        }
    
        public class UpdateNameUserVM
        {
            public int Id { get; set; }
            public string Name { get; set; }
        }
    
        public class EmailApproveUserVM
        {
            public int Id { get; set; }
        }
    

Evet, bu satırlara sağ salim ulaşabildiyseniz eğer hedeflenen event sourcing yapılanmasını başarıyla inşa etmişsiniz demektir 🙂 Şimdi sıra geliştirilen bu uygulamayı test etmeye gelmiştir.

Uygulamayı Çalıştırma ve Test Etme

Uygulamayı derleyip çalıştırdığımızda ‘Startup.cs’ dosyasında yaptığımız konfigürasyonlara istinaden Event Store’a bağlantı gerçekleştirilmekte, dolayısıyla bu bağlantı ilgili tool’un ‘Connections’ kısmında aşağıdaki gibi görülebilmektedir.
Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)

Velhasıl, şimdi geliştirdiğimiz API’da ki ‘UsersController’a Postman üzerinden sırasıyla istekler gönderelim ve gerçekleştirilen event’lere istinaden mevcudiyetteki datanın üzerindeki gelişimi ‘GetData’ action’ına istek göndererek gözlemleyelim. Nihayetindeki bu gözlem esasında ‘Read Data Store’a izafen veriye dair süreçteki değişimi görmemiz için faydalı olacaktır.

  • Kullanıcı ekleme/oluşturma
    Endpoint : https://localhost:5001/api/users/create
    Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)GetData :
    Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)Mevcut data kullanıcı eklendiği için ilk ve default değerlerle oluşturulmuştur.
  • Kullanıcı adı değiştirme
    Endpoint : https://localhost:5001/api/users/updatename
    Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)GetData :
    Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)Yapılan isim güncellemesinden sonra bu değişiklik görüldüğü üzere ilgili data’ya yansımıştır. Hatta bir kere daha değişiklik yaparsak bunun ‘Read Data Store’da ki dataya yansıdığını daha net göreceksiniz.
    Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)
    Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)Görüldüğü üzere bu seferde ‘Kürşad’dan ‘Elif’e bir değişim yaptık ve ilgili data’da bu değişimi görmekteyiz.
  • Kullanıcı email onaylama
    Endpoint : https://localhost:5001/api/users/emailapprove
    Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)GetData :
    Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)Görüldüğü üzere ilgili kullanıcının email’i onaylanmıştır.

Şimdi son olarak 100 id değerine sahip kullanıcı data’sının tüm süreçte yaşadığı değişimleri görebilmek için elimizdeki tüm event’leri hem API hem de Event Store üzerinden okuyalım.

API Event Store
Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme) Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme)

Yukarıdaki görsellere göz atarak event sourcing’in esasında bir veri için ne kadar anlamlı pattern olduğunu görebilmektesiniz. Nihayetinde elimizdeki kullanıcı verisinin şu anki durumunu, zahiri olarak gördüğümüzden daha çok süreçte geçirdiği evrelerden ibaret bir bütün olduğunu görebilmekteyiz.

Evet… Böylece bu içeriğimizde de event sourcing inşasını güzel bir yapılanmayla ele almış bulunuyoruz. Bir sonraki içeriğimizde muhtemelen, messaging yapılanmasından istifade ederek ‘Read Data Store’ olarak seçtiğimiz herhangi bir NoSQL veritabanına(MongoDB yahut Couchbase) event’ları işlemeyi ele alıyor olacağız.

İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…

Not : Örnek uygulamayı indirebilmek için buraya tıklayınız.

Bunlar da hoşunuza gidebilir...

3 Cevaplar

  1. Enes Aysan dedi ki:

    Faydalı ve anlaşılır bir yazı olmuş, elinize sağlık.

  1. 13 Haziran 2021

    […] Asp.NET Core + Event Store İle Event Sourcing Uygulaması(Örneklendirme) başlıklı yazımda bir veri üzerinde yapılan tüm işlemleri Event Store tool’u ile […]

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir