Yazılım Mimarileri ve Tasarım Desenleri Üzerine

Debezium(CDC) İle Veritabanlarındaki Değişiklikleri Yakalama | PostgreSQL – MSSQL – MongoDB

Debezium(CDC) İle Veritabanlarındaki Değişiklikleri Yakalama | PostgreSQL - MSSQL - MongoDB

Merhaba,

Bu içeriğimizde birden çok veri kaynağında gerçek zamanlı oluşan değişikliklerin yakalanmasını ve stream edilmesini sağlayan, open source ve distributed bir araç olan Debezium CDC(Change Data Capture)’yi inceliyor olacağız.

CDC(Change Data Capture) Nedir?

Veritabanında insert, update ve delete işlemleri neticesinde nihai olarak öncesinden farklı olan, bir başka deyişle değişiklik gösteren verileri yakalamamızı, izlememizi ve işlem yapabilmemizi sağlayan bir tasarım modelidir. Bu model sayesinde verilerin ilk ve son hallerini elde edebilir ve türlü operasyonlar gerçekleştirebiliriz. Tabi ki de CDC’yi kullanabilmek için ilgili veritabanı tarafından destekleniyor olması gerekmektedir.

La hoca! Veritabanındaki verisel değişiklikleri izleyebilmek için trigger kullanamaz mıyız?
Tabi ki de trigger ile bu işlemleri gerçekleştirebilirsiniz. Lakin CDC ile ihtiyaca dönük operasyonları daha hızlı bir şekilde işleyebilirsiniz. Çünkü CDC verilere dair tüm bilgileri veritabanı loglarından toplayarak hizmet sağlamaktadır. Bu sebepten dolayı trigger’lara nazaran daha performanslı ve az maliyetli bir iş süreci sağlamış bulunmaktadır. Ayrıca CDC sayesinden, veritabanında istenilen tablonun tamamı yahut istek doğrultusunda bir kısmı üzerinde yapılan değişiklikler takip edilerek süreçteki verisel akış filtrelenebilmekte ve arzu edinildiği şekilde yönetilebilmektedir.

Debezium Nedir?

Giriş paragrafında bahsedildiği üzere Debezium, veritabanı üzerinde yapılan değişiklikleri yakalayarak bunları stream eden, open source ve distributed platform olan Change Data Capture(CDC) aracıdır.

Debezium, veritabanında istenilen tabloda yahut o tablonun istenilen kolonlarında olan tüm verisel değişiklikleri yakalayarak Kafka’ya aktarır. Bu değişiklikleri yakalama süreci, Debezium’un çalışmadığı süreçlerde de pasif bir şekilde de gerçekleştirilebilmektedir.

Debezium, bir üstteki paragrafta bahsedildiği gibi veritabanında oluşan değişikliklere istinaden yakaladığı verilerin dayanıklılığı ve güvenilirliği için Kafka ve Kafka Connect’ten yararlanmaktadır.

Kafka Nedir?

Binlerce yazılım projesi ve şirket tarafından kullanılan, publisher/subscriber tabanlı olan open source distributed streaming platformudur. Temelde bilinmesi gereken Producer, Consumer, Broker ve Topic olmak üzere dört terminolojik terimi mevcuttur;

Debezium ile veritabanında yakalanan değişikliklerin Kafka’ya aktarılabilmesi için verilerin bunun anlayabileceği dilden bir formata dönüştürülmesi gerekmektedir. Bunun için Kafka Connect tool’unu kullanıyor olacağız.

Kafka Connect Nedir?
Kafka’nın veritabanlarına, consule yahut eleasticsearch gibi yapılara bağlanabilmesini ve metrikleri alabilmesini sağlayan bir tool’dur. Hususi olarak içeriğimiz çerçevesinde değerlendirirsek eğer Kafka ile Debezium arasında veri akışını sağlayacak olan köprü görevi gören yapılanmadır diyebiliriz.

ZooKeeper Nedir?

Birçok yapılandırma bilgisinin, distributed mimariler tarafından şu veya bu şekilde kullanıldığını biliyoruz. Tabi bu bilgilerin yönetimi manuel bir şekilde yapıldığı taktirde uygulanmalarının zorluğu nedeniyle insani durumlardan kaynaklı hatalar kaçınılmaz olabilmektedir. Hadi diyelim doğru bir şekilde yönetildiler, bu sefer de uygulamalar dağıtıldığında yönetim karmaşası gibi durumlarla karşılaşılabilmektedir. İşte ZooKeeper, tüm bu riskleri gözardı edip güvenilir distributed yapılandırma yapmamızı sağlamaktadır.

Haliyle, Kafka’nın yapılandırma bilgilerini depolamak, adlandırmak, senkronizasyonunu sağlamak ve koordine etmek için ZooKeeper yazılımını kullanıyor olacağız.

Debezium entegrasyonlarında ZooKeeper, temel kurulum dışında derinlemesine anlaşılmayı gerektirmemektedir.

Şimdi, temel enstrümanları ve kullanacağımız teknolojileri teoride incelediğimize göre artık pratikte Debezium’u inceleyebiliriz. Bunun için hedeflediğim üç farklı veritabanı üzerinden(PostgreSQL, MSSQL ve MongoDB) çalışma sergiliyor olacağız. Hadi buyrun, fazla vakit kaybetmeksizin ilk olarak PostgreSQL’den başlayalım.

Debezium İle PostgreSQL’de Change Data Capture(CDC) Nasıl Yapılır?

İlk olarak Debezium’u hangi veritabanıyla kullanırsak kullanalım temelde Kafka ve Debezium çalıştırılmalı ve ZooKeeper ile yapılandırmalar ayarlanmalıdır. Tüm bu yazılımlarım ayağa kaldırılması için Docker’dan istifade ediyor olacağız. Şimdi gelin ihtiyacımız olan araçları hızlıca oluşturup, ayağa kaldıralım.

docker-compose-postgres.yaml

version: '3.1'
services:
  zookeeper:
    image: debezium/zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
  kafka:
    image: debezium/kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
  postgres:
    image: debezium/postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=123456
      - POSTGRES_DB=DebeziumDB
  connect:
    image: debezium/connect
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - zookeeper
      - kafka

Yukarıdaki docker compose dosyasına bakarsanız eğer ihtiyaç doğrultusundaki ilgili tüm araçları barındırmaktadır. Tüm image’lar da debezium’lu versiyonları kullanmamızın nedeni debezium ile çalışmaları için gerekli ayarlarının etkin gelmesindendir. PostgreSQL’de ise debezium’un sağlıklı çalışabilmesi için wal_level konfigürasyonuna ‘logical’ değeri verilmelidir. Bunun nedeni, PostgreSQL’de herhangi bir transaction yürütüldüğünde öncelikle bu işlem Write-ahead logging(WAL) adı verilen yerde gerçekleştirilmektedir. Debezium’da gerekli verileri WAL’dan toplayacağı için bu şekilde ayarlanması gerekmektedir. Haliyle debezium/postgres bu konfigürasyon temellerinde gelmektedir. Şimdi PostgreSQL sunucusunun yapılandırmalarına bir göz atıp öyle devam edelim.

Logical, WAL verilerini harici sistemlerin tüketebilmesi için gerekli izin konfigürasyonunu sağlar.

Şimdi bu docker compose dosyasını
docker-compose -f docker-compose-postgres.yaml up
talimatı eşliğinde çalıştıralım.
Tüm yüklemelerin bitip, container’ların ayağa kaldırıldığından emin olalım.

Şimdi PostgreSQL sunucusunda yukarıdaki docker-compose-postgres.yaml dosyasında oluşturulan ‘DebeziumDB’ veritabanı içerisinde aşağıdaki örnek tabloyu ve şemayı oluşturalım ve ardından örnek bir kaç veri insert edelim.

create  schema exampleschema
set search_path TO exampleschema,public;
create table exampleschema.exampletable (
    column1 int,
    column2 int,
    column3 varchar(150),
    primary key(column1)
);
alter table exampleschema.exampletable replica identity FULL;
insert into exampleschema.exampletable values (1000, 2000, 'example value');
insert into exampleschema.exampletable values (1001, 2001, 'example value 2');

Şimdi ise debezzium connector’ın bizlere sunmuş olduğu API aracılığıyla dinlenecek olan veritabanı ve tablo bilgilerinin verilmesi gerekmektedir. Bunun için aşağıdaki gibi bir .json dosyası oluşturarak gerekli bilgileri içerisinde tanımlamamız gerekmektedir.

register-postgres.json

{
    "name": "example-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "123456",
        "database.dbname" : "DebeziumDB",
        "database.server.name": "exampleserver",
        "schema.include.list": "exampleschema",
        "table.whitelist": "exampleschema.exampletable"
    }
}

Bu bilgileri tanımladıktan sonra curl üzerinden aşağıdaki post isteğini göndermemiz yeterlidir.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.jsonBöylece bu son hamle eşliğinde bir connector tanımlayarak debezium’u etkinleştirmiş bulunmaktayız. Connector’ı etkinleştirme neticesinde yukarıdaki görselde tek satır halinde gelen result’ı doğru düzgün görmek isteyenler için formatlandırıp aşağıya bırakıyorum;

{
   "name":"example-connector",
   "config":{
      "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max":"1",
      "database.hostname":"postgres",
      "database.port":"5432",
      "database.user":"postgres",
      "database.password":"123456",
      "database.dbname":"DebeziumDB",
      "database.server.name":"exampleserver",
      "schema.include.list":"exampleschema",
      "table.whitelist":"exampleschema.exampletable",
      "name":"example-connector"
   },
   "tasks":[
      
   ],
   "type":"source"
}

Ve şimdi ise debezium connector ile takip edilen veritabanı sunucusundaki ilgili tablodaki veri değişikliklerinin izlemesini gerçekleştirelim. Bunun için aşağıdaki docker talimatının verilmesi yeterlidir.

docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --property print.key=true --topic exampleserver.exampleschema.exampletable
Evet, görüldüğü üzere PostgreSQL veritabanındaki yapılan değişiklikler yakalanmış bulunmaktadır. Hatta debezium connector bağlanmadan önce girilen örnek veriler bile önceki paragraflarda bildirildiği üzere debezium çalıştırıldıktan sonra yakalanmış ve elde edilmiştir. Şimdi gelen json verileri düzgün formata getirip, inceleyelim.

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"column1"
         }
      ],
      "optional":false,
      "name":"exampleserver.exampleschema.exampletable.Key"
   },
   "payload":{
      "column1":1000
   }
}{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"struct",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"column1"
               },
               {
                  "type":"int32",
                  "optional":true,
                  "field":"column2"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"column3"
               }
            ],
            "optional":true,
            "name":"exampleserver.exampleschema.exampletable.Value",
            "field":"before"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"column1"
               },
               {
                  "type":"int32",
                  "optional":true,
                  "field":"column2"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"column3"
               }
            ],
            "optional":true,
            "name":"exampleserver.exampleschema.exampletable.Value",
            "field":"after"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "name":"io.debezium.data.Enum",
                  "version":1,
                  "parameters":{
                     "allowed":"true,last,false,incremental"
                  },
                  "default":"false",
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"sequence"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"schema"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"table"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"txId"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"lsn"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"xmin"
               }
            ],
            "optional":false,
            "name":"io.debezium.connector.postgresql.Source",
            "field":"source"
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"total_order"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"data_collection_order"
               }
            ],
            "optional":true,
            "field":"transaction"
         }
      ],
      "optional":false,
      "name":"exampleserver.exampleschema.exampletable.Envelope"
   },
   "payload":{
      "before":null,
      "after":{
         "column1":1000,
         "column2":2000,
         "column3":"example value"
      },
      "source":{
         "version":"1.8.1.Final",
         "connector":"postgresql",
         "name":"exampleserver",
         "ts_ms":1647763176365,
         "snapshot":"true",
         "db":"DebeziumDB",
         "sequence":"[null,\"23723656\"]",
         "schema":"exampleschema",
         "table":"exampletable",
         "txId":557,
         "lsn":23723656,
         "xmin":null
      },
      "op":"r",
      "ts_ms":1647763176368,
      "transaction":null
   }
}

Yukarıdaki json datayı incelerseniz eğer 177 ile 200. satır aralığında değişiklik olan verileri içeren ‘payload’ alanı mevcuttur. Yakalanan verilerin hangi aksiyona tabii tutulduğunu 198. satırdaki ‘op’ alanındaki c, u, d ve r değerleri ifade etmektedir.

c Yakalanan verinin eklendiğini ifade eder.
u Yakalanan verinin güncellendiğini ifade eder.
d Yakalanan verinin silindiğini ifade eder.
r Yakalanan verinin zaten var olduğunu ifade eder.

Peki hoca, gelen json data’da ki diğer alanlar neyin nesi? diye sorarsanız eğer şöyle cevaplayalım;

Şimdi test amaçlı aşağıdaki veritabanı sorgularını çalıştıralım.

update exampleschema.exampletable
set
column1 = 5000,
column2 = 6000,
column3 = 'updated example value'
where column1 = 1000

Görüldüğü üzere çalıştırılan sorgu update sorgusu ise veritabanı açısından bu bir delete ve insert işlemi olduğu için önce ‘op’ değeri ‘d’ sonra da ‘c’ olan iki kayıt yakalanmaktadır. Haliyle ‘before’ ve ‘after’ alanlarından update yapılan sorguya dair verileri rahatlıkla yakalayabilmekteyiz.

delete from exampleschema.exampletable
where column1 = 5000

Delete sorgusunda da aynı mantık silinecek veriye dair önceki kayıtlar elde edilebilmektedir.

Böylece PostgreSQL ile CDC çalışmasını gerçekleştirmiş olduk. Mevzu bahis, Debezium ile yakalanan verilerin Kafka üzerinden .NET Core ortamında okunmasına gelmeden önce MSSQL ve MongoDB üzerinden de CDC entegrasyonunu ele alalım. Nihayetinde tüm örneklendirmeler Kafka kullanacağından dolayı içeriğimizin sonlarına doğru .NET Core ortamında müşterek bir uygulama oluşturarak konuyu tek elden örneklendirebiliriz.

Debezium İle MSSQL’de Change Data Capture(CDC) Nasıl Yapılır?

İlk olarak MSSQL veritabanıyla birlikte ilgili image’ları ayağa kaldırabilmek için aşağıdaki docker compose dosyasını oluşturalım.

docker-compose-sqlserver.yaml

version: '3.1'
services:
  zookeeper:
    image: debezium/zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
  kafka:
    image: debezium/kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
  sqlserver:
    image: mcr.microsoft.com/mssql/server
    ports:
      - 1433:1433
    environment:
      - ACCEPT_EULA=Y
      - MSSQL_PID=Standard
      - SA_PASSWORD=1q2w3e4r!^+
      - MSSQL_AGENT_ENABLED=true
  connect:
    image: debezium/connect
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - zookeeper
      - kafka

Ardından aşağıdaki talimat eşliğinde ilgili dosyayı çalıştıralım.
docker-compose -f docker-compose-sqlserver.yaml upŞimdi SQL Server’da örnek bir veritabanı ve tablo oluşturalım.

create database DebeziumDB
go
use DebeziumDB
go
create schema exampleschema
go
create table exampleschema.exampletable
(
	column1 int primary key identity(1,1),
	column2 int,
	column3 nvarchar(max)
)

Akabinde oluşturulan veritabanı içerisinde verisel değişiklikleri yakalayabilmek için gerekli CDC aktifleştirme konfigürasyonunu gerçekleştirelim.

use DebeziumDB
go
exec sys.sp_cdc_enable_db

Bu veritabanı seviyesinde bir CDC aktifleştirmeyken,

use DebeziumDB
go
exec sys.sp_cdc_enable_table 
@source_schema = N'exampleschema', 
@source_name = N'exampletable', 
@role_name = NULL, 
@filegroup_name = N'', 
@supports_net_changes = 0 

bu ise tablo seviyesinde CDC aktifleştirmesidir.

Bu işlemlerden sonra sıra debezium için bir connector tanımlamasında bulunmaya gelmiştir. Bunun için aşağıdaki register konfigürasyonlarını barındıran .json dosyasını curl üzerinden verilen talimat eşliğinde post edilmesi yeterlidir.

register-sqlserver.json

{
    "name": "example-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.hostname": "sqlserver",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "1q2w3e4r!^+",
        "database.dbname": "DebeziumDB",
        "database.server.name": "exampleserver",
        "schema.include.list": "exampleschema",
        "table.whitelist": "exampleschema.exampletable",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.fullfillment"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
Yukarıdaki görseli incelerseniz eğer connector oluşturma neticesinde result’unu bizlere döndürmüş vaziyettedir. Result’un doğru formatlandırılmış halini bir önceki PostgreSQL örneklendirmesinde sunduğum için burada lüzum görmemekteyim.

Artık debezium ile bağlantısı kurulan SQL Server’ın üzerinde yapılan değişikliklerin yakalanıp, yakalanmadığını aşağıdaki talimat eşliğinde takip edebiliriz.
docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --property print.key=true --topic exampleserver.exampleschema.exampletable

Evet… SQL Server ile de CDC çalışmasını tamamlamış bulunmaktayız. Şimdi sırada NoSQL yaklaşımına sahip olan MongoDB var 🙂

Debezium İle MongoDB’de Change Data Capture(CDC) Nasıl Yapılır?

MongoDB için örneklendirmede biraz hızlı ilerliyor olacağız. Şimdi aşağıdaki docker compose içeriğini verilen talimat eşliğinde çalıştırarak başlayalım.

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mongodb:
    image: quay.io/debezium/example-mongodb
    hostname: mongodb
    ports:
     - 27017:27017
    environment:
     - MONGODB_USER=debezium
     - MONGODB_PASSWORD=dbz
  connect:
    image: quay.io/debezium/connect
    ports:
     - 8083:8083
    links:
     - kafka
     - mongodb
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

docker-compose -f docker-compose-mongodb.yaml up

Akabinde MongoDB replika setini başlatalım ve test verileri ekleyelim.

docker-compose -f docker-compose-mongodb.yaml exec mongodb bash -c '/usr/local/bin/init-inventory.sh'

Ardından aşağıdaki içeriğe sahip olan connector’u verilen talimat eşliğinde başlatalım.

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
        "tasks.max" : "1",
        "mongodb.hosts" : "rs0/mongodb:27017",
        "mongodb.name" : "dbserver1",
        "mongodb.user" : "debezium",
        "mongodb.password" : "dbz",
        "database.include.list" : "inventory",
        "database.history.kafka.bootstrap.servers" : "kafka:9092"
    }
}

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json

Yapılan bu çalışmalar kafidir. example-mongodb image’i sayesinde örnek bir veritabanı oluşturulmuş vaziyettedir. Haliyle bu veritabanı içerisindeki verisel değişiklikleri yakalayan kafka’ya bağlanarak verileri tüketebiliriz. Bunun için aşağıdaki talimatın verilmesi yeterlidir.
docker-compose -f docker-compose-mongodb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --property print.key=true --topic dbserver1.inventory.customers
Vee işte MongoDB’dende yapılan verisel değişikliklerin yakalandığını hep birlikte görüyoruz 🙂

Şimdi veritabanlarında yapılan değişikliklerin CDC ile yakalanıp kafka’ya gönderilen datalarını C# üzerinden nasıl okuyabileceğimizi basit olarak ele almanın vakti geldi.

Debezium İle PostgreSQL, MSSQL veya MongoDB’den Change Data Capture(CDC) İle Yakalanan Verilerin C# Tarafından Okunması

Bunun için aşağıdaki gibi Confluent.Kafka kütüphanesini kullanarak basit bir consumer oluşturabiliriz.

using Confluent.Kafka;

ConsumerConfig config = new()
{
    GroupId = "exampleserver.exampleschema.exampletable",
    BootstrapServers = "localhost:29092",
    AutoOffsetReset = AutoOffsetReset.Earliest,

};

using IConsumer<Ignore, string> consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("exampleserver.exampleschema.exampletable");

CancellationTokenSource source = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    source.Cancel();
};
while (true)
{
    ConsumeResult<Ignore, string> result = consumer.Consume(source.Token);
    Console.WriteLine($"Topic Name : {result.TopicPartitionOffset}");
    Console.WriteLine($"Message : {result.Value}");
}

Ekstra Bilgiler

Misal, Kafka’da herhangi bir topic oluşturuldu mu? ya da yukarı satırlarda oluşturduğumuz connector’lar hala ayakta mı? Bunun için aşağıdaki talimatlar eşliğinde sorgulama yapabiliyoruz…

Mevcut connectorlar : http://localhost:8083/connectors/
Topic list : docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092
ya da
bin/kafka-topics.sh --list --bootstrap-server kafka:9092

İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…

Not : Örnek dosyaları aşağıdaki linklere tıklayarak indirebilirsiniz.
MongoDB, SQLServer, Postgres, DebeziumExample

Exit mobile version