Derinlemesine yazılım eğitimleri için kanalımı takip edebilirsiniz...

Outbox Design Pattern Nedir? Nasıl Uygulanır? (Inbox Pattern Eşliğinde)

Merhaba,

Günlük yaşamda gurbet ellerde bulunan bir yakınınıza hatıra dolu bir mektup yazıp göndermek istediğiniz olmuştur. Mesela böyle bir istek karşılığında ne yapardınız hiç düşündünüz mü? Muhtemelen, mektubu kaleme aldıktan sonra zarflayıp, gönderilecek adres bilgisi eşliğinde postaya bırakırdınız ve ardından gerisi posta memurlarının iş planlamasına kalacağından dolayı unutarak günlük rutininize dönerdiniz. Öyle değil mi? Gönderdiğiniz mektubun, varış noktasına ne şekilde hangi yöntemlerle ne kadar zamanda varacağı sizi pek ilgilendirmezdi ve hatta mektubun bir gün alıcıya vardığında belki size geri dönüp dönmeyeceğini bile önemsemeksizin yaşamanıza devam ederdiniz. Gayet doğal… Lakin göndermek istediğiniz mektubu bırakacağınız posta teşkilatının çalışmadığı bir döneme denk geldiğini düşünürseniz eğer vaz mı geçerdiniz? Hayır dimi! Bu gün çalışmayan posta teşkilatı elbet yarın belki yarından da yakın çalışır diye mektubu elinizde tutar ve ne zaman teşkilat faal olur o zaman gider mektubu postaya emanet ederdiniz.

Buradaki varsayımdan şöyle bir sonuç çıkarabiliriz; yaptığınız iş her ne kadar sizden çıksa da bütünsel olarak nihayetlenmediği sürece işin sorumluluğundan kurtuldunuz anlamına gelmemektedir. Siz mektubu yazmış olabilirsiniz ama bazen posta teşkilatına emanet etmek için de sabretmeniz gerekebilir. (Yılmaz komutanımın değimiyle iş her ne kadar sizden çıksa da o işin öldüğünden emin olmanız ve gerekirse sizin öldürmeniz gerekmektedir!) Nasıl ki, -ben mektubu yazdım ama posta teşkilatı çalışmadığı için gönderemedim- demek doğru bir mazeret değilse, dağıtık mimarilerdeki servisler arası iletişim süreçlerinde kaybolan mesajlar açısından -‘A’ servis göndermiş ‘B’ servis ayakta olmadığı için veriyi alamamış- şeklinde bir durumda mazeret değildir. Ne de olsa, ‘A’ servisin ilgili mesajı üretmesi kadar ‘B’ servise eriştirmesi de onun sorumluluğundadır.

Şimdi gelin buradaki beşeri örnek üzerinden sadede gelelim ve sunulan davranışı yazılımsal açıdan metafor ederek modelleyip, konumuz olan Outbox Design Pattern’a girizgâh eyleyelim.

Outbox Design Pattern Nedir?

Outbox pattern, Guaranteed Delivery Pattern(Garantili Teslimat Deseni)’a dayanır!

Asenkron iletişim kuran sistemlerde, servisler arası(ya da servis ile message broker arası) iletişim süreçlerinde gönderilen mesajların yerini bulamadan bağlantının kopması, hata alınması ve süreçte beklenmeyen fiziksel yahut yazılımsal kesintilerin meydana gelmesi (vs.) durumlarına istinaden olası veri kayıplarına ve verisel açıdan servisler arası tutarsızlıklara mahal vermemek için mesajın kaybolmasını engelleyerek, yaşanan olumsuz durumun/durumların geçmesi üzerine bağlantının tekrardan kurulması durumunda hedef noktaya mesajın tekrardan gönderilmesi için devreye giren basit ama kritik noktalarda muazzam etkili bir pattern’dır.

Outbox Design Pattern’ın Teorik Davranışı Nasıldır?

Outbox Design Pattern(Outbox Tasarım Deseni)

Şema Kaynağı : https://github.com/obsidiandynamics/goharvest

Outbox pattern’ı teoride anlatabilmek için yukarıdaki şemadan istifade edebiliriz. Bir servise gelen istek üzerine yapılan işlemler neticesinde farklı bir servise yahut bir message broker’a yeni bir message/event publish edileceğini düşünelim. Bu işlem ihtiyaç anında gerçekleştirilirse eğer gönderilen mesajın hedef servise yahut message broker’a vardığından emin olmalı, eğer ki bir problem söz konusu olursa bu servisin de yaptığı tüm işlemler bütünsel tutarlılık için geri alınmalıdır(Bknz : Compensable Transaction). Ama bu ne kadar doğru olacaktır? Nihayetinde distributed bir mimaride bazen servislerin geçici süreliğine problem yaşaması oldukça normal bir durumdur. İlgili servisin ayağa kalkana kadar işlenecek verilerin/mesajların/event’lerin tutulması ve servis ayağa kalktığı zaman bunların işlenmesi en ideal olanıdır! Bizler yapılan işlemleri geri almak yerine bu davranışı tercih etmek istiyor olabiliriz. Aksi taktirde yapılanların geri alınmaması ciddi manada veri tutarsızlığına sebebiyet verecektir. Yukarıdaki şemaya göz atarsanız eğer ilgili servisin gelen istek neticesinde görevini tamamladığı ve artık işleyişin devamı için farklı bir servise olayı fırlatması gerekmektedir. Bunu yaparken hedef servis türlü sebeplerden dolayı ayakta değilse eğer bu serviste yapılan işlemler bütünsel açıdan tutarsızlık arz edecektir. İşte böyle bir durumda dış servis yahut message broker ile iletişim işlemini direkt yapmak yerine ilgili message/event’i giden kutusuna(Outbox) karşılık gelecek olan bir tabloya işleyip bekletmekte fayda vardır. Ardından düzenli aralıklarla bu tabloyu tarayan ve orada kayıtlı olan message/event’leri hedef noktaya publish edecek olan ‘Outbox Publisher Application’ aracılığıyla ilgili message/event’ler alıcı servislere iletilmeye çalışılır. Böylece mimari tasarımı açısından, servisler arası iletişim süreci alıcı servisin yahut message broker’ın ayakta olup olmaması durumuna göre olası veri kayıpları riskinden arındırılarak daha ehemmiyetli olması sağlanacak ve mesajın en az bir kere hedefe varacağı garanti altına alınarak neticede servisler arası loosely coupled(gevşek bağlılık) konusunda bir adım daha atılmış olunacaktır.

Sevisler arası iletişim süreçlerinde kaybolan mesajlar birçok geliştiricinin uykusunun kaçırmasının temel kaynağıdır! O yüzden bu konu ciddi kritik arz etmektedir!

Nihayetinde bakarsanız eğer asenkron süreçte servisler arası iletişim verilerinin Outbox’da tutularak; olası aksaklıklardan, sistem kesintilerinden veya bağlantı kopukluklarından dolayı oluşabilecek iletişim ve veri transferi problemlerinden etkilenmemesi sağlanarak bütünlüğün korunması amaçlanmaktadır.

İçeriğimize girizgâh eylediğimiz mektup misaliyle bu durumu karşılaştırırsak eğer; siz ve yakınınız iki servise, mektup ise aranızdaki iletişim sürecindeki mesaj yahut event’e ve posta ise message broker’a karşılık gelmektedir. Outbox’ın ise sizin yakınınıza göndermek istediğiniz mesajın(mektubun) posta memurlarına verilip verilmediğinin garantisini sağlayan bir kontrol mekanizması olduğunu düşünebilirsiniz.

Outbox pattern sayesinde servisler arasındaki iletişim süreçlerinde mesajların nihai olarak başarılı bir şekilde hedef servis(ler)e iletildiğinden emin olabilir ve veri tutarsızlığına karşı bütünsel açıdan önleyici bir davranış sergilediğinizin bilinciyle gönül rahatlığıyla çalışmalarınıza devam edebilirsiniz.

Outbox pattern distributed mimarilerde atomik bir davranış sergilenmesini sağlar.

Outbox pattern’ı daha iyi anlayabilmek için Microservice Mimarilerde Saga Pattern İle Transaction Yönetimi başlıklı makalemde teorik olarak değindiğim distributed transaction kavramını ve bu kavramın uygulanabilir olması için gerçekleştirilen belli başlı teknikleri bilmeniz gerekmektedir. İşte tüm bunlar bir yana Outbox pattern sayesinde distributed transaction tekniklerini uygulamaya gerek kalmaksızın mesajları diğer servislere erişim güvenilirliğini sağlayarak gönderebilir ve böylece uygulama bütünlüğü açısından pratik olacak şekilde daha emin adımlarla veri tutarlılığını sağlayabilirsiniz.

Microservice yaklaşımlarında, kritik mesajlara sahip olan her bir servis kendi Outbox tablosuna sahip olmalı ve mesajları önce bu tabloya atıp ardından bir publisher aracılığıyla hedef servise ya da message broker’a göndermelidir.

Outbox Pattern Hangi Durumlarda Kullanılır?

Outbox pattern yukarıdaki satırlarda vurguladığımız gibi bir servis tarafından mesajlar/event’ler yayınlandığı esnada doğrudan hedef servise yahut message broker’a göndermek yerine bu mesajları bir veritabanı tablosuna kaydederek ardından bir publisher aracılığıyla belirli zaman aralıklarında veya CDC(Change Data Capture) aracılığıyla hedef servislere iletmeyi amaçlayan bir pattern’dır.

Buradan anlaşılıyor ki, servislere ya da message broker’a gönderilecek mesaja dair en az bir kere iletme garantisini sağlamak istiyorsanız eğer Outbox pattern kullanılabilir.

Özellikle bir serviste aynı anda iki işlemin yapıldığı durum söz konusuysa Outbox pattern biçilmiş kaftan olacaktır. Buna herhangi bir e-ticaret uygulamasından küçük bir kesiti misal olarak sunarsak eğer; ‘Order Service’ üzerinde, kullanıcıdan gelen sipariş üzerine hem ‘Orders’ tablosuna bir kayıt atıldığını hem de siparişin oluşturulduğuna dair ‘OrderCreatedEvent’ isimli bir event’in message broker’a yazıldığını tahayyül edelim… Bir servis yaptığı işlemler sürecinde bunun gibi iki farklı yapıya(veritabanı-message broker) dair kalıcı işlem gerçekleştiriyorsa eğer bu durumu Dual Write olarak nitelendiriyoruz.

Belirli bir olay meydana geldiğinde hem veritabanına hem de bir message broker’a aynı anda veri yazma işlemine Dual Write denir.

İşte, Dual Write durumunun söz konusu olduğu anlarda veritabanına kaydın atılıp message broker’a gönderilememesi yahut bunun tam tersi gibi aksilikler meydana gelirse eğer burada hem o olayın yaşandığı servis hem de message broker’ı dinleyen diğer servis(ler) için ciddi verisel farklar yaratabileceğinden dolayı uygulama bütünlüğü açısından tutarsızlıklar söz konusu olabilir. Eee Dual Write’tan kaynaklanabilecek bu tutarsızlıklar sistem sağlıklıyken ortaya çıkmayabilir yahut fark edilmesi oldukça güç olabilir ancak uzun vadede mutlaka kendilerini gösterecektirler. İşte bu olası tutarsızlıkları sıfır ihtimale indirgeyebilmek için Outbox pattern hayati bir rol üstlenebilir. Outbox pattern, veritabanında yapılacak değişikliğin ardından yürütülecek olan işlemleri, veritabanı içerisinde bir görev olarak saklama temeline dayanır. Böylece bir transaction içerisinde hem veritabanında yapmak istediğimiz işlemi gerçekleştiriyor hem de bu işlemden sonra yürüteceğimiz aksiyon için işaret bırakmış oluyoruz.

Unutmayın!
Dual Write; distributed, event based vs. gibi uygulamalarda sıklıkla sorunlara neden olan bir durumdur. Ve bu sorunları tespit edebilmek, maliyetleri ölçebilmek ve düzeltebilmek oldukça zor ve zahmetli olacaktır!

Outbox pattern Dual Write durumu varsa düşünülmelidir!

Outbox Pattern vs Event Sourcing

Yukarıdaki satırlar sizlere Event Sourcing‘i anımsatmış olabilir. Nihayetinde yapılacak işlemlere dair(publish/subscribe) bir tabloda tutulacak kayıtlar söz konusudur. Evet, Outbox pattern deyince akla event sourcing’in gelmesi normaldir lakin Outbox pattern event sourcing değildir. Belki alternatifidir diyebiliriz ama değildir! Outbox pattern ile event sourcing arasındaki en radikal farkı ele almamız gerekirse eğer; event sourcing’de bir işlenen veriye dair tüm olaylar kayıt altına alınırken, Outbox’da ise işlenecek olan mesajların/event’lerin işlendikten sonra silinmesi ya da işlendiğine dair güncellenmesi söz konusudur. Event sourcing’de kesinlikle herhangi bir olayın silinmesi söz konusu değildir bilakis bir veriye dair yapılan bir işlemin geri alınması bile yine ayrı bir olay olarak kabul edilmekte ve böylece meydana gelen olayların bütünü ilgili verinin nihai halini ortaya sermekte ve süreçte de ne gibi değişimler yaşadığını bizlere söylemektedir.

Outbox pattern ile ilgili en önemli husus yalnızca geçici bir mesaj/olay deposu olmasıdır. Mesajlar hedef servise ya da message broker’a ulaştırıldığı taktirde ya silinmeli ya da işlendiğine dair güncellenmelidir.

Outbox Pattern’da Mesajlar Nasıl Publish Edilmelidir?

Outbox pattern ile Outbox tablosuna kaydedilen mesajları/event’leri publish etmenin makul olan iki farklı yolu vardır diyebiliriz;

  • Pulling Publisher
    Outbox tablosunda bulunan mesajları belirli bir zaman aralığında sorgulayıp publish eden uygulamanın geliştirilmesi yöntemidir. Bu uygulama basit bir console application ya da worker service olabilir. İçeriğimiz sürecinde bizlerin pulling publisher’ı örneklendirirken worker service üzerinden bir geliştirme yaptığımızı göreceksiniz.

    Pulling publisher uygulamasının Outbox tablosunu sorgulama zaman aralığı ne kadar küçükse veritabanına maliyeti doğru orantılı bir şekilde artacaktır. Bu durum ise ilgili yöntemin tek ve öz dezavantajıdır diyebiliriz.

    Ayrıca bu yöntemde dikkat edilmesi gereken bir diğer husus ise; geliştirilen publisher uygulamasını scale etmek istediğiniz taktirde Outbox’da ki herhangi bir mesajı farklı instance’lar da tekrar işleme olasılığının olmasıdır. Bu durumu engelleyebilmek için ise kullanılan veritabanına göre bazı önlemlerin alınması gerekmektedir. Bu önlemlerin bir kısmı aşağıdaki veritabanlarıyla sağlanabilir;

    Son olarak bu yöntemin uygulama basitliği açısından avantajlı olduğunu söyleyebiliriz.

  • Transaction Log Tailing
    Outbox table’ın bulunduğu veritabanının transaction loglarını okuyarak değişiklikleri yakalayıp mesajları publish etme yöntemidir. Konuya dair Debezium, Eventuate Tram vs. incelenebilir.

    Genel kültür olması açısından bu yöntemin event sourcing için geliştirilmiş olan Event Store aracında da kullanıldığını bilmenizde fayda vardır!

Outbox Pattern’ın Kullanılacağı Örnek Senaryo Bileşenleri Neler Olabilir?

Outbox pattern’ın hangi durumlarda kullanılacağını yukarıdaki satırlarda Outbox Pattern Hangi Durumlarda Kullanılır? başlığı altında değerlendirmeye çalışmıştık. Burada ise bahsedilen durumları yaşayabileceğimiz senaryolara birkaç örnek vermek faydalı olacaktır kanaatindeyim;

  • Bir sipariş verildikten sonra kullanıcıya e-posta göndermek,
  • Kullanıcı kaydı hakkında message broker’a bir event göndermek,
  • Bir sipariş verildikten sonra stoktaki ürün sayısını güncellemek

vs. gibi tüm senaryolarda Outbox pattern ile yapılacak işlemin devamını kayda alabilir ve veri kaybını önceleyecek şekilde bir publisher eşliğinde diğer işi üstlenen servis yahut message broker ile iletişim kurabilirsiniz.

Outbox Pattern’ın Idempotents Sorunsalı!

Outbox pattern’de, publish edilen mesajların ya da event’lerin işlendiğine dair işaretlenmesine yahut tablodan işlenenlerin silinmesine rağmen düşük ihtimalde olsa bir handikap söz konusu olabilir. Bu handikap, mesajlar işlendikten/publish edildikten sonra sıra Outbox table üzerinde işaretlenmeye yahut ilgili mesajın silinmesi işlemine gelindiğinde ilgili veritabanı ile oluşabilecek bir olası iletişim hatası nedeniyle bu işlemin gerçekleştirilememesidir. Evet, mesaj yayınlanmıştır ama bu yayınlama neticede Outbox table’a yansıtılamamıştır. Böyle bir durumda Outbox table’ın bulunduğu veritabanı ile tekrar iletişim kurulduğunda işlenmiş olan mesaj tekrardan işleme tabi tutulacak ve uygulama açısından bütünsel tutarlılığı sarsabilecek bir durum meydana gelebilecektir. İşte şimdi Peki bu duruma karşı nasıl bir davranış sergilememiz gerekmektedir? sorusu akıllara gelmektedir…

Bu duruma istinaden mesajları/event’leri tüketen consumer’ların Idempotents olarak tasarlanması bu olası hatayı ortadan kaldıracaktır.

Idempotent; matematik ve bilgisayar bilimlerinde kullanılan ve ilk işlem haricinde sonraki tüm işlemler için etkisi söz konusu olmayan ve sonucu değiştirmeden uygulanabilen bir özelliği ifade eden terminolojik bir terimdir. Matematiksel olarak f(f(x)) = f(x) şeklinde formülüze edilebilir.

Idempotent’e matematiksel örnek olarak mutlak değeri verebiliriz. Nihayetinde mutlak değerin amacı bir sayının sıfıra olan uzaklığını ölçmek olduğu için |n| ile |-n| işlemlerini ne kadar değerlendirirsek değerlendirelim, bu iki ölçümde sonuç açısından herhangi bir fark hiçbir zaman söz konusu olmayacaktır.

Idempotent kavramına biz yazılımcıların anlayacağı dilden daha farklı örnekler verebilmemiz açısından rest mimarisi üzerinden de bir metafor gerçekleştirebiliriz. Rest mimarisinde; get, put ve delete action’ları idempotent iken, post metodu idempotent değildir! Çünkü get, put ve delete her ne kadar tekrarlı tetiklenirse tetiklensinler yaptıkları ilk işlemin dışında ekstradan sonuç üretemezler. Yani örnek olarak, ‘3’ id değerine sahip bir kullanıcıyı silmek(delete) istiyorsanız eğer bunu ilk istekte gerçekleştirir, sonraki isteklerde ise ‘3’ id değerine karşılık bir kullanıcı olmayacağından dolayı ekstradan bir sonuç üretecek işlem gerçekleştiremezsiniz. Veya update products set name='abc' sorgusunu ilk çalıştırdığınızda tüm ürünlerin adı ‘abc’ olacak sonraki tetiklemelerde ise bu sorgu bir değişikliğe yol açmayacaktır. İşte bundan dolayı bu üçü idempotent özellik sergilemektedir. Amma velâkin, adı ‘Hilmi’ olan bir kullanıcıyı eklemek(post) isterseniz her istek gönderdiğinizde adı ‘Hilmi’ olan bir kullanıcı eklenecektir. Bu da sonucu etkileyecektir. Çünkü ilk istekte adı ‘Hilmi’ olan kullanıcı sayısı n iken, beşinci istekte n + 5 olacaktır. Haliyle post action’ı netice itibariyle veritabanı üzerinde işlemden önceki son hale nazaran değişiklik/fark meydana getireceği için idempotent özellik göstermeyen bir fonksiyondur.

Onca örneğin yanında bir de günlük hayattan bir misalde bulunalım isterim. Diyelim ki X bankasının bankamatiğine gittiniz ve bakiyenizi görmek istediniz. İşte bu işlem bir idempotent işlemdir. Nihayetinde bankamatik üzerinden hesabınızdaki bakiyeyi görme işlemini ne kadar yaparsanız yapın sonuç değişmeyecektir. Lakin aynı hesap üzerinden para çekme ya da para yatırma işlemi yapmanız idempotent olmayan bir işlemdir. Çünkü yapılan her bir işlem neticesinde bakiye değeri değişecektir.

Yukarıda verilen örneklerden yola çıkarak konumuz olan distributed yapılanmalarda idempotent davranışını genellersek eğer, bir mesaj birden çok kez yayınlansa dahi consumer’lar açısından aynı etkiye sahip bir işlevsellikte olması gerekmektedir. Bu durum da bir mesajın herhangi bir soruna yol açmaksızın güvenlice yeniden gönderilebileceğini ifade edebiliriz.

Distributed mimarilerde, servisler arası yayınlanacak mesajların yahut event’lerin idempotent olarak tasarlanması doğru ve tutarlı davranış için gerekli olandır.

Peki, yayınlanacak mesajlarda idempotency’yi başarmanın yolu nedir?
Bunun için publish edilecek olan her bir mesaj yahut event için özel bir anahtar, token veya id gibi değer üretilmeli ve böylece consumer’lar tarafından bu mesaj daha önceden tüketilmiş mi? tüketilmemiş mi? ayırt edilebilir bir nitelik kazandırılmalıdır. Bunu şöyle örneklendirebiliriz;Outbox Design Pattern(Outbox Tasarım Deseni)Yukarıdaki görsel şemayı incelerseniz eğer Outbox table’a kayıt atılan verilere bir ‘IdempotentToken’ değeri eşlik etmektedir. Bu şekilde publisher ile message broker’a gönderilen mesajlar consumer tarafında işlenirken öncelikle ‘Inbox Table’ adını verdiğimiz bir tabloya işlenmeli ardından process edilmelidirler. Inbox table’da eğer ki aynı ‘IdempotentToken’ değerine sahip bir kayıt varsa işlem gerçekleştirilmemeli, eğer yoksa process edilmelidir. Burada kullanılan yöntem özünde Inbox Pattern‘ı barındırmaktadır.

Inbox Pattern
Outbox pattern’ın akrabası olan bir pattern’dır diyebiliriz 🙂 Outbox pattern’da işlemler(algoritmalar) gerçekleştirilip, yayınlanacak mesajların zemini oluşturulduktan sonra Outbox table’a kaydedilip ardından bir publisher aracılığıyla yayınlama işlemi gerçekleştirilirken, Inbox pattern’da ise işlenecek(algoritması gerçekleştirilecek) mesajlar önce Inbox table’a eklenip ardından process edilirler. Misal olarak; gelen siparişi veritabanına kaydedip ardından stok bilgisinin güncellenmesi için Outbox table’a bu siparişle ilgili bir kaydın oluşturulup publisher ile message broker’a gönderilmesi Outbox pattern iken, stok bilgilerini güncellemekten sorumlu consumer’ın ilgili message broker’dan bu mesajları alıp önce Inbox table’a işleyip ardından process’e tabi tutması Inbox Pattern’dır. Tabi burada işlenen mesajların Inbox table’da nihai olarak tamamlandığına dair güncellenmesi gerekmektedir.

Bazı consumer’lar davranışları itibarıyla varsayılan olarak idempotent’tirler.

Outbox Table’ın Şeması Nasıl Olmalıdır?

Outbox table’ın yapısını oluşturabilmek için aşağıdaki gibi bir tasarımda bulunmak yeterlidir kanaatindeyim.

CREATE TABLE [dbo].[OutboxTable]
(
	[OccuredOn] [datetime2](7) NOT NULL,
	[ProcessedDate] [datetime2](7) NULL,
	[Type] [nvarchar](max) NOT NULL,
	[Payload] [nvarchar](max) NOT NULL,
	[IdempotentToken] [uniqueidentifier] NOT NULL
)

Outbox Design Pattern(Outbox Tasarım Deseni)Bu tablodaki kolon yapısını izah etmemiz gerekirse eğer;

  • OccuredOn, mesajın oluşturulduğu tarih bilgisini tutar.
  • ProcessedDate, mesajın işlendiği tarih bilgisini tutar. Mesaj yayınlanmadığı sürece null değerine sahip olacaktır. Yayınlandıktan sonra o anın tarih değeri atanacaktır. İsterseniz bu bilgiyi işlemek yerine mesajı silebilirsiniz!
  • Type, publish edilecek mesajın ya da event’in türünü tutar.
  • Payload, mesajın json formatındaki verisini tutar.
  • IdempotentToken, consumer’lar tarafından yinelenebilecek mesajların tespit edilebilmesi için mesaja idempotent özelliği ekleyen ve bir yandan da mesajın benzersiz kimliğe sahip olmasını sağlayan veriyi tutar.

Outbox Pattern’ı Pratikte Örneklendirelim

Şimdi bu içeriğimizin en kritik noktasına gelmiş bulunuyoruz. Outbox pattern’ı pratikte ele alabilmek için örneklendiriyor olacağız. Tabi bunun için Onion Architecture altyapısında bir çalışma sergileyecek ve mesajların publish edilmesi için makale sürecinde bahsettiğimiz Pulling Publisher ve Transaction Log Tailing yöntemlerini teker teker uygulayıp, örneklendireceğiz. Ayrıca örneklendirme sürecinde yine önceki satırlarda bahsettiğimiz Inbox Pattern‘ı da uygulayarak olması gereken bir altyapıyla somut bir örnek ortaya koymuş olacağız.

Örneklendirme sürecinde senaryo olarak genel e-ticaret süreçlerindeki basit bir sipariş alma ve stok güncelleme işlemlerini ayrı servisler şeklinde Outbox pattern eşliğinde ele alıyor olacağız.

O halde buyrun başlayalım…

  • Adım 1 (Order.API projesinin altyapısının oluşturulması)
    Outbox Design Pattern(Outbox Tasarım Deseni)Her şeyden önce ilk olarak sipariş işlemlerinden sorumlu olan Order.API projesini Onion Architecture altyapısında oluşturarak başlayalım. Projenin fiziksel yapısını Nedir Bu Onion Architecture? Tam Teferruatlı İnceleyelim başlıklı makalemizde ele aldığımız gibi inşa ederek temellendirelim.
  • Adım 2 (Entity’lerin oluşturulması)
    ‘Domain’ katmanında, uygulama sürecinde örnek amaçlı siparişleri temsil edecek olan ‘Order’ ve bu siparişler oluşturulduktan sonra event’inin publish edilmesi için Outbox table’a karşılık gelecek olan ‘OrderOutbox’ entity’lerini oluşturalım.

        public class Order
        {
            public Guid Id { get; set; }
            public int Quantity { get; set; }
            public string Description { get; set; }
            //public ICollection<Product> Products { get; set; } ... Example
        }
    
        public class OrderOutbox
        {
            public OrderOutbox()
            {
            }
            public DateTime OccuredOn { get; set; }
            public DateTime? ProcessedDate { get; set; }
            public string @Type { get; set; }
            public string Payload { get; set; }
            public Guid IdempotentToken { get; set; }
        }
    
  • Adım 3 (Context sınıfının oluşturulması, veritabanının migrate edilmesi)
    ‘Persistence’ katmanında aşağıdaki context nesnesini oluşturalım.

        public class OutboxExampleDbContext : DbContext
        {
            public OutboxExampleDbContext(DbContextOptions options) : base(options)
            {
            }
            public DbSet<Order> Orders { get; set; }
            public DbSet<OrderOutbox> OrderOutboxes { get; set; }
            protected override void OnModelCreating(ModelBuilder modelBuilder)
            {
                modelBuilder.ApplyConfiguration(new OrderConfiguration());
                modelBuilder.ApplyConfiguration(new OrderOutboxConfiguration());
            }
        }
    

    Burada 10 ve 11. satırlarda kullanılan konfigürasyon sınıflarının içeriği aşağıdaki gibidir.

        public class OrderConfiguration : IEntityTypeConfiguration<Order>
        {
            public void Configure(EntityTypeBuilder<Order> builder)
            {
                builder.HasKey(p => p.Id);
            }
        }
    
        public class OrderOutboxConfiguration : IEntityTypeConfiguration<OrderOutbox>
        {
            public void Configure(EntityTypeBuilder<OrderOutbox> builder)
            {
                builder.HasKey(p => p.IdempotentToken);
            }
        }
    

    Bu geliştirmelerden sonra veritabanının migrate edilmesi için aşağıdaki talimatları verelim.
    add-migration mig_1
    update-database

  • Adım 4 (Persistence katmanı ServiceRegistrations sınıfının oluşturulması)
    ‘Persistence’ katmanında geliştirilen context nesnesini IoC Container’a ekleyebilmek için aşağıdaki gibi ‘ServiceRegistrations’ sınıfını oluşturalım.

        public static class ServiceRegistrations
        {
            public static void AddPersistenceServices(this IServiceCollection services)
            {
                services.AddDbContext<OutboxExampleDbContext>(options => options.UseSqlServer("Server=localhost, 1433;Database=OutboxExampleDB;User ID=SA;Password=1q2w3e4r+!;"));
            }
        }
    

    Ardından ‘Order.API’ projesinin ‘Program.cs’ dosyasında ilgili sınıf içerisinde oluşturulan AddPersistenceServices isimli extension metodu aşağıdaki gibi çağıralım.

    using OutboxExample.Persistence;
    
    var builder = WebApplication.CreateBuilder(args);
    
    builder.Services.AddPersistenceServices();
    builder.Services.AddControllers();
    builder.Services.AddEndpointsApiExplorer();
    .
    .
    .
    
  • Adım 5 (MediatR kütüphanesinin kurulması ve temel sipariş oluşturma işlemlerinin CQRS altyapısının atılması)
    Onion Architecture’da CQRS + MediatR Pattern + AutoMapper + Fluent Validation başlıklı makalede olduğu gibi ‘Application’ katmanında MediatR kütüphanesini kullanarak sipariş oluşturma işlemlerinin CQRS altyapı inşasını gerçekleştirelim.

        public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommandRequest, CreateOrderCommandResponse>
        {
            public Task<CreateOrderCommandResponse> Handle(CreateOrderCommandRequest request, CancellationToken cancellationToken)
            {
                throw new NotImplementedException();
            }
        }
    
        public class CreateOrderCommandRequest : IRequest<CreateOrderCommandResponse>
        {
            public int Quantity { get; set; }
            public string Description { get; set; }
        }
    
        public class CreateOrderCommandResponse
        {
        }
    

    Burada oluşturalım ‘CreateOrderCommandHandler’ sınıfının içeriğini sonraki adımlarımızda dolduruyor olacağız.

  • Adım 6 (Application katmanı ServiceRegistrations sınıfının oluşturulması)
    ‘Application’ katmanında da geliştirilen servisleri IoC Container’a ekleyebilmek için aşağıdaki ‘ServiceRegistrations’ sınıfını oluşturup ardından yine ‘Order.API’ projesi içerisindeki ‘Program.cs’de gösterildiği şekilde çağıralım.

        public static class ServiceRegistrations
        {
            public static void AddApplicationServices(this IServiceCollection services)
            {
                services.AddMediatR(typeof(ServiceRegistrations));
            }
        }
    
    using OutboxExample.Persistence;
    using OutboxExample.Application;
    
    var builder = WebApplication.CreateBuilder(args);
    
    builder.Services.AddPersistenceServices();
    builder.Services.AddApplicationServices();
    builder.Services.AddControllers();
    builder.Services.AddEndpointsApiExplorer();
    .
    .
    .
    
  • Adım 7 (‘OrdersController’ın oluşturulması)
    Siparişleri alabilmek için ‘OrdersController’ı aşağıdaki gibi oluşturalım.

        [Route("api/[controller]")]
        [ApiController]
        public class OrdersController : ControllerBase
        {
            IMediator _mediator;
            public OrdersController(IMediator mediator)
            {
                _mediator = mediator;
            }
    
            [HttpPost]
            public async Task<IActionResult> Post(CreateOrderCommandRequest createOrderCommandRequest)
            {
                return Ok(await _mediator.Send(createOrderCommandRequest));
            }
        }
    
  • Adım 8 (Repository’lerin oluşturulması)
    ‘Application’ katmanı içerisindeki handler sınıflarında ‘Persistence’ katmanındaki context’e erişip işlemler gerçekleştirebilmek için repository arayüzlerine ihtiyacımız bulunmaktadır. Bunun için ‘Application’ katmanında aşağıdaki gibi repository arayüzlerini oluşturup, ‘Persistence’ katmanında ise concrete nesnelerini oluşturmamız gerekmektedir.

    ‘Application’;

        public interface IRepository<T>
        {
            IQueryable<T> GetAll();
            IQueryable<T> GetWhere(Expression<Func<T, bool>> method);
            Task AddAsync(T model);
            Task SaveChangesAsync();
        }
    
        public interface IOrderRepository : IRepository<Order>
        {
        }
    
        public interface IOrderOutboxRepository : IRepository<OrderOutbox>
        {
        }
    

    ‘Persistence’;

        public class Repository<T> : IRepository<T> where T : class
        {
            readonly OutboxExampleDbContext _context;
            public Repository(OutboxExampleDbContext context)
            {
                this._context = context;
            }
    
            public DbSet<T> Table { get => _context.Set<T>(); }
    
            public async Task AddAsync(T model)
                  => await Table.AddAsync(model);
    
            public IQueryable<T> GetAll()
                => Table;
    
            public IQueryable<T> GetWhere(Expression<Func<T, bool>> method)
                => Table.Where(method);
    
            public async Task SaveChangesAsync()
                => await _context.SaveChangesAsync();
        }
    
        public class OrderRepository : Repository<Order>, IOrderRepository
        {
            public OrderRepository(OutboxExampleDbContext context) : base(context)
            {
            }
        }
    
        public class OrderOutboxRepository : Repository<OrderOutbox>, IOrderOutboxRepository
        {
            public OrderOutboxRepository(OutboxExampleDbContext context) : base(context)
            {
            }
        }
    
  • Adım 9 (Repository sınıflarını IoC Container’a eklemek)
    Oluşturulan repository sınıflarını IoC Container’a ekleyebilmek için ‘Persistence’ katmanında bulunan ‘ServiceRegistrations’ sınıfında aşağıdaki çalışmaların yapılması gerekmektedir.

        public static class ServiceRegistrations
        {
            public static void AddPersistenceServices(this IServiceCollection services)
            {
                services.AddDbContext<OutboxExampleDbContext>(options => options.UseSqlServer("Server=localhost, 1433;Database=OutboxExampleDB;User ID=SA;Password=1q2w3e4r+!;"));
    
                services.AddScoped<IOrderRepository, OrderRepository>();
                services.AddScoped<IOrderOutboxRepository, OrderOutboxRepository>();
            }
        }
    
  • Adım 10 (CreateOrderCommandHandler işlevinin geliştirilmesi ve Outbox Table’a mesajın/event’in işlenmesi)
    API aracılığıyla gelen siparişleri ‘Orders’ tablosuna kaydedelim ve ardından stok bilgilerinin güncellenmesi için message broker’a yayınlanacak olan mesajı/event’i Outbox pattern güvencesiyle yayınlayabilmek için Outbox table’a ilgili mesajı/event’i kaydedelim.

        public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommandRequest, CreateOrderCommandResponse>
        {
            readonly IOrderRepository _orderRepository;
            readonly IOrderOutboxRepository _orderOutboxRepository;
    
            public CreateOrderCommandHandler(IOrderRepository orderRepository, IOrderOutboxRepository orderOutboxRepository)
            {
                _orderRepository = orderRepository;
                _orderOutboxRepository = orderOutboxRepository;
            }
    
            public async Task<CreateOrderCommandResponse> Handle(CreateOrderCommandRequest request, CancellationToken cancellationToken)
            {
                Order order = new() { Description = request.Description };
                await _orderRepository.AddAsync(order);
    
                OrderOutbox orderOutbox = new()
                {
                    OccuredOn = DateTime.UtcNow,
                    ProcessedDate = null,
                    Payload = JsonSerializer.Serialize(order),
                    Type = nameof(OrderCreatedEvent),
                    IdempotentToken = Guid.NewGuid()
                };
                await _orderOutboxRepository.AddAsync(orderOutbox);
                await _orderOutboxRepository.SaveChangesAsync();
                return new();
            }
        }
    

    22. satırda işlenen mesajın/event’in type’ı için ‘Shared’ class library’sinde oluşturulmuş olup aşağıdaki içeriğe sahip olan ‘OrderCreatedEvent’ini belirttiğimize dikkatinizi çekerim.

        public class OrderCreatedEvent : IEvent
        {
            public Guid OrderId { get; set; }
            public int Quantity { get; set; }
            public string Description { get; set; }
            public Guid IdempotentToken { get; set; }
        }
    
  • Adım 11 (Outbox table’dan mesajların okunması)
    Uygulamanın bu noktasında Pulling Publisher veya Transaction Log Tailing olmak üzere iki farklı yöntemden birini tercih edebileceğimizi önceki satırlarda konuşmuştuk. Şimdi gelin bu her iki yöntemi de ayrı ayrı uygulayarak teknik değerlendirmede bulunalım.

    • Pulling Publisher
      Bu yöntemde belirli bir zaman aralığında Outbox table’da yayınlanmamış mesajları çekip publish etme sorumluluğunu üstlenecek bir console app. ya da worker service altyapısında uygulama geliştirmemiz gerekmektedir. Bizler bu içeriğimizde worker service üzerinden bir geliştirmede bulunuyor olacağız.

      O halde ‘OutboxExample.ProcessOutboxJob.Service’ isminde bir worker service oluşturalım ve saniye cinsinden aralıklı tetikleme işlemini gerçekleştirebilmek için Quartz.NET kütüphanesini ilgili uygulamaya yükleyelim. Ayrıca Outbox table’daki mesajları/event’leri message broker’a yayınlayabilmek için MassTransit kütüphanesinden de faydalanıyor olacağız.

      Install-Package Quartz
      Install-Package Quartz.Extensions.DependencyInjection
      Install-Package Quartz.Extensions.Hosting
      Install-Package MassTransit.RabbitMQ

      İlgili kütüphaneleri uygulamaya yükledikten sonra worker service’in ‘Program.cs’ dosyasını açalım ve aşağıdaki konfigürasyonel işlemleri gerçekleştirelim.

      using OutboxExample.ProcessOutboxJob.Service;
      using OutboxExample.ProcessOutboxJob.Service.Jobs;
      using Quartz;
      using MassTransit;
      
      IHost host = Host.CreateDefaultBuilder(args)
          .ConfigureServices((hostContext, services) =>
          {
              services.AddQuartz(configurator =>
              {
                  configurator.UseMicrosoftDependencyInjectionJobFactory();
      
                  JobKey jobKey = new("OrderOutboxPublishJob");
      
                  //Bir job tanımlanıp 'OrderOutboxPublishJob' isimli sınıfa bağlanıyor.
                  configurator.AddJob<OrderOutboxPublishJob>(options => options.WithIdentity(jobKey));
      
                  TriggerKey triggerKey = new("OrderOutboxPublishTrigger");
                  //Job 5 saniyelik aralıklarla çalışacak şekilde ayarlanıyor.
                  configurator.AddTrigger(options => options.ForJob(jobKey)
                              .WithIdentity(triggerKey)
                              .StartAt(DateTime.UtcNow)//Trigger'ın başlangıç tarihini belirliyoruz.
                              .WithSimpleSchedule//Trigger'ın başladıktan sonraki programını belirtiyoruz.
                              (
                                  builder => builder.WithIntervalInSeconds(5) //Trigger'ın kaç saniyede bir tetikleneceğini belirliyoruz.
                                                    .RepeatForever() //Trigger'ın sonsuza denk çalışacağını belirtiyoruz.
                              ));
              });
      
              services.AddQuartzHostedService(options => options.WaitForJobsToComplete = true);
      
              services.AddMassTransit(configurator =>
              {
                  configurator.UsingRabbitMq((context, _configurator) =>
                  {
                      _configurator.Host(hostContext.Configuration["RabbitMQ:Host"], "/", hostConfigurator =>
                      {
                          hostConfigurator.Username(hostContext.Configuration["RabbitMQ:Username"]);
                          hostConfigurator.Password(hostContext.Configuration["RabbitMQ:Password"]);
                      });
                  });
              });
          })
          .Build();
      
      await host.RunAsync();
      

      Burada görüldüğü üzere 9 ile 30. satır aralığında Quartz.NET ile ilgili konfigürasyonlar gerçekleştirilirken, 32 ile 42. satır aralığında ise MassTransit ile ilgili konfigürasyonlar gerçekleştirilmektedir.

      Şimdi ise 16. satırda job’ın bağlandığı ‘OrderOutboxPublishJob’ sınıfını inşa edelim. Bu sınıf işlevsel olarak Outbox table’ı sorgulayacak ve publish edilmeyen mesajları elde edip 5 saniyede bir message broker’a yollayacaktır. Buradaki sorgulama sürecinde bizlere yardımcı olabilmesi için micro ORM aracı olan Dapper‘dan istifade edebiliriz.

      Install-Package Dapper

          public class OrderOutboxPublishJob : IJob
          {
              readonly IPublishEndpoint _publishEndpoint;
      
              public OrderOutboxPublishJob(IPublishEndpoint publishEndpoint)
              {
                  _publishEndpoint = publishEndpoint;
              }
      
              public async Task Execute(IJobExecutionContext context)
              {
                  if (OrderSingletonDatabase.DataReaderState)
                  {
                      OrderSingletonDatabase.DataReaderBusy();
                      List<OrderOutbox> orderOutboxes = (await OrderSingletonDatabase.QueryAsync<OrderOutbox>($@"SELECT * FROM OrderOutboxes
                                                                                                                 WHERE ProcessedDate IS NULL
                                                                                                                 ORDER By OccuredOn DESC"))
                                                                              .ToList();
                      foreach (OrderOutbox orderOutbox in orderOutboxes)
                      {
                          if (orderOutbox.Type == nameof(OrderCreatedEvent))
                          {
                              Order? order = JsonSerializer.Deserialize<Order>(orderOutbox.Payload);
                              if (order != null)
                              {
                                  OrderCreatedEvent orderCreatedEvent = new()
                                  {
                                      Description = order.Description,
                                      OrderId = order.Id,
                                      Quantity = order.Quantity,
                                      IdempotentToken = orderOutbox.IdempotentToken
                                  };
      
                                  await _publishEndpoint.Publish(orderCreatedEvent);
                              }
                          }
      
                          /*
                              Outbox table'da ki publish edilmemiş mesajları/event'leri message
                              broker'a publish ettik. Şimdi yayınlanmış bu mesajların yayınlandığına
                              dair veritabanında Outbox table'da gerekli güncellemelerin/işaretlemelerin
                              yapılması gerekmektedir(ya da yayınlanmış mesaja dair kayıt silinmelidir)
                              İşte tam bu noktada Outbox table'ın olduğu veritabanı ile yaşanabilecek
                              olası bağlantı kopukluklarından dolayı Idempotent tasarımı kullanıyor olacağız.
                              Çünkü bir sonraki job'ın tetiklenme sürecinde bu işlendiğine dair güncellenemeyen
                              ama özünde işlenen mesajlar tekrardan/yineli bir şekilde message broker'a gönderilecek
                              ve consumer tarafından veri tutarsızlığına meydan verebilecek şekilde tüketilecektir.
                              Consumer gerektiği taktirde Inbox pattern'ın getirisi olan Inbox
                              table'ı kullanarak hangi mesajları/event'leri işleyip işlemediğini tutacak ve
                              ihtimal olarak yinelenebilecek mesajların tutarsızlığa sebebiyet verebilecek durumları
                              böylece engellenmiş olacaktır.
                           */
                          int result = await OrderSingletonDatabase.ExecuteAsync(@$"UPDATE OrderOutboxes SET ProcessedDate = GETDATE()
                                                                                    WHERE IdempotentToken = '{orderOutbox.IdempotentToken}'");
      
                      }
                      OrderSingletonDatabase.DataReaderReady();
                      Console.WriteLine("Order outbox table checked!");
                  }
      
              }
          }
      

      Yukarıdaki ‘OrderOutboxPublishJob’ içeriğine göz atarsanız eğer; 15. satırda Outbox table’dan ‘ProcessedDate’ kolonu null olan tüm kayıtlar çekiliyor ve 19 ile 56. satır aralığında bu kayıtlar hangi türde bir event ise onun instance’ına dönüştürülerek message broker’a publish ediliyor. Burada özellikle 38 ile 52. satır aralığındaki nota dikkat ederseniz idempotent özelliğinin ihtiyaç duyulacağı nokta vurgulanmaktadır. Nihayetinde 53. satırda Outbox table’da publish edilen event’ler yayınlandıklarına dair işleme tabi tutulacakken veritabanı ile olası bir bağlantı kesintisi ile karşılaşma ihtimaline nazaran, publish edilmiş ve consumer tarafından işlenmiş bir verinin sonraki tetiklenmede tekrar publish edilip tutarsızlığa mahal verebilecek şekilde tekrar işlenmesini engellememiz gerekmektedir! İçeriğimizin sonunda örnek olarak geliştireceğimiz consumer üzerinden bu idempotent özellikli mesajların nasıl Inbox pattern eşliğinde işlendiğini görmüş olacaksınız.

      Tekrar yukarıdaki kod bloğuna odaklanırsak eğer ‘OrderSingletonDatabase’ isminde bir sınıf üzerinden veritabanı sürecinin işlendiğini görmekteyiz. Hemen bu sınıfında içeriğini aşağıya alarak incelersek eğer;

          public static class OrderSingletonDatabase
          {
              static OrderSingletonDatabase()
              {
                  _connection = new SqlConnection("Server=localhost, 1433;Database=OutboxExampleDB;User ID=SA;Password=1q2w3e4r+!;");
              }
      
              static IDbConnection _connection;
              public static IDbConnection Connection
              {
                  get
                  {
                      if (_connection.State == ConnectionState.Closed)
                          _connection.Open();
                      return _connection;
                  }
              }
      
              public static async Task<IEnumerable<T>> QueryAsync<T>(string sql)
                  => await _connection.QueryAsync<T>(sql);
      
              public static async Task<int> ExecuteAsync(string sql)
                  => await _connection.ExecuteAsync(sql);
      
              static bool _dataReaderState = true;
              public static bool DataReaderState { get => _dataReaderState; }
      
              public static void DataReaderReady() => _dataReaderState = true;
              public static void DataReaderBusy() => _dataReaderState = false;
          }
      

      şeklinde olduğunu göreceğiz. Burada özellikle singleton bir şekilde tasarlanmış olan ‘SqlConnection’ nesnesi yönetilmekte ve ayrıca Dapper kütüphanesinin temel sorgulama metotları(QueryAsync, ExecuteAsync) işlenmektedir. Burada önemli olan ‘DataReaderState’ property’sinin mahiyetini anlamaktır. ‘OrderOutboxPublishJob’ sınıfı içerisinde her 5 saniyede bir Outbox table üzerinde işlem gerçekleştirirken bazen binlerce verinin işlenip publish edilmesi 5 saniyeyi geçebilmektedir. Böyle bir durumda işlemi bitmeyen bir DataReader nesnesini tekrardan(5 saniye sonra) kullanmaya çalışmak hataya sebep olacaktır. Dolayısıyla bu hatanın engellenebilmesi için DataReader nesnesinin state’ini bu property ile tutmakta ve DataReaderReady ile DataReaderBusy fonksiyonlarıyla yönetimi sağlanmaktadır. Bu bilgiler eşliğinde tekrardan ‘OrderOutboxPublishJob’ sınıfına göz atarsanız eğer 12., 14. ve 57. satırlarda bu kontroller eşliğinde DataReader güvenliğine göre Outbox table sorgulanmaktadır.

      Pulling Publisher yöntemiyle Outbox table’dan mesajların nasıl okunabileceğine gayet yeterli bir örnek sunduğumuz kanaatindeyim. Şimdi ise diğer yöntemi de değerlendirip konumuza devam edebiliriz.

    • Transaction Log Tailing
      Bu yöntemde ise Outbox table’dan transaction logları okuyarak eklenen mesajları anında publish edebiliriz. Bunun için Debezium teknolojisini kullanan basit bir console uygulaması geliştirebiliriz.

      Bunun için ‘OutboxExample.TransactionLog.Service’ isminde bir console uygulaması oluşturalım. Ardından ‘OutboxExample\TransactionLogService’ dizininde aşağıdaki docker compose dosyasını oluşturalım ve
      docker-compose -f docker-compose-sqlserver.yaml up talimatı eşliğinde çalıştıralım.

      version: '3.1'
      services:
        zookeeper:
          image: debezium/zookeeper
          ports:
            - "2181:2181"
            - "2888:2888"
            - "3888:3888"
        kafka:
          image: debezium/kafka
          ports:
            - "9092:9092"
            - "29092:29092"
          depends_on:
            - zookeeper
          environment:
            - ZOOKEEPER_CONNECT=zookeeper:2181
            - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
            - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
            - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
            - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
        connect:
          image: debezium/connect
          ports:
            - 8083:8083
          environment:
            - BOOTSTRAP_SERVERS=kafka:9092
            - GROUP_ID=1
            - CONFIG_STORAGE_TOPIC=my_connect_configs
            - OFFSET_STORAGE_TOPIC=my_connect_offsets
            - STATUS_STORAGE_TOPIC=my_connect_statuses
          depends_on:
            - zookeeper
            - kafka
      

      Outbox Design Pattern(Outbox Tasarım Deseni)Ardından veritabanında ve Outbox tablosunda CDC(Change Data Capture)’yi aktifleştirebilmek için aşağıdaki konfigürasyonları çalıştıralım.

      Veritabanı seviyesinde CDC’yi aktifleştirmek için;

      USE OutboxExampleDB
      GO
      EXEC sys.sp_cdc_enable_db
      

      Tablo seviyesinde CDC’yi aktifleştirmek için;

      USE OutboxExampleDB
      GO
      EXEC sys.sp_cdc_enable_table 
      @source_schema = N'dbo', 
      @source_name = N'OrderOutboxes', 
      @role_name = NULL, 
      @filegroup_name = N'', 
      @supports_net_changes = 0 
      

      Sırada Debezium için bir connector tanımlamaya gelmiştir. Bunun için aşağıdaki json formatındaki register konfigürasyonları eşliğinde curl talimatını post edelim.

      {
        "name": "OutboxExample-Connector",
        "config": {
          "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
          "tasks.max": "1",
          "database.hostname": "host.docker.internal",
          "database.port": "1433",
          "database.user": "SA",
          "database.password": "1q2w3e4r+!",
          "database.dbname": "OutboxExampleDB",
          "database.server.name": "MSSQLServer",
          "schema.include.list": "dbo",
          "table.whitelist": "dbo.OrderOutboxes",
          "database.history.kafka.bootstrap.servers": "kafka:9092",
          "database.history.kafka.topic": "dbhistory.outboxtable"
        }
      }
      
      curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json

      Outbox Design Pattern(Outbox Tasarım Deseni)Evet, connector’de tanımlandığına göre Outbox table’da yapılan tüm değişiklikleri takip edebilir ve yeni gelen mesajları rahatlıkla publish edebiliriz. Bunun için console uygulamasına geliniz ve aşağıdaki çalışmayı gerçekleştiriniz.

      using Confluent.Kafka;
      
      ConsumerConfig config = new()
      {
          GroupId = "MSSQLServer.dbo.OrderOutboxes",
          BootstrapServers = "localhost:29092",
          AutoOffsetReset = AutoOffsetReset.Earliest
      };
      
      using IConsumer<Ignore, string> consumer = new ConsumerBuilder<Ignore, string>(config).Build();
      consumer.Subscribe("MSSQLServer.dbo.OrderOutboxes");
      
      while (true)
      {
          CancellationTokenSource cancellationTokenSource = new();
          ConsumeResult<Ignore, string> result = consumer.Consume(cancellationTokenSource.Token);
      
          Console.WriteLine(result.Value);
          /*
           Şu aşamadan sonra result.Value'da ki veriler ayıklanıp message broker'a gönderilebilir.
           */
      }
      

      Yukarıdaki kod bloğunu incelerseniz eğer ‘Kafka’dan gelecek olan ve Outbox table’daki verileri barındıran mesajları serilize edip esas message broker’a gönderebilecek temeller atılmış bulunmaktadır. Bundan sonrası içeriğin şişmesine ve yapılanların tekrar edilmesine sebebiyet verebileceğinden dolayı devamını sizlere bırakıyorum.

  • Adım 12 (Consumer’ın geliştirilmesi)
    Artık yolun son virajına gelmiş bulunmaktayız. Geliştirdiğimiz uygulamayı tüketecek olan consumer’ı geliştirecek ve Inbox pattern ile idempotent güvencesini sağlıyor olacağız. Bunun için ‘Presentation’ katmanında ‘OutboxExample.Stock.API’ uygulaması oluşturalım ve ‘Program.cs’ dosyasında aşağıdaki konfigürasyonları gerçekleştirelim.

    using MassTransit;
    
    var builder = WebApplication.CreateBuilder(args);
    
    builder.Services.AddMassTransit(configurator =>
    {
        configurator.AddConsumer<OrderCreatedEventConsumer>();
        configurator.UsingRabbitMq((context, _configurator) =>
        {
            _configurator.Host(builder.Configuration["RabbitMQ:Host"], "/", hostConfigurator =>
            {
                hostConfigurator.Username(builder.Configuration["RabbitMQ:Username"]);
                hostConfigurator.Password(builder.Configuration["RabbitMQ:Password"]);
            });
            _configurator.ReceiveEndpoint("stock-order-created-event", e => e.ConfigureConsumer<OrderCreatedEventConsumer>(context));
        });
    });
    
    builder.Services.AddControllers();
    builder.Services.AddEndpointsApiExplorer();
    builder.Services.AddSwaggerGen();
    

    Ardından ‘OrderCreatedEventConsumer’ isimli consumer’ı oluşturularak içeriğini geliştirmek üzere sonraki adımlara bırakalım.

        public class OrderCreatedEventConsumer : IConsumer<OrderCreatedEvent>
        {
            public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
            {
                //...
                //...
                //processing
                //...
                //...
                Console.WriteLine($"Stock {context.Message.Quantity} miktar kadar düşürülmüştür!");
            }
        }
    
  • Adım 13 (Inbox Pattern temellerinin atılması)
    Şimdi consumer’a gelen mesajları işlemeden önce idempotent özelliği gereği önceden işlenip işlenmediğini sorgulamak için Inbox pattern mantığı doğrultusunda Inbox table’ın entity’sini oluşturalım. ‘Domain’ katmanına gelelim ‘OrderInbox’ adında entity’i aşağıdaki içerikle ekleyelim.

        public class OrderInbox
        {
            public Guid OrderId { get; set; }
            public int Quantity { get; set; }
            public string Description { get; set; }
            public bool Processed { get; set; }
            public Guid IdempotentToken { get; set; }
        }
    

    Yukarıdaki Inbox table mahiyetinde geliştirilen ‘OrderInbox’ entity’sine göz atarsanız eğer ‘Processed’ property’si sayesinde ilgili mesajın/event’in işlenip işlenmediğini ifade edecek bir alan barındırmaktadır. Bizler publisher’dan gelecek olan mesajı önce bu tabloya kaydedecek sonrasında ise bu tablo üzerinden alıp işleme tabi tutacağız. Süreç başarıyla neticelenirse eğer ‘Processed’ kolonunu ‘true’ olarak güncelleyip tutarlılığı hat safhaya çıkaracağız.

    Velhasıl şimdi bu entity’nin repository arayüzünü ve concrete’ini oluşturalım.

        public interface IOrderInboxRepository : IRepository<OrderInbox>
        {
        }
    
        public class OrderInboxRepository : Repository<OrderInbox>, IOrderInboxRepository
        {
            public OrderInboxRepository(OutboxExampleDbContext context) : base(context)
            {
            }
        }
    

    Tabi IoC’den bu repository’nin çağrılabilmesi için service registration’ı unutmayalım.

        public static class ServiceRegistrations
        {
            public static void AddPersistenceServices(this IServiceCollection services)
            {
                .
                .
                .
                services.AddScoped<IOrderInboxRepository, OrderInboxRepository>();
            }
        }
    

    Son olarak ise context nesnesine DbSet olarak ekleyelim ve migration basıp, migrate edelim.

        public class OutboxExampleDbContext : DbContext
        {
            public OutboxExampleDbContext(DbContextOptions options) : base(options)
            {
            }
            public DbSet<Order> Orders { get; set; }
            public DbSet<OrderOutbox> OrderOutboxes { get; set; }
            public DbSet<OrderInbox> OrderInboxes{ get; set; }
            protected override void OnModelCreating(ModelBuilder modelBuilder)
            {
                modelBuilder.ApplyConfiguration(new OrderConfiguration());
                modelBuilder.ApplyConfiguration(new OrderOutboxConfiguration());
                modelBuilder.ApplyConfiguration(new OrderInboxConfiguration());
            }
        }
    
  • Adım 14 (Consumer’da idempotent ilkesinin uygulanması)
    Consumer’ı aşağıdaki gibi geliştirelim.

        public class OrderCreatedEventConsumer : IConsumer<OrderCreatedEvent>
        {
            readonly IOrderInboxRepository _orderInboxRepository;
    
            public OrderCreatedEventConsumer(IOrderInboxRepository orderInboxRepository)
            {
                _orderInboxRepository = orderInboxRepository;
            }
    
            public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
            {
                bool hasData = _orderInboxRepository.GetWhere(oi => oi.IdempotentToken == context.Message.IdempotentToken && oi.Processed).Any();
                /*
                 hasData : true ise demek ki bu event önceden gelmiş ve başarıyla işlenmiş lakin publisher tarafından gönderildiğine
                 dair güncelleme işlemi bilinmeyen bir sebepten dolayı gerçekleştirilemediği için tekrardan
                 publish edilmiştir. İşte idempotent özelliği sayesinde bu event'in tekrar işlenmesinin önüne böylece geçilmektedir.
                 */
    
                if (!hasData)
                {
                    await _orderInboxRepository.AddAsync(new()
                    {
                        Description = context.Message.Description,
                        IdempotentToken = context.Message.IdempotentToken,
                        OrderId = context.Message.OrderId,
                        Quantity = context.Message.Quantity,
                        Processed = false
                    });
                    await _orderInboxRepository.SaveChangesAsync();
                }
    
                //Inbox table'a kaydedilen mesaj/event çekilip işleme tabi tutulmaktadır.
    
                List<OrderInbox> orderInboxes = _orderInboxRepository.GetWhere(oi => !oi.Processed).ToList();
                foreach (var orderInbox in orderInboxes)
                {
                    Console.WriteLine(@$"OrderId : {orderInbox.OrderId}
                                             Id : {orderInbox.IdempotentToken}
                                             Stock {orderInbox.Quantity} miktar kadar düşürülmüştür!");
                    orderInbox.Processed = true;
                }
                await _orderInboxRepository.SaveChangesAsync();
    
            }
        }
    

    Görüldüğü üzere gelen mesaj önce Inbox table’da var mı kontrol edilmekte, yoksa eğer eklenmekte ve ardından işleme tabi tutulmaktadır.

İşte bu kadar 🙂

Test Edelim

Şimdi yaptığımız bu çalışmayı test edebiliriz…
Bunun için kıssadan hisse yaparak SQL sorgularından istifade edeceğiz.

Aşağıdaki sorgu ile hem ‘Orders’ hem de ‘OrderOutboxes’ tablolarına eşzamanlı olarak 10000 kayıt atılabilmektedir.

DECLARE @i INT = 0
WHILE @i < 10000
BEGIN
	DECLARE @id UNIQUEIDENTIFIER = NEWID()
	INSERT Orders(Id, Description, Quantity) VALUES(@id, 'sipariş - ' + CAST(@i as NVARCHAR(MAX)), @i * 2)
	INSERT OrderOutboxes(OccuredOn, Payload, Type, IdempotentToken) VALUES(GETDATE(), '{"Id":"' + CAST(@id as NVARCHAR(MAX)) + '","Quantity":0,"Description":"sipariş - ' + CAST(@i as NVARCHAR(MAX))+ '"}', 'OrderCreatedEvent', NEWID())
	SET @i = @i + 1
END

Yukarıdaki sorgu çalışırken bir yandan da ‘ProcessOutboxJob.Service’ ve ‘Stock.API’ uygulamaları ayakta olduğu taktirde aşağıdaki sorgular eşliğinde görseldeki gibi tutarlı mesajlaşma sürecini gözlemleyebiliriz.

SELECT * FROM Orders
SELECT * FROM OrderOutboxes 
WHERE ProcessedDate IS NULL
SELECT * FROM OrderInboxes

Outbox Design Pattern(Outbox Tasarım Deseni)Ara ara ‘OrderOutboxPublishJob’ sınıfındaki update operasyonunu kaldırıp süreci devam ettirirsek eğer Outbox table’da işlendiği halde hala güncellenmeyen kayıtların kaldığını göreceksiniz. Bu kayıtların sonraki süreçte tutarsız herhangi bir işleme maruz kalmaksızın başarılı bir şekilde kaldırılacağını gözlemleyebilirsiniz. Eğer ki tutarsızlık olduğunu düşünüyorsanız aşağıdaki SQL sorgusu ile ‘Orders’ ve ‘OrderInboxes’ tabloları arasındaki farkları görebilir ve süreçte hangi verilerde tutarsızlık olabileceğini verisel açıdan monitör edebilirsiniz.

SELECT * FROM Orders
LEFT JOIN OrderInboxes
ON Orders.Id = OrderInboxes.OrderID
WHERE OrderInboxes.OrderID IS NULL

Yukarıdaki sorgu neticesinde gelen veriler olursa eğer aşağıdaki sorgu ile de Outbox table’da ki halini tarayıp rahatlıkla son durum hakkında bilgi edinebilirsiniz.

SELECT * FROM OrderOutboxes 
WHERE IdempotentToken = '80A716BA-6549-4A26-AD89-59D0FF63D87D'

Outbox Design Pattern(Outbox Tasarım Deseni)

Nihai olarak;
Outbox pattern ile distributed sistemlerdeki iletişim sürecinde tahaahhüt edilebilmesi zor olan işlemleri daha garantili hale getiriyor ve hatta akrabası olan Inbox pattern desteğiyle de süreçte işlenen veriler üzerinde garanti seviyesini bir tık daha ileri taşıyoruz. Yapısal olarak Outbox pattern’ın ihtiyaç noktalarında oldukça elverişli ve hayat kurtaran ama bir o kadar da zahmetli mimarisi olduğunu görmüş olduk. Bu içeriğimizde değinmemiş olsak da Outbox table’a veri eklenecek tüm eylemleri ihtiyaç noktalarında değil de notification yapılanmalarıyla koordine etmeye çalışmak mimarisel açıdan daha şık bir davranış sergilememizi sağlayacaktır kanaatindeyim. Bunuda sizlere bir fikir olması temennisiyle sunup içeriğimi noktalıyorum.

Üşenmeden okuyup, eşlik ettiğiniz için teşekkür ederim.

İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…

Not : Örnek çalışmayı aşağıdaki github adresinden inceleyebilirsiniz.
https://github.com/gncyyldz/Outbox-Design-Pattern-Example

Bunlar da hoşunuza gidebilir...

11 Cevaplar

  1. Tarik dedi ki:

    Efsane

  2. Umut dedi ki:

    Üstad normal iş hayatının yanında nasıl fırsat bulup da hem youtube üzerinden içerikler hazırlayıp hem de buradan makaleler yayınlıyorsun inan şaşıyorum. Yazılımı çok iyi öğretiyorsun ama asıl çalışma azmin örnek alınmalı diye düşünüyorum. Başarıların daim olsun 🙂

  3. Ersin dedi ki:

    Hocam affınıza sığınarakdan, Entity Framework Core’dan sonra şöyle bir web api’nin içinden geçsek herşeyiyle fena olmazmıydı :/

  4. Hasan dedi ki:

    Emeğine sağlık Gencay hocam, çok faydalı bir iiçerik daha kaleme almışsın. teşekkürler.

  5. Malitutuncu dedi ki:

    Hocam tek kelime ile efsane

  6. Murat Yüceer dedi ki:

    Eline sağlık, makalenin başında belirttiğin lock for update mevzusunu outbox ve inbox tablolarına select çekerken uygulayarak örneği güncelleyebilirsin.

  7. Ömer dedi ki:

    Çok güzel anlatmışsınız elinize sağlık.
    Bir sorum olacak , outbox biraz daha producer tarafında ki sorunları kaldırmak için oluyor.
    Örnek veriyorum consumer tarafında da bir sorun oldu vs. bunun için de bir pattern var mıdır ?

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir