.Net Core ve Debezium ile MSSQL Veri Tabanında Change Data Capture (CDC) Nasıl Yapılır?

Mehmet Can Tas
Devops Türkiye☁️ 🐧 🐳 ☸️
8 min readOct 15, 2020

--

Herkese merhaba,

Bu makalede Change Data Capture işleminin ne olduğundan, Kafka’dan, Kafka Connect’in ve Debezium’un ne iş yaptığından bahsedeceğim. Ardından .Net Core ile örnek bir uygulama yapıp Debezium ile MSSQL üzerinde değişen verileri anlık olarak yakalayıp nasıl kullanabiliriz onu inceleyeceğiz. Bahsetmemiz gereken çok konu olduğundan dolayı giriş kısmını fazla uzatmadan direkt konuya girelim.

Tüm işlemlere başlamadan önce bir docker-compose dosyası oluşturup içeriğini aşağıdaki kodlarla değiştirin ve docker-compose run komutu ile çalıştırın.

version: ‘3.1’services:zookeeper:image: confluentinc/cp-zookeeperports:- “2181:2181”environment:ZOOKEEPER_CLIENT_PORT: 2181sqlserver:image: microsoft/mssql-server-linux:2017-CU9-GDR2ports:- 1433:1433environment:- ACCEPT_EULA=Y- MSSQL_PID=Standard- SA_PASSWORD=Password123321- MSSQL_AGENT_ENABLED=truekafka:image: confluentinc/cp-kafkadepends_on:- zookeeper- sqlserverports:- “9092:9092”environment:KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000KAFKA_BROKER_ID: 1KAFKA_MIN_INSYNC_REPLICAS: 1connector:image: debezium/connect:latestports:- “8083:8083”environment:GROUP_ID: 1CONFIG_STORAGE_TOPIC: my_connect_configsOFFSET_STORAGE_TOPIC: my_connect_offsetsBOOTSTRAP_SERVERS: kafka:9092depends_on:- zookeeper- sqlserver- kafka

Change Data Capture (CDC) Nedir?

Veri tabanında yapılan Insert, Update ya da Delete gibi işlemlerin sonrasında değişen verilerin ilk halinin ve son halinin CDC desteği sağlayan veri tabanı tarafından izlenmesi ve değişikliklerin kayıt altına alınması işlemidir diyebiliriz. Elbette bu süreci Trigger yardımıyla yapmak da mümkün. CDC’nin performanslı olmasının sebebi tüm bu bahsettiğimiz işlemleri veri tabanının log dosyasından okuyarak hizmet vermesidir. Bu sebepten dolayı Trigger’a göre çok daha performanslı bir şekilde aynı işlemi yapmamıza olanak sağlamış oluyor. Veri tabanındaki takip etmek istediğiniz tablonun tamamını takip edebilme olanağı dışında tablo içerisindeki belirli kolonlarda yapılan değişiklikleri takip edebilmeniz de mümkün. Şimdi bu özelliği MSSQL üzerinde nasıl aktifleştirebiliriz ve nasıl belirli bir tablo üzerinde bu özelliği kullanabiliriz buna bakalım.

Öncelikle MSSQL üzerinde DebeziumTest isminde bir veri tabanı ve işlemlerimizi yapacağımız tabloyu oluşturalım. Veri tabanı ve örnek tablo oluşturma işlemi için aşağıdaki 2 sorguyu sırasıyla çalıştırabilirsiniz.

CREATE DATABASE DebeziumTestUSE DebeziumTestSET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [dbo].[Product](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Name] [varchar](500) NULL,
[StockCode] [varchar](50) NULL,
[Price] [decimal](18, 2) NULL,
[StockQuantity] [int] NULL
) ON [PRIMARY]
GOALTER TABLE [dbo].[Product] ADD CONSTRAINT [PK_Product] PRIMARY KEY CLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
GO

Ardından MSSQL üzerinde Change Data Capture özelliğini aktifleştirmek için aşağıdaki komutu çalıştırın.

USE DebeziumTest
EXEC sys.sp_cdc_enable_db

CDC işlemi için MSSQL sürümünüzün Enterprise, Developer, Enterprise Evaluation, ya da Standard sürümlerinden birisi olması gerekiyor.

Şu an veri tabanımızda CDC özelliğini aktifleştirdik. Şimdi oluşturduğumuz Product tablosu için de CDC özelliğini aktifleştirelim.

USE DebeziumTest 
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N’dbo’,
@source_name = N’Product’,
@role_name = NULL,
@filegroup_name = N'’, -- Siz burada kendi tablonuzun FileGroup değerini yazınız eğer herhangi bir FileGroup yaratmadıysanız boş bırakabilirsiniz,
@supports_net_changes = 0
GO

Yukarda da bahsettiğimiz sadece belirli kolonlardaki değişikleri izlemek için aşağıdaki parametreyi sorguya ekleyebilirsiniz.

@captured_column_list

Sorguyu çalıştırdığınızda 2 adet Job oluşturulduğunu göreceksiniz. Bunlardan birisi verileri toplar diğeri ise belirli bir tarih öncesindeki kayıtları siler.

Job 'cdc.DebeziumTest_capture' started successfully.Job 'cdc.DebeziumTest_cleanup' started successfully.

Kafka Nedir?

Apache Kafka, büyük veri akışını düşük bir gecikme süresiyle sağlamaya odaklanan open source, publisher-subscriber tabanlı bir dağıtık mesajlaşma sistemidir.

Kafka ile ilgili bilmemiz gereken birkaç temel kavramdan bahsedelim. Daha detaylı bilgi için ise Bora Kaşmer’in ve Kaan Bayram’ın aşağıdaki makalesine göz atmanızı öneririm.

Producer : Apache Kafka pub-sub tabanlı bir dağıtık mesajlaşma sistemi demiştik. Producer bu yapıdaki publisher rolü anlamına geliyor. Mesajları kendisini dinleyen subscriber’lara gönderir.

Consumer : Consumer ise pub-sub ilişkisindeki subscriber yani dinleyici rolünü ifade eder. Producer’lardan gelen mesajları dinlemektedir.

Broker : Topicleri barındıran sunuculardır. Birden fazla broker içeren gruplara ise Kafka Cluster denir. Broker’lar arasından biri lider (leader) olarak seçilir ve diğer brokerlar ise takipçi (follower) olarak belirlenir. Eğer lider seçilen broker’a bir şey olursa diğer takipçi brokerlar arasından birisi lider seçilir. Takipçi brokerlar liderdeki verilerin kopyasını tutar. Bu yapı aslında biraz da Redis’te bulunan Sentinel yapısına benziyor diyebiliriz.

Topic : Kafka üzerinde mesajların tutulduğu yapıdır.

Kafka Connect Nedir ?

Apache Kafka’nın veri tabanlarına, Consul gibi sistemlere ya da Elasticsearch gibi yapılara bağlanmasını sağlayan yapıdır. Kafka Connect iki çeşit connector bulundurmaktadır. Bunlardan birisi Sink Connector diğeri ise Source Connector adı verilen yapıdır.

Source connector, veri tabanınızdaki tüm değişiklikleri toplayarak Topic’lere yollar. Bunun yanı sıra uygulama metriklerinizi de çok düşük bir gecikme ile toplayıp kullanılabilir hale getirebilir.

Sink connector ise Apache Kafka üzerindeki verileri Elasticsearch ya da Hadoop gibi sistemlere aktarılmasını sağlar.

Debezium Nedir?

Gelelim Debezium’un ne iş yaptığına. Debezium, LindkedIn Databus ya da DynamoDB streams gibi veri tabanındaki değişiklikleri veri tabanının loglarını okuyarak Kafka Connector isimli yapı üzerinden Kafka’ya ileten, Red Hat firması tarafından geliştirilen open source bir yapıdır. Debezium’u anlık veri değişimlerinin önemli olduğu her alanda kullanabilirsiniz. Örneğin bir e-ticaret sitesinde ERP üzerinde gerçekleşen stok ve fiyat değişikliklerini anında sitenize yansıtmak isteyebilirsiniz.

Biz bu makalede Debezium’u MSSQL üzerinde kullanacağız. Debezium’un desteklediği diğer veri tabanlarını incelemek için dokümanlarına göz atabilirsiniz.

Makalemizin başında docker-compose dosyamızı çalıştırdık, veri tabanımızda tablomuzu oluşturup CDC özelliğini aktifleştirdik. Şu an her şey hazır hemen bir .Net Core Console Application oluşturup verileri nasıl alıyoruz onu inceleyelim. Aşağıdaki komutu kullanarak uygulamamızı oluşturalım.

dotnet new console --name debezium-test

Yukarılarda bahsettiğimiz gibi Debezium ile veri tabanında bir bağlantı oluşturmak için Connector isimli yapıyı kullanmamız gerekiyor. Debezium’un kendine ait bir Rest API’ı bulunmaktadır. Öncelikle aşağıdaki JSON dosyasını oluşturalım. ve ismine sqlserver-register.json diyerek kaydedelim.

{"name": "product-connector","config": {"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector","tasks.max" : "1","database.server.name" : "127.0.0.1","database.hostname" : "sqlserver","database.port" : "1433","database.user" : "sa","database.password" : "Password123321","database.dbname" : "DebeziumTest","database.history.kafka.bootstrap.servers" : "kafka:9092","database.history.kafka.topic": "dbo.Product"}}

Kafka ve Debezium aynı Docker network’unde yer aldıkları için direkt olarak container’ın ismini ve portunu bootstrap.servers parametresine verebiliriz.

Şimdi yukarıda oluşturduğumuz JSON dosyasını kullanarak Debezium’un API’ına bir istekte bulunalım.

curl -X POST -H “Content-Type: application/json” -d @sqlserver-register.json http://localhost:8083/connectors

Şu an her şey hazır sıra kodları yazmaya geldi. Kafka’yı .Net ile kullanmamız için Confluent şirketinin geliştirdiği bir paket mevcut. Bu paketi projemize ekleyelim.

dotnet add package Confluent.Kafka

Artık Kafka’ya bağlanabiliriz. Program.cs sınıfındaki Main metodunun içeriğini aşağıdaki gibi değiştirin.

try{var config = new ConsumerConfig { GroupId = "product-consumer", BootstrapServers = "127.0.0.1:9092"};using (var consumer = new ConsumerBuilder<string, string>(config).Build()){consumer.Subscribe("127.0.0.1.dbo.Product");while (true){ConsumeResult<string, string> consumeResult = consumer.Consume();Console.WriteLine($"Mesaj {consumeResult.TopicPartitionOffset} isimli topic'ten alındı :{consumeResult.Value}");consumer.Commit();}}}catch (System.Exception ex){Console.WriteLine(ex.Message);}

Şimdi çalışıp çalışmadığını bir test edelim. dotnet run komutu ile projemizi çalıştıralım. Ardından veri tabanımıza dönelim ve aşağıdaki sorguyu çalıştıralım.

INSERT INTO Product(Name,StockCode,Price,StockQuantity) VALUES('Test Product','TST-123123',104,5)

Projemizin terminal ekranına baktığımızda verimizin Kafka üzerinden geldiğini görebilirsiniz.

Kısaca oluşan JSON’ın içeriğine bir göz atalım ve neyin ne anlama geldiğinden bahsedelim.

{
"schema": {
"type": "struct",
"fields": [{
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "Id"
}, {
"type": "string",
"optional": true,
"field": "Name"
}, {
"type": "string",
"optional": true,
"field": "StockCode"
}, {
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "2",
"connect.decimal.precision": "18"
},
"field": "Price"
}, {
"type": "int32",
"optional": true,
"field": "StockQuantity"
}],
"optional": true,
"name": "_27.0.0.1.dbo.Product.Value",
"field": "before"
}, {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "Id"
}, {
"type": "string",
"optional": true,
"field": "Name"
}, {
"type": "string",
"optional": true,
"field": "StockCode"
}, {
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "2",
"connect.decimal.precision": "18"
},
"field": "Price"
}, {
"type": "int32",
"optional": true,
"field": "StockQuantity"
}],
"optional": true,
"name": "_27.0.0.1.dbo.Product.Value",
"field": "after"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "connector"
}, {
"type": "string",
"optional": false,
"field": "name"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": false,
"field": "schema"
}, {
"type": "string",
"optional": false,
"field": "table"
}, {
"type": "string",
"optional": true,
"field": "change_lsn"
}, {
"type": "string",
"optional": true,
"field": "commit_lsn"
}, {
"type": "int64",
"optional": true,
"field": "event_serial_no"
}],
"optional": false,
"name": "io.debezium.connector.sqlserver.Source",
"field": "source"
}, {
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": true,
"field": "ts_ms"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "id"
}, {
"type": "int64",
"optional": false,
"field": "total_order"
}, {
"type": "int64",
"optional": false,
"field": "data_collection_order"
}],
"optional": true,
"field": "transaction"
}],
"optional": false,
"name": "_27.0.0.1.dbo.Product.Envelope"
},
"payload": {
"before": null,
"after": {
"Id": 5,
"Name": "Test Product",
"StockCode": "TST-123123",
"Price": "104.00",
"StockQuantity": 5
},
"source": {
"version": "1.3.0.Final",
"connector": "sqlserver",
"name": "127.0.0.1",
"ts_ms": 1602790261877,
"snapshot": "false",
"db": "DebeziumTest",
"schema": "dbo",
"table": "Product",
"change_lsn": "00000027:000007c0:0003",
"commit_lsn": "00000027:000007c0:0004",
"event_serial_no": 1
},
"op": "c",
"ts_ms": 1602790267216,
"transaction": null
}
}

schema : Size tablonuzun yapısıyla ve kolonlarınızın veri tipiyle alakalı bilgiler verir.

payload : Ulaşmak istediğimiz verilerin bulunduğu kısım.

before : Bu kısım verimizin değişmeden önceki haliyle alakalı bilgiler verir. Biz bir INSERT işlemi yaptığımız için şu an bu kısım boş.

after : Bu kısım da bize tablomuzun yapılan işlem sonrasındaki halini verir. Burada güncellenen ya da eklenen verileri bulabilirsiniz.

Şimdi bir INSERT işlemi yaptık bir de UPDATE işlemi yapalım ve yavaştan makalemizi sonlandıralım. Aşağıdaki sorguyu çalıştırarak eklediğimiz verinin stok bilgisini güncelleyelim.

UPDATE Product SET StockQuantity = 2 WHERE Id = 1

Güncelleme işlemi sonrasında aşağıdaki gibi bir JSON oluşması gerekiyor. Burada da inceleyebileceğiniz gibi before ve after kısımları bize güncelleme işleminden önceki ve sonraki değerleri veriyor.

{
"schema": {
"type": "struct",
"fields": [{
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "Id"
}, {
"type": "string",
"optional": true,
"field": "Name"
}, {
"type": "string",
"optional": true,
"field": "StockCode"
}, {
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "2",
"connect.decimal.precision": "18"
},
"field": "Price"
}, {
"type": "int32",
"optional": true,
"field": "StockQuantity"
}],
"optional": true,
"name": "_27.0.0.1.dbo.Product.Value",
"field": "before"
}, {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "Id"
}, {
"type": "string",
"optional": true,
"field": "Name"
}, {
"type": "string",
"optional": true,
"field": "StockCode"
}, {
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "2",
"connect.decimal.precision": "18"
},
"field": "Price"
}, {
"type": "int32",
"optional": true,
"field": "StockQuantity"
}],
"optional": true,
"name": "_27.0.0.1.dbo.Product.Value",
"field": "after"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "connector"
}, {
"type": "string",
"optional": false,
"field": "name"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": false,
"field": "schema"
}, {
"type": "string",
"optional": false,
"field": "table"
}, {
"type": "string",
"optional": true,
"field": "change_lsn"
}, {
"type": "string",
"optional": true,
"field": "commit_lsn"
}, {
"type": "int64",
"optional": true,
"field": "event_serial_no"
}],
"optional": false,
"name": "io.debezium.connector.sqlserver.Source",
"field": "source"
}, {
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": true,
"field": "ts_ms"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "id"
}, {
"type": "int64",
"optional": false,
"field": "total_order"
}, {
"type": "int64",
"optional": false,
"field": "data_collection_order"
}],
"optional": true,
"field": "transaction"
}],
"optional": false,
"name": "_27.0.0.1.dbo.Product.Envelope"
},
"payload": {
"before": {
"Id": 1,
"Name": "Test Product",
"StockCode": "TST-123123",
"Price": "KKA=",
"StockQuantity": 5
},
"after": {
"Id": 1,
"Name": "Test Product",
"StockCode": "TST-123123",
"Price": "104.00",
"StockQuantity": 2
},
"source": {
"version": "1.3.0.Final",
"connector": "sqlserver",
"name": "127.0.0.1",
"ts_ms": 1602790610977,
"snapshot": "false",
"db": "DebeziumTest",
"schema": "dbo",
"table": "Product",
"change_lsn": "00000027:000009f8:0002",
"commit_lsn": "00000027:000009f8:0003",
"event_serial_no": 2
},
"op": "u",
"ts_ms": 1602790612221,
"transaction": null
}
}

Umarım faydalı bir makale olmuştur. Eksik ya da hatalı gördüğünüz bir yer varsa yorumlarda lütfen belirtin. Ayrıca bana Twitter ya da LinkedIn adreslerimden ulaşabilirsiniz. Bir sonraki makalede görüşmek üzere. İyi kodlamalar.

--

--