.NET Core Ortamında ‘Event Store’ İle Event Sourcing Yapılanması
Merhaba,
Bir önceki Basit Bir Event Sourcing Uygulaması Geliştirelim başlıklı makalemizde Event Sourcing pattern’ının nasıl gerçekleştirilebildiğine pratiksel olarak bir örnek vermeye çalışmıştık. İlgili makalede ele alınan örnekten de görüldüğü üzere uygulamada gerçekleştirilen ‘event’lerin in-memory’de depolanması yerine daha yönetilebilir bir vaziyette olması için veritabanında yahut bu amaçla geliştirilmiş özel tool’lar da tutulması daha elverişli olacak kanaatindeyiz. Dolayısıyla bu içeriğimizde Event Sourcing pattern’ını uygulayabilmek için geliştirilmiş olan ‘Event Store’ tool’unu sizlerle tanıştıracak ve birçok nimetinden haberdar olup, istifade ediyor olacağız.
Event Store Nedir?
Event Store, Event Sourcing için geliştirilmiş olan event’leri depolamamızı sağlayan ve bununla birlikte ‘Read/Query’ veritabanlarına kayıt atabilmek için dahili bir ‘Message Broker’ hizmeti sağlayan Open Source bir veritabanıdır. HTTP ve TCP protokollerini desteklemektedir.
Hangi Protokolü Tercih Edilmelidir?
TCP, HTTP’ye nazaran daha hızlı olacağından dolayı tercih edilebilir. Özellikle event eklendiği zaman subscribe olan consumer’larla zamansal açıdan 2|3 kat daha hızlı iletişim kurulabilmektedir. Bu fark yüksek performanslı ortamlarda daha da kendini gösterecektir.
Event Store Kurulumu Nasıl Yapılır?
Event Store’u ister PC’nize isterseniz de Docker ortamına kurabilirsiniz. Bizler bu içeriğimizde Docker’ı tercih edeceğiz.
Docker
Event Store’u Docker ortamında ayağa kaldırabilmek için aşağıdaki komuttu çalıştırabilirsiniz.
docker run --name eventstore-node -it -p 2113:2113 -p 1113:1113 eventstore/eventstore:release-5.0.9
Bu şekilde Dockerize edilen Event Store sunucusuna ileride bağlantı sağlanmaya çalışıldığı zaman aşağıdaki hata kaçınılmaz olacaktır.
Hatanın metinsel hali:
EventStore.ClientAPI.Exceptions.RetriesLimitReachedException: Item Operation ReadStreamEventsForwardOperation
Bu hatanın kaynağı ilgili sunucunun dahili bir SSL sertifikası olmamasından kaynaklanmaktadır. İlgili hatayı aşabilmek için öncelikle gerçek sertifika kullanılmalıdır. Bunun için Event Store’u kullanacağınız uygulamaya aşağıdaki içeriğe sahip Dockerfile dosyasını oluşturunuz.
FROM eventstore/eventstore:release-5.0.8 RUN apt-get update -y \ && apt-get install -y openssl \ && openssl req -x509 -sha256 -nodes -days 3650 -subj "/CN=eventstore.org" -newkey rsa:2048 -keyout eventstore.pem -out eventstore.csr \ && openssl pkcs12 -export -inkey eventstore.pem -in eventstore.csr -out eventstore.p12 -passout pass: \ && openssl pkcs12 -export -inkey eventstore.pem -in eventstore.csr -out eventstore.pfx -passout pass: \ && mkdir -p /usr/local/share/ca-certificates \ && cp eventstore.csr /usr/local/share/ca-certificates/eventstore.crt \ && update-ca-certificates \ && apt-get autoremove \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
Ardından powershell’den ilgili dizine odaklanarak aşağıdai image’i oluşturunuz.
docker build -t eventstore/eventstore:with-cert-local --no-cache .
Son olarak aşağıdaki gibi bir container’ı ayağa kaldırınız.
docker run --name EventStore -it -p 1113:1113 -p 1115:1115 -p 2113:2113 -e EVENTSTORE_CERTIFICATE_FILE=eventstore.p12 -e EVENTSTORE_EXT_SECURE_TCP_PORT=1115 eventstore/eventstore:with-cert-local
Bu şekilde ayağa kaldırılan container’da artık hata almayacağız. Yazımızın devamında bağlantı kurmaya çalışacağımız container bu olacaktır. Ayrıca bu ayarlardan sonra artık TCP client’ı olarak aşağıdaki connection string’i kullanmamız gerekecektir.
"ConnectTo=tcp://localhost:1115;DefaultUserCredentials=admin:changeit;UseSslConnection=true;
Event Store Admin UI Tanıtımı
Event Store container’ı ayağa kaldırdıktan sonra http://localhost:2113/ adresi üzerinden Admin UI’a erişebilirsiniz. Kullanıcı adı admin
, şifre changeit
‘tir.
Şimdi sırasıyla Admin UI’da ki sayfaları incelemeye başlayalım.
- Dashboard
Dashboard sayfası, olabildiğince Event Store tool’unun istatiksel verilerini ve aktif bağlantıları özet olarak sunan bir sayfadır.
- Stream Browser
Event Store aracında, event kümelerine ‘stream’ adı verilmektedir. Dolayısıyla ilgili tool’a depolanan event’lerin akışını bu pencereden takip edebilmekteyiz.
- Event Stream
Bir stream’de ki eventlerin görüntüsünü veren sayfadır. Dikkat edilirse bir event’te :
Event # Event No Name Event adı Type Event türü/tipi Created Date Event oluşturulma tarihi bilgileri mevcuttur. Bir başka deyişle bir event bu bilgilerden meydana gelmektedir.
.NET Core İle Event Store Kullanımı
- Gereksinimler
.NET Core ortamında Event Store’u kullanabilmek için EventStore.Client kütüphanesinin yüklenmesi yeterlidir. - Event Store’a Bağlanma
Event Store’a bağlanabilmek için aşağıdaki komutu kullanabilirsiniz :IEventStoreConnection connection = EventStoreConnection.Create( connectionString: "ConnectTo=tcp://localhost:1115;DefaultUserCredentials=admin:changeit;UseSslConnection=true;TargetHost=eventstore.org;ValidateServer=false", connectionName: "Console_Application", builder: ConnectionSettings.Create().KeepReconnecting() ); await connection.ConnectAsync();
- Event Store Bağlantı Olayları
Event Store’a bağlanırken aşağıdaki bağlantı olaylarından istifade edebilirsiniz :connection.Connected += static (sender, clientConnectionEventArgs) => { Console.WriteLine("Bağlantı sağlanmıştır."); Console.WriteLine($"Connection Name : {clientConnectionEventArgs.Connection.ConnectionName}"); Console.WriteLine($"Address Family : {clientConnectionEventArgs.RemoteEndPoint.AddressFamily}"); }; connection.Disconnected += (sender, clientConnectionEventArgs) => { Console.WriteLine("Bağlantı kesilmiştir."); Console.WriteLine($"Connection Name : {clientConnectionEventArgs.Connection.ConnectionName}"); Console.WriteLine($"Address Family : {clientConnectionEventArgs.RemoteEndPoint.AddressFamily}"); }; connection.Reconnecting += (sender, clientReconnectingEventArgs) => { Console.WriteLine("Bağlantı yeniden deneniyor."); Console.WriteLine($"Connection Name : {clientReconnectingEventArgs.Connection.ConnectionName}"); }; connection.ErrorOccurred += (sender, clientErrorEventArgs) => { Console.WriteLine("Hata oluştu!."); Console.WriteLine($"Connection Name : {clientErrorEventArgs.Connection.ConnectionName}"); Console.WriteLine($"Exception Message : {clientErrorEventArgs.Exception.Message}"); };
- Event Ekleme
Event Store’a aşağıdaki gibi event ekleyebilirsiniz.EventData eventPayload = new( eventId: Guid.NewGuid(), type: "EventAdded", isJson: true, data: Encoding.UTF8.GetBytes("Event eklenmiştir..."), metadata: Encoding.UTF8.GetBytes("Event metadata...") ); var result = await connection.AppendToStreamAsync("Example-Event", ExpectedVersion.Any, eventPayload);
Dikkat edilirse eğer Event Store’a atılacak olan event’ler ‘EventData’ türündendir.
- Event Okuma
Event Store’dan tüm event’leri okuyabilmek için ise aşağıdaki gibi çalışılması yeterli olacaktır.StreamEventsSlice readEvents = await connection.ReadStreamEventsForwardAsync("Example-Event", 0, 100, true); foreach (var @event in readEvents.Events) { Console.WriteLine($"Data\t\t: {Encoding.UTF8.GetString(@event.Event.Data)}"); Console.WriteLine($"Created \t: {@event.Event.Created}"); Console.WriteLine($"Created Epoch \t: {@event.Event.CreatedEpoch}"); Console.WriteLine($"EventId \t: {@event.Event.EventId}"); Console.WriteLine($"Event Number \t: {@event.Event.EventNumber}"); Console.WriteLine($"Event StreamId \t: {@event.Event.EventStreamId}"); Console.WriteLine($"Event Type \t: {@event.Event.EventType}"); Console.WriteLine($"Metadata \t: {Encoding.UTF8.GetString(@event.Event.Metadata)}"); Console.WriteLine("|||||||||||||||||||||||||||"); }
Burada dikkat edilirse eğer birden fazla event’i
ReadStreamEventsForwardAsync
fonskyionu aracılığıyla stream olarak okuyabilmekteyiz. İlgili fonksiyon aracılığıyla opsiyonel aralıkta event okuyabildiğimiz için 0. event’ten başlayarak 100 adet event’i ileri dönük okumasını ve stream olarak döndürmesini bildirmiş bulunuyoruz.Benzer olarak
ReadStreamEventsBackwardAsync
fonksiyonu ilede geriye dönük stream okuması gerçekleştirilebilmektedir.StreamEventsSlice readEvents = await connection.ReadStreamEventsBackwardAsync("Example-Event", 99, 100, true);
Tüm bunların dışında
ReadAllEventsForwardAsync
veReadAllEventsBackwardAsync
fonksiyonları sayesinde tüm stream’lerdeki olayları okuyabilmekteyiz. Bu fonksiyonlar stream olarak değil de normal bir şekilde okuma gerçekleştirir.List<ResolvedEvent> allEvents = new(); Position nextSliceStart = Position.Start; //ReadAllEventsBackwardAsync -> Position.End AllEventsSlice readEvents = null; do { readEvents = await connection.ReadAllEventsForwardAsync(nextSliceStart, 200, true); nextSliceStart = readEvents.NextPosition; allEvents.AddRange(readEvents.Events); } while (!readEvents.IsEndOfStream); //allEvents Foreach
Ayriyetten tekil olarak event number bazlı event okumasıda gerçekleştirebiliriz. Bunun içinde aşağıdaki gibi
ReadEventAsync
fonksiyonunun kullanılması yeterli olacaktır.EventReadResult readResult = await connection.ReadEventAsync("Example-Event", 3, true); Console.WriteLine($"Data\t\t: {Encoding.UTF8.GetString(readResult.Event.Value.Event.Data)}"); Console.WriteLine($"Created \t: {readResult.Event.Value.Event.Created}"); Console.WriteLine($"Created Epoch \t: {readResult.Event.Value.Event.CreatedEpoch}"); Console.WriteLine($"EventId \t: {readResult.Event.Value.Event.EventId}"); Console.WriteLine($"Event Number \t: {readResult.Event.Value.Event.EventNumber}"); Console.WriteLine($"Event StreamId \t: {readResult.Event.Value.Event.EventStreamId}"); Console.WriteLine($"Event Type \t: {readResult.Event.Value.Event.EventType}"); Console.WriteLine($"Metadata \t: {Encoding.UTF8.GetString(readResult.Event.Value.Event.Metadata)}");
- Event Store Transaction Kullanımı
Event Store’da transaction başlatılarak işlemler gerçekleştirilebilir.class Program { static async Task Main(string[] args) { IEventStoreConnection connection = EventStoreConnection.Create( connectionString: "ConnectTo=tcp://localhost:1115;DefaultUserCredentials=admin:changeit;UseSslConnection=true;TargetHost=eventstore.org;ValidateServer=false", connectionName: "Console_Application", builder: ConnectionSettings.Create().KeepReconnecting() ); await connection.ConnectAsync(); using EventStoreTransaction transaction = await connection.StartTransactionAsync("Example-Event-3", ExpectedVersion.Any); await transaction.WriteAsync(CreateSampleEvent(1)); await transaction.WriteAsync(CreateSampleEvent(2)); await connection.AppendToStreamAsync("Example-Event-3", ExpectedVersion.Any, CreateSampleEvent(3)); await transaction.WriteAsync(CreateSampleEvent(4)); await transaction.WriteAsync(CreateSampleEvent(5)); await transaction.CommitAsync(); } static EventData CreateSampleEvent(int i) { return new EventData( eventId: Guid.NewGuid(), type: "event-type", isJson: true, data: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { a = i })), metadata: Encoding.UTF8.GetBytes("{}") ); ; } }
Yukarıdaki kod bloğunu incelerseniz eğer; 12. ve 18. satır aralığında transaction işlemleri gerçekleştirilmiştir.
Transaction, 12. satırdaki
StartTransactionAsync
fonksiyonuyla başlatılmış ve ardından ilgili transaction üzerindenWriteAsync
metoduyla event’ler eklenmiştir. Transaction üzerinden eklenen tüm eventler 18. satırdaCommitAsync
fonksiyonu aracılığıyla fiziksel olarak işlenmiştir.Lakin burada 15. satıra dikkat ederseniz eğer transaction’ın dışında bir işlem yapılmaktadır. Buradan anlaşılan transaction diğer işlemleri kilitlememekte ve rahatlıkla farklı transaction’lara izin vermektedir. Burada yapılan işlem esasında iki farklı transaction ile gerçekleştirilmektedir. Velhasıl bu örnek kodu çalıştırdığımızda event’lerin kayıt sırası yandaki görseldeki gibi olacaktır. (Sıra : 3 – 1 – 2 – 4 – 5)
Neden event’lerin sırası bu şekilde oldu? diye sorarsanız eğer cevabımız,
AppendToStreamAsync
fonksiyonunun transaction’ı direkt commit etmesidir. Zaten transaction yönetimindeki nihai gayemiz commit işleminin irademizle gerçekleştirilip daha tutarlı bir çalışma yapmak değil midir? 🙂Ayrıca bir transaction’ı
transaction.Rollback()
metoduyla rollback edebilir ve tüm işlemleri geri alabilirsiniz. - Stream Silme
Oluşturulan bir stream’i silmek istersenizDeleteStreamAsync
metodunu kullanabilirsiniz.DeleteResult deleteResult = await connection.DeleteStreamAsync("Example-Event-3", ExpectedVersion.Any);
- Subscriptions/Message Broker
Şahsi kanaatimce Event Store’un en güzel özelliklerinden birisi dahili message broker yapılanması barındırıyor olmasıdır. Bir stream’eAppendToStreamAsync
ile eklenen herhangi bir event’ten anında haberdar olabilmek için ilgili stream’e subscribe olabilmekte ve gerçekleşen işlemlerle ilgili bildirimler alınabilmektedir. Esasında Messaging ismini verdiğimiz bu olay sayesinde eklenen event’e dair bilgiler edinmekte ve bu bilgiler eşliğinde genellikle reading veritabanına gerekli güncellemeler yansıtılmaktadır.Bu özelliğin birbirinden güzel üç niteliği mevcuttur. Bunlardan ilki, bildirimleri mevcut event’lerin dışında gelecek olanlar içinde sağlayabiliyor olmasıdır. Yani demek istenilen o ki, 100 event varsa eğer bizler 101.’den itibaren bildirim almayı tercih edebilmekteyiz. İşte buna Volatile Subscriptions(Değişken Abonelikler) denmektedir.
İkincisi, belirli bir limitin üstündeki event’lerin de bildirimini talep edebiliriz. Buna da örnek vermek gerekirse, 100 event’lik bir durumda başlangıç noktasını 50 olarak belirleyebilir ve 51.’den sonraki tüm event’lerin bildirimini alabilirsiniz. Buna da Catch-up Subscriptions(Yakalama Abonelikleri) denmektedir. Bu her iki abonelik türüde bağlantı koptuğu taktirde son bulmaktadır.
Sonuncu olarak, EventStoreDB’nin 3.2.0 versiyonundan sonra gelen ve bağlantı koptuğu durumlarda diğer türlerin aksine mevcut subscribe’ı koparmayan Persistent Subscriptions(Kalıcı Abonelikler) mevcuttur. Bu abonelik türünde, abonelik durumu sunucu tarafında kaydedilmekte ve aynı stream üzerindeki birden fazla customer’a en az bir kez message iletiminin garantisini vermektedir.
Şimdi gelin bu üç niteliği pratikte inceleyelim.
EventStoreSubscription subscription = await connection.SubscribeToStreamAsync( stream: "Example-Event-7", resolveLinkTos: true, eventAppeared: async (eventStoreSubscription, resolvedEvent) => Console.WriteLine($"Event eklenmiştir.{Encoding.UTF8.GetString(resolvedEvent.Event.Data)}"), subscriptionDropped: (eventStoreSubscription, subscriptionDropReason, exception) => Console.WriteLine($"Bağlantı kopmuştur. {subscriptionDropReason}") ); int i = 0; while (i++ < 10) { EventData eventPayload = new( eventId: Guid.NewGuid(), type: "EventAdded" + i, isJson: true, data: Encoding.UTF8.GetBytes("Event eklenmiştir..." + i), metadata: Encoding.UTF8.GetBytes("Event metadata..." + i) ); var result = await connection.AppendToStreamAsync("Example-Event-7", ExpectedVersion.Any, eventPayload); }
Yukarıdaki kod bloğunu incelerseniz eğer connection nesnesinin
SubscribeToStreamAsync
fonksiyonu aracılığıyla ‘stream’ parametresine verilen değerdeki stream’e subscribe olunmaktadır. Süreçte herhangi bir event eklendiği vakit ‘eventAppeared’ delegasyonuna bağlanan fonksiyon tetiklenmekte ve gerekli aksiyon alınabilmektedir.Aşağıdaki gibi
SubscribeToStreamFrom
fonksiyonu ile de konfigürasyona detaylı müdahale ederek, subscribe işlemini gerçekleştirebilirsiniz.CatchUpSubscriptionSettings settings = new( maxLiveQueueSize: 10000, readBatchSize: 5, verboseLogging: false, resolveLinkTos: true, subscriptionName: "mySubscription" ); connection.SubscribeToStreamFrom( stream: "Example-Event-8", settings: settings, lastCheckpoint: StreamPosition.Start, eventAppeared: async (eventStoreSubscription, resolvedEvent) => Console.WriteLine($"Event eklenmiştir.{Encoding.UTF8.GetString(resolvedEvent.Event.Data)}"), liveProcessingStarted: eventStoreCatchUpSubscription => Console.WriteLine("İşlenmeye devam ediyor..."), subscriptionDropped: (eventStoreSubscription, subscriptionDropReason, exception) => Console.WriteLine($"Bağlantı kopmuştur. {subscriptionDropReason}") );
Peki, Asp.NET Core’da yahut farklı bir client’ta bu stream’lere nasıl subscribe olacağız? sorunuzu duyar gibiyim…
Event eklendiğinde abone olunan stream’i dinleyebilmek için bir Background Service oluşturabilir yahut console, api vs. gibi harici bir servis tarafından ilgili stream’e subscribe yapabilirsiniz. Aşağıdaki örnek kod bloğunda Background Service ile bir incele yapılmıştır. Yazının devamında ise console üzerinden harici bir servis üzerinden de teoride örneklendirme gerçekleştirilecektir. Asp.NET Core mimarisinde background service oluşturabilmek için
Microsoft.Extensions.Hosting
namespace’in de kiBackgroundService
sınıfını aşağıdaki gibi kullanmamız gerekmektedir. Ayrıca ilgili projeye malum kütüphanenin(EventStore.Client) yüklenmesi gerekmektedir.public class EventStoreBackgroundService : BackgroundService { readonly IEventStoreConnection _connection; public EventStoreBackgroundService( IEventStoreConnection connection) { _connection = connection; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await _connection.SubscribeToStreamAsync( stream: "Example-Event-10", resolveLinkTos: true, eventAppeared: async (eventStoreSubscription, resolvedEvent) => Console.WriteLine($"Event eklenmiştir.{Encoding.UTF8.GetString(resolvedEvent.Event.Data)}"), subscriptionDropped: (eventStoreSubscription, subscriptionDropReason, exception) => Console.WriteLine($"Bağlantı kopmuştur. {subscriptionDropReason}") ); } }
Ardından ‘Startup.cs’ dosyasında aşağıdaki konfigürasyonların yapılması gerekmektedir.
public class Startup { public void ConfigureServices(IServiceCollection services) { IEventStoreConnection connection = EventStoreConnection.Create( connectionString: "ConnectTo=tcp://localhost:1115;DefaultUserCredentials=admin:changeit;UseSslConnection=true;TargetHost=eventstore.org;ValidateServer=false", connectionName: "Console_Application", builder: ConnectionSettings.Create().KeepReconnecting() ); connection.ConnectAsync().GetAwaiter().GetResult(); services.AddSingleton(connection); services.AddHostedService<EventStoreBackgroundService>(); services.AddControllers(); } . . . }
Asp.NET Core için yapılması gereken konfigürasyon işte bu kadar…
Console üzerinden örneklendirmeye gelirsek eğer yine aynı isimde stream’i dinleyen bir bağlantı oluşturulması ve ona yukarıda incelenen fonksiyonlardan biriyle subscribe olunması yeterli olacaktır.
Nihai olarak,
Event Sourcing pattern’ı uygulayabilmek için veriler üzerindeki değişiklikleri ifade edecek olan event’leri depolayabilmek için kullanacağımız Event Store tool’unu detaylıca incelemiş bulunmaktayız. Haliyle bu bilgiler temelinde sonraki makalemizde Event Store’u kullanarak Event Sourcing’e dair pratiksel bir örnek gerçekleştiriyor olacağız. O halde sonraki makalemizde görüşmek üzere diyelim…
İlgilenenlerin faydalanması dileğiyle…
İyi çalışmalar…
Not : Örnek kod dosyalarını indirmek için buraya tıklayınız.
1 Cevap
[…] .NET Core Ortamında ‘Event Store’ İle Event Sourcing Yapılanması başlıklı makalemde Event Sourcing yapılanması için Event Store ile yapılması gereken temel […]