Real-Time Data Streaming and Search Indexing with Azure - Part 3 - Setting up CDC
In the previous part of this series, we did the initial work for the system infrastructure using Azure Bicep. Today, we'll get deeper into setting up a Change Data Capture (CDC) pipeline to Azure Event Hub. CDC allows us to capture and stream real-time changes from our database, making it a powerful tool for real-time analytics, data integration, and, most importantly for this lab, enriching search indexes.
This guide walks you through the key steps required to enable CDC on a SQL Server database, configure Debezium for change tracking, and stream events to Azure Event Hub.
Setup Essentials
To set up the CDC pipeline, we’ll need the following:
- A simple SQL data schema for demonstration purposes.
- A Debezium user with permissions to connect to the database and read CDC data.
- CDC enabled at both the SQL Server level and specific tables.
- A properly configured Debezium connector to stream changes to Azure Event Hub.
Data structure
For this example, we’re using a schema that demonstrates the basics of CDC with Debezium. The schema includes Products and Reviews tables, each of which can have multiple generic tags associated with them.
While the schema isn’t optimized for efficient search operations (it relies on multiple joins), this setup provides an opportunity to experiment with Azure AI Search and explore its potential for improving query performance.
SQL Schema
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[Products]') AND type in (N'U'))
BEGIN
CREATE TABLE Products (
ProductID INT PRIMARY KEY IDENTITY(1,1),
Name NVARCHAR(255),
Description NVARCHAR(MAX)
);
END;
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[Reviews]') AND type in (N'U'))
BEGIN
CREATE TABLE Reviews (
ReviewID INT PRIMARY KEY IDENTITY(1,1),
ProductID INT FOREIGN KEY REFERENCES Products(ProductID),
Rating INT CHECK (Rating >= 1 AND Rating <= 5),
Comment NVARCHAR(MAX)
);
END;
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[Tags]') AND type in (N'U'))
BEGIN
CREATE TABLE Tags (
TagID INT PRIMARY KEY IDENTITY(1,1),
Name NVARCHAR(50)
);
END;
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[ProductTags]') AND type in (N'U'))
BEGIN
CREATE TABLE ProductTags (
ProductID INT FOREIGN KEY REFERENCES Products(ProductID),
TagID INT FOREIGN KEY REFERENCES Tags(TagID),
PRIMARY KEY (ProductID, TagID)
);
END;
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[ReviewTags]') AND type in (N'U'))
BEGIN
CREATE TABLE ReviewTags (
ReviewID INT FOREIGN KEY REFERENCES Reviews(ReviewID),
TagID INT FOREIGN KEY REFERENCES Tags(TagID),
PRIMARY KEY (ReviewID, TagID)
);
END;
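To make the "multiple joins" point concrete, here is a minimal sketch of what a tag lookup costs against this schema. It uses an in-memory SQLite database as a stand-in for SQL Server, so the DDL is simplified, and the tag names are made-up sample data.

```python
import sqlite3

# In-memory SQLite stand-in for the SQL Server schema (simplified DDL, assumption).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Products (ProductID INTEGER PRIMARY KEY, Name TEXT, Description TEXT);
CREATE TABLE Tags (TagID INTEGER PRIMARY KEY, Name TEXT);
CREATE TABLE ProductTags (
    ProductID INTEGER REFERENCES Products(ProductID),
    TagID INTEGER REFERENCES Tags(TagID),
    PRIMARY KEY (ProductID, TagID)
);
INSERT INTO Products VALUES (1, 'Test Product 1', 'Test Description 1');
INSERT INTO Tags VALUES (1, 'outdoor'), (2, 'sale');  -- hypothetical tags
INSERT INTO ProductTags VALUES (1, 1), (1, 2);
""")


def products_with_tag(connection, tag):
    """Find products carrying a tag; note the two joins needed for one filter."""
    return connection.execute(
        """
        SELECT p.ProductID, p.Name
        FROM Products p
        JOIN ProductTags pt ON pt.ProductID = p.ProductID
        JOIN Tags t ON t.TagID = pt.TagID
        WHERE t.Name = ?
        ORDER BY p.ProductID
        """,
        (tag,),
    ).fetchall()


print(products_with_tag(conn, "sale"))  # [(1, 'Test Product 1')]
```

A search index would typically store the tags directly on the product document, so the same lookup becomes a single filter instead of a join chain.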
Debezium user
To allow Debezium to connect and read from the database, we’ll create a dedicated SQL user. This user must have access to CDC tables and permissions to track changes.
Edit the placeholders #{USER}# and #{PASSWORD}# in the following script to set your desired username and password:
SQL Debezium user
IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = '#{USER}#')
BEGIN
CREATE USER [#{USER}#] WITH PASSWORD = '#{PASSWORD}#'
PRINT 'User #{USER}# created.'
END
ELSE
BEGIN
PRINT 'User #{USER}# already exists.'
END
GO
IF NOT EXISTS (SELECT 1
FROM sys.database_role_members drm
JOIN sys.database_principals dp ON drm.member_principal_id = dp.principal_id
WHERE dp.name = '#{USER}#' AND drm.role_principal_id = DATABASE_PRINCIPAL_ID('db_owner'))
BEGIN
ALTER ROLE [db_owner] ADD MEMBER [#{USER}#]
PRINT 'User #{USER}# added to db_owner role.'
END
ELSE
BEGIN
PRINT 'User #{USER}# is already a member of db_owner role.'
END
GO
Note: Granting db_owner permissions is excessive for CDC access and may pose security risks. For production environments, consider defining a custom role with the minimum permissions needed for Debezium.
Enable CDC
To enable CDC, we need to activate it at the database level and configure it for specific tables.
Enable CDC
EXEC sys.sp_cdc_enable_db
GO
Repeat the following script for each table you want to track. Replace #{SCHEMA}# and #{TABLE}# with the schema and table names:
Table CDC
IF NOT EXISTS (
SELECT
name AS table_name,
OBJECT_SCHEMA_NAME(object_id) AS table_schema,
is_tracked_by_cdc
FROM sys.tables
WHERE is_tracked_by_cdc = 1
AND OBJECT_SCHEMA_NAME(object_id) = '#{SCHEMA}#'
AND name = '#{TABLE}#'
)
BEGIN
-- Enable CDC on selected table
EXEC sys.sp_cdc_enable_table
@source_schema = N'#{SCHEMA}#',
@source_name = N'#{TABLE}#',
@role_name = null,
@supports_net_changes = 0
END
GO
Setup Debezium connector
The Debezium connector enables Change Data Capture (CDC) by streaming real-time changes from your database tables to downstream systems. Each connector is designed for a specific database, such as MySQL, PostgreSQL, or SQL Server, and works with Apache Kafka to manage and distribute change events. To set up a connector, we send a POST request with a configuration file to the running Debezium instance.
The snippets below show the request and an example configuration file for SQL Server in Azure.
In a working example, the #{KEY}# placeholders need to be replaced with actual values.
Connector Setup
$debeziumEndpoint = "#{DEBEZIUM_ENDPOINT}#"
$JSON = Get-Content 'sqlserver-connector-config.json' | Out-String
Invoke-RestMethod "https://$debeziumEndpoint/connectors/" -Method POST -Body $JSON -ContentType "application/json" -AllowInsecureRedirect
Json Configuration
{
"name": "#{EVENTHUBNAMESPACE_HUB_NAME}#",
"config": {
"snapshot.mode": "schema_only",
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "#{AZURE_SQL_SERVER_NAME}#.database.windows.net",
"database.port": "1433",
"database.user": "#{AZURE_SQL_DATABASE_USER}#",
"database.password": "#{AZURE_SQL_DATABASE_PASSWORD}#",
"database.names": "#{AZURE_SQL_DATABASE_NAME}#",
"driver.encrypt": "false",
"driver.trustServerCertificate": "true",
"schema.history.internal.kafka.bootstrap.servers": "#{EVENTHUBNAMESPACE_NAME}#.servicebus.windows.net:9093",
"schema.history.internal.kafka.topic": "#{EVENTHUBNAMESPACE_HUB_NAME}#",
"schema.history.internal.consumer.security.protocol": "SASL_SSL",
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"#{EVENTHUBNAMESPACE_CONNECTIONSTRING}#\";",
"schema.history.internal.producer.security.protocol": "SASL_SSL",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"#{EVENTHUBNAMESPACE_CONNECTIONSTRING}#\";",
"table.include.list": "#{CDC_DATABASE_TABLES}#",
"tombstones.on.delete": "false",
"topic.prefix": "SQLAzure",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "(.*)",
"transforms.Reroute.topic.replacement": "#{EVENTHUBNAMESPACE_HUB_NAME}#"
}
}
The configuration file specifies several key properties that must be defined to set up the Debezium connector. For a detailed explanation of all available properties, refer to the Debezium documentation for SQL Server connectors.
Here’s a summary of the placeholders you need to replace with your specific values:
- #{EVENTHUBNAMESPACE_HUB_NAME}#: The name of the Event Hub (not the namespace) where CDC changes will be published.
- #{AZURE_SQL_SERVER_NAME}#: The name of the Azure SQL Server that Debezium will connect to.
- #{AZURE_SQL_DATABASE_USER}#: The username created earlier, used by Debezium to authenticate with the database.
- #{AZURE_SQL_DATABASE_PASSWORD}#: The password for the Debezium user.
- #{AZURE_SQL_DATABASE_NAME}#: The name of the database that Debezium will monitor for changes.
- #{EVENTHUBNAMESPACE_NAME}#: The name of the Event Hub namespace.
- #{EVENTHUBNAMESPACE_CONNECTIONSTRING}#: The connection string for the Event Hub namespace, used for authentication and connectivity.
- #{CDC_DATABASE_TABLES}#: A comma-separated list of the tables to be monitored in the CDC process.
Each of these fields plays a role in ensuring the Debezium connector can capture and transmit change events reliably. Ensure all values are correctly set before deploying the configuration.
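One way to guard against a forgotten placeholder is to render the template with a small helper that fails when a token has no value. The following is a sketch under assumptions, not the method used in this series: the helper name `render_template` is made up, and the sample values are taken from the example events shown later in this article.

```python
import json
import re

# Matches tokens of the form #{NAME}# used in the configuration template.
PLACEHOLDER = re.compile(r"#\{([A-Z0-9_]+)\}#")


def render_template(template: str, values: dict[str, str]) -> str:
    """Replace every #{NAME}# token, failing loudly if a value is missing."""
    missing = sorted({name for name in PLACEHOLDER.findall(template) if name not in values})
    if missing:
        raise KeyError(f"No value supplied for placeholders: {', '.join(missing)}")
    # json.dumps(...)[1:-1] escapes quotes and backslashes so the output stays valid JSON.
    return PLACEHOLDER.sub(lambda match: json.dumps(values[match.group(1)])[1:-1], template)


# Shortened template with two of the placeholders from the configuration file.
template = (
    '{"name": "#{EVENTHUBNAMESPACE_HUB_NAME}#", '
    '"config": {"database.names": "#{AZURE_SQL_DATABASE_NAME}#"}}'
)
rendered = render_template(
    template,
    {
        "EVENTHUBNAMESPACE_HUB_NAME": "evh-product-dev",
        "AZURE_SQL_DATABASE_NAME": "sqldbproductsdev",
    },
)
config = json.loads(rendered)  # raises if the substitution produced invalid JSON
print(config["name"])  # evh-product-dev
```

Escaping the values matters mostly for passwords and connection strings, which may contain quotes or backslashes that would otherwise break the JSON document.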
Updated Deployment script
For my specific solution, I created a deployment script, which I mentioned in the previous article. The script keeps the experiment simple and focused, since I don't want to mix GitHub Actions and deployment methodologies into the article.
The updated deployment script has improved considerably and no longer only deploys the infrastructure. After a successful deployment, it queries certain output properties to set up firewall rules, create the database structure, enable CDC, and configure the Debezium connector.
Deployment script
param(
[string]$environmentName = "dev",
[string]$sqlServerDbPassword = "p@ssW0rd",
[string]$sqlServerDebeziumUser = "debezium-usr",
[string]$sqlServerDebeziumPassword = "p@ssW0rd"
)
function Ensure-SqlServerModule {
if (-not (Get-Module -ListAvailable -Name SqlServer)) {
Write-Host "SqlServer module not found. Installing..."
Install-Module -Name SqlServer -Force -Scope CurrentUser
} else {
Write-Host "SqlServer module is already installed."
}
}
function Get-MyIp {
return (Invoke-RestMethod http://ipinfo.io/json).ip
}
function Get-FirewallIps {
param (
[Parameter(Mandatory=$true)]
[object]$deployOutput
)
$myIp = Get-MyIp
return @($myIp) + $deployOutput.outputs.debeziumOutboundIps.value
}
function Get-CurrentFirewallRules {
param (
[Parameter(Mandatory=$true)]
[string]$sqlServerId
)
return az sql server firewall-rule list --ids $sqlServerId --query "[].{id:id, name:name, startIpAddress:startIpAddress, endIpAddress:endIpAddress}" -o json | ConvertFrom-Json
}
function Remove-UnwantedSqlFirewallRules {
param (
[Parameter(Mandatory=$true)]
[array]$currentFirewallRules,
[Parameter(Mandatory=$true)]
[array]$fireWallIps
)
$currentFirewallRules | ForEach-Object {
if ($fireWallIps -notcontains $_.startIpAddress) {
Write-Host "Deleting firewall rule: $($_.name)"
az sql server firewall-rule delete --ids $_.id
}
}
}
function Add-MissingSqlFirewallRules {
param (
[Parameter(Mandatory=$true)]
[array]$missingFirewallIps,
[Parameter(Mandatory=$true)]
[object]$deployOutput
)
$missingCount = $missingFirewallIps.Count
$missingFirewallIps | ForEach-Object -Begin { $i = 0 } -Process {
$i++
Write-Host "Creating firewall rule for IP: $_ ($i of $missingCount)"
az sql server firewall-rule create -g $deployOutput.outputs.resourceGroupName.value -s $deployOutput.outputs.sqlServerName.value -n "ClientIPAddress_$($_)_$($_)" --start-ip-address $_ --end-ip-address $_ -o none
}
}
function Deploy-SqlFirewallRules {
param (
[Parameter(Mandatory=$true)]
[object]$deployOutput
)
Write-Host ""
Write-Host "Deploying firewall rules..."
$fireWallIps = Get-FirewallIps -deployOutput $deployOutput
$currentFirewallRules = Get-CurrentFirewallRules -sqlServerId $deployOutput.outputs.sqlServerId.value
if ($null -ne $currentFirewallRules -and $currentFirewallRules.Count -gt 0) {
Remove-UnwantedSqlFirewallRules -currentFirewallRules $currentFirewallRules -fireWallIps $fireWallIps
}
$missingFirewallIps = $fireWallIps | Where-Object { $currentFirewallRules.startIpAddress -notcontains $_ }
if ($null -ne $missingFirewallIps -and $missingFirewallIps.Count -gt 0) {
Add-MissingSqlFirewallRules -missingFirewallIps $missingFirewallIps -deployOutput $deployOutput
} else {
Write-Host "No missing firewall IPs to add."
}
}
function Deploy-DebeziumConnector {
param (
[Parameter(Mandatory=$true)]
[object]$deployOutput,
[Parameter(Mandatory=$true)]
[string]$sqlServerPassword,
[Parameter(Mandatory=$true)]
[string]$debeziumUser,
[Parameter(Mandatory=$true)]
[string]$debeziumPassword
)
Write-Host ""
Write-Host "Deploying Debezium connector..."
$debeziumEndpoint = $deployOutput.outputs.debeziumEndpoint.value
$existingConnectors = (Invoke-RestMethod "$debeziumEndpoint/connectors")
$existingConnectors | ForEach-Object -Begin { $i = 0 } -Process {
$i++
Write-Host "Deleting existing connector: $_ ($i of $($existingConnectors.Count))"
Invoke-RestMethod -Uri "$debeziumEndpoint/connectors/$($_)" -Method DELETE
}
$eventhubConnectionString = az eventhubs namespace authorization-rule keys list --resource-group $deployOutput.outputs.resourceGroupName.value --name RootManageSharedAccessKey --namespace-name $deployOutput.outputs.eventhubNamespaceName.value --output tsv --query 'primaryConnectionString'
$tables = Get-Content '.\sql\setup-cdc\.cdc-tables' | Join-String -Separator ','
$JSON = (Get-Content '.\debezium\config\sqlserver-connector-config.json' -Raw) `
-replace '#{EVENTHUBNAMESPACE_HUB_NAME}#', $deployOutput.outputs.eventhubName.value `
-replace '#{EVENTHUBNAMESPACE_NAME}#', $deployOutput.outputs.eventhubNamespaceName.value `
-replace '#{AZURE_SQL_SERVER_NAME}#', $deployOutput.outputs.sqlServerName.value `
-replace '#{AZURE_SQL_DATABASE_USER}#', $debeziumUser `
-replace '#{AZURE_SQL_DATABASE_PASSWORD}#', $debeziumPassword `
-replace '#{AZURE_SQL_DATABASE_NAME}#', $deployOutput.outputs.sqlDatabaseName.value `
-replace '#{EVENTHUBNAMESPACE_CONNECTIONSTRING}#', $eventhubConnectionString `
-replace '#{CDC_DATABASE_TABLES}#', $tables
Write-Host $JSON
Write-Host "Creating new connector..."
Invoke-RestMethod "$debeziumEndpoint/connectors/" -Method POST -Body $JSON -ContentType "application/json" -AllowInsecureRedirect
}
function Deploy-Database {
param (
[Parameter(Mandatory=$true)]
[object]$deployOutput,
[Parameter(Mandatory=$true)]
[string]$sqlServerPassword
)
Write-Host ""
Write-Host "Deploying Database..."
$setupDbScript = Get-Content '.\sql\seed\setup-db.sql' -Raw
Invoke-Sqlcmd -ServerInstance "$($deployOutput.outputs.sqlServerName.value).database.windows.net" -Database "$($deployOutput.outputs.sqlDatabaseName.value)" -Username "$($deployOutput.outputs.sqlServerUser.value)" -Password "$sqlServerPassword" -Query $setupDbScript
}
function Deploy-CDC {
param (
[Parameter(Mandatory=$true)]
[object]$deployOutput,
[Parameter(Mandatory=$true)]
[string]$sqlServerPassword,
[Parameter(Mandatory=$true)]
[string]$debeziumUser,
[Parameter(Mandatory=$true)]
[string]$debeziumPassword
)
Write-Host ""
Write-Host "Deploying CDC..."
#prepare cdc user
$cdcUserScript = (Get-Content '.\sql\setup-cdc\01-setup-cdc-user.sql' -Raw) `
-replace '#{USER}#', $debeziumUser `
-replace '#{PASSWORD}#', $debeziumPassword
Invoke-Sqlcmd -ServerInstance "$($deployOutput.outputs.sqlServerName.value).database.windows.net" -Database "$($deployOutput.outputs.sqlDatabaseName.value)" -Username "$($deployOutput.outputs.sqlServerUser.value)" -Password "$sqlServerPassword" -Query $cdcUserScript
$enableCdcScript = (Get-Content '.\sql\setup-cdc\02-enable-cdc.sql' -Raw)
Invoke-Sqlcmd -ServerInstance "$($deployOutput.outputs.sqlServerName.value).database.windows.net" -Database "$($deployOutput.outputs.sqlDatabaseName.value)" -Username "$($deployOutput.outputs.sqlServerUser.value)" -Password "$sqlServerPassword" -Query $enableCdcScript
$tables = Get-Content '.\sql\setup-cdc\.cdc-tables'
$tables | ForEach-Object {
$schema = ($_.Split('.'))[0]
$table = ($_.Split('.'))[1]
Write-Host "Deploying CDC for $schema.$table..."
$cdcTableScript = (Get-Content '.\sql\setup-cdc\03-enable-cdc-table.sql' -Raw) `
-replace '#{SCHEMA}#', $schema `
-replace '#{TABLE}#', $table
Invoke-Sqlcmd -ServerInstance "$($deployOutput.outputs.sqlServerName.value).database.windows.net" -Database "$($deployOutput.outputs.sqlDatabaseName.value)" -Username "$($deployOutput.outputs.sqlServerUser.value)" -Password "$sqlServerPassword" -Query $cdcTableScript
}
}
Ensure-SqlServerModule
$deployOutput = az stack sub create --name ChangeCapture --location swedencentral --template-file ".\infrastructure\main.bicep" --parameters ".\infrastructure\main.bicepparam" sqlServerPassword=$sqlServerDbPassword --dm none --aou detachAll --yes | ConvertFrom-Json
if($null -ne $deployOutput){
Deploy-SqlFirewallRules -deployOutput $deployOutput
Deploy-Database -deployOutput $deployOutput -sqlServerPassword $sqlServerDbPassword
Deploy-CDC -deployOutput $deployOutput -sqlServerPassword $sqlServerDbPassword -debeziumUser $sqlServerDebeziumUser -debeziumPassword $sqlServerDebeziumPassword
Deploy-DebeziumConnector -deployOutput $deployOutput -sqlServerPassword $sqlServerDbPassword -debeziumUser $sqlServerDebeziumUser -debeziumPassword $sqlServerDebeziumPassword
Write-Host "Deployment completed successfully."
}
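After the script has created the connector, it can be useful to confirm that the connector and its tasks are actually running. Debezium is deployed on Kafka Connect, whose REST API exposes `GET /connectors/{name}/status`. The sketch below assumes that endpoint is reachable. The function names and the sample response are made up for illustration and are not part of the deployment script.

```python
import json
import urllib.parse
import urllib.request


def fetch_connector_status(endpoint: str, name: str) -> dict:
    """Call GET /connectors/{name}/status on the Kafka Connect REST API."""
    quoted = urllib.parse.quote(name, safe="")
    url = f"{endpoint.rstrip('/')}/connectors/{quoted}/status"
    with urllib.request.urlopen(url, timeout=10) as response:
        return json.load(response)


def failed_components(status: dict) -> list[str]:
    """List the connector and any tasks that are not in the RUNNING state."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(f"connector: {status['connector']['state']}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems


# Hypothetical response, shaped like a Kafka Connect status payload.
sample_status = {
    "name": "evh-product-dev",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.4:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "10.0.0.4:8083"}],
}
print(failed_components(sample_status))  # ['task 0: FAILED']
```

A connector can report RUNNING while one of its tasks has failed, for example because of a wrong database password, which is why the sketch checks both levels.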
Debezium event
Once Debezium is configured, we can inspect incoming messages in the Event Hub and observe various types of messages and payloads. With our configuration, Debezium captures both schema and table changes for the specified tables. Initially, when the connector is introduced, it emits schema changes to the Event Hub, providing the structure of the tables. After this initial setup, we can insert a row into the Products table to see how the changes are captured and represented as an event.
Insert SQL
INSERT INTO [dbo].[Products]
([Name]
,[Description])
VALUES
('Test Product 1'
,'Test Description 1')
Insert Event
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "ProductID"
},
{
"type": "string",
"optional": true,
"field": "Name"
},
{
"type": "string",
"optional": true,
"field": "Description"
}
],
"optional": true,
"name": "evh-product-dev.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "ProductID"
},
{
"type": "string",
"optional": true,
"field": "Name"
},
{
"type": "string",
"optional": true,
"field": "Description"
}
],
"optional": true,
"name": "evh-product-dev.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "change_lsn"
},
{
"type": "string",
"optional": true,
"field": "commit_lsn"
},
{
"type": "int64",
"optional": true,
"field": "event_serial_no"
}
],
"optional": false,
"name": "io.debezium.connector.sqlserver.Source",
"field": "source"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "evh-product-dev.Envelope",
"version": 2
},
"payload": {
"before": null,
"after": {
"ProductID": 1,
"Name": "Test Product 1",
"Description": "Test Description 1"
},
"source": {
"version": "2.7.3.Final",
"connector": "sqlserver",
"name": "SQLAzure",
"ts_ms": 1732648704753,
"snapshot": "false",
"db": "sqldbproductsdev",
"sequence": null,
"ts_us": 1732648704753000,
"ts_ns": 1732648704753000000,
"schema": "dbo",
"table": "Products",
"change_lsn": "00000030:000019c0:001f",
"commit_lsn": "00000030:000019c0:0021",
"event_serial_no": 1
},
"transaction": null,
"op": "c",
"ts_ms": 1732648710033,
"ts_us": 1732648710033250,
"ts_ns": 1732648710033250786
}
}
The event has a schema node that describes the structure of the before and after values, along with the event metadata. The payload node holds the actual before and after data, as well as some additional properties:
- op: c for create, u for update and d for delete
- ts_ms: timestamp of the event (inside source, the time of the change in the database; at the top level of the payload, the time Debezium processed it)
- schema: which schema got the change (inside source)
- table: which table got the change (inside source)
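A consumer usually only needs a handful of these properties. The following sketch reads them from a shortened copy of the insert event above. The function name `summarize_event` is an assumption for illustration, and the `r` operation code is the one Debezium uses for snapshot reads.

```python
import json
from datetime import datetime, timezone

# Debezium operation codes; "r" marks rows read during a snapshot.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_event(raw: str) -> dict:
    """Extract operation, table, change time and row data from a change event."""
    payload = json.loads(raw)["payload"]
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], "unknown"),
        "table": f'{source["schema"]}.{source["table"]}',
        # source.ts_ms is in milliseconds since the Unix epoch.
        "changed_at": datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc),
        # Deletes carry the row in "before"; all other operations in "after".
        "row": payload["after"] if payload["after"] is not None else payload["before"],
    }


# Shortened version of the insert event shown above (schema node omitted).
raw_event = json.dumps({
    "payload": {
        "before": None,
        "after": {"ProductID": 1, "Name": "Test Product 1", "Description": "Test Description 1"},
        "source": {"schema": "dbo", "table": "Products", "ts_ms": 1732648704753},
        "op": "c",
        "ts_ms": 1732648710033,
    }
})
summary = summarize_event(raw_event)
print(summary["operation"], summary["table"], summary["row"]["ProductID"])
# create dbo.Products 1
```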
We can also look at an update event, where the payload node contains a before value with the previous data and an after node with the data as it looked when the event was generated.
Update SQL
UPDATE Products
SET Name = 'Updated Test Product 1'
WHERE ProductID = 1
Update Event
{
"schema": {
//removed for brevity
},
"payload": {
"before": {
"ProductID": 1,
"Name": "Test Product 1",
"Description": null
},
"after": {
"ProductID": 1,
"Name": "Updated Test Product 1",
"Description": "Test Description 1"
},
"source": {
"version": "2.7.3.Final",
"connector": "sqlserver",
"name": "SQLAzure",
"ts_ms": 1732648858363,
"snapshot": "false",
"db": "sqldbproductsdev",
"sequence": null,
"ts_us": 1732648858363000,
"ts_ns": 1732648858363000000,
"schema": "dbo",
"table": "Products",
"change_lsn": "00000030:00001dc8:0005",
"commit_lsn": "00000030:00001dc8:0007",
"event_serial_no": 2
},
"transaction": null,
"op": "u",
"ts_ms": 1732648860467,
"ts_us": 1732648860467240,
"ts_ns": 1732648860467240123
}
}
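Comparing before and after gives the set of changed columns. The helper below is a minimal sketch with a made-up name, applied to the values from the update event above.

```python
def changed_columns(before, after):
    """Return {column: (old, new)} for every column whose value differs."""
    before = before or {}
    after = after or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(before.keys() | after.keys())
        if before.get(column) != after.get(column)
    }


# Values copied from the update event above.
before = {"ProductID": 1, "Name": "Test Product 1", "Description": None}
after = {"ProductID": 1, "Name": "Updated Test Product 1", "Description": "Test Description 1"}

diff = changed_columns(before, after)
for column, (old, new) in diff.items():
    print(f"{column}: {old!r} -> {new!r}")
```

Note that Description shows up as changed even though the UPDATE statement only touched Name. A likely explanation is that SQL Server CDC records NULL for an unchanged NVARCHAR(MAX) column in the before image of an update, so a naive diff over large-object columns can report changes that never happened. Treat this as something to verify against your own data before relying on the before values.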
A delete event is essentially an insert event in reverse: before holds the deleted row and after is null.
Delete SQL
DELETE Products
WHERE ProductID = 1
Delete Event
{
"schema": {
//removed for brevity
},
"payload": {
"before": {
"ProductID": 1,
"Name": "Updated Test Product 1",
"Description": "Test Description 1"
},
"after": null,
"source": {
"version": "2.7.3.Final",
"connector": "sqlserver",
"name": "SQLAzure",
"ts_ms": 1732648955610,
"snapshot": "false",
"db": "sqldbproductsdev",
"sequence": null,
"ts_us": 1732648955610000,
"ts_ns": 1732648955610000000,
"schema": "dbo",
"table": "Products",
"change_lsn": "00000030:00001e70:0003",
"commit_lsn": "00000030:00001e70:0007",
"event_serial_no": 1
},
"transaction": null,
"op": "d",
"ts_ms": 1732648960465,
"ts_us": 1732648960465589,
"ts_ns": 1732648960465589876
}
}
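Taken together, the three event types are enough to keep a copy of the table in sync. The sketch below replays shortened versions of the insert, update and delete events against a plain dictionary keyed by ProductID. The function name `apply_event` and the dictionary are illustrative stand-ins for whatever store a real consumer would update.

```python
def apply_event(state: dict, payload: dict, key: str = "ProductID") -> None:
    """Apply one Debezium payload to an in-memory copy of the table."""
    op = payload["op"]
    if op in ("c", "u", "r"):
        # Creates, updates and snapshot reads all carry the current row in "after".
        row = payload["after"]
        state[row[key]] = row
    elif op == "d":
        # Deletes only have a "before" image; use it to find the key to remove.
        state.pop(payload["before"][key], None)
    else:
        raise ValueError(f"Unsupported operation: {op!r}")


# Shortened payloads from the three events shown in this article.
events = [
    {"op": "c", "before": None,
     "after": {"ProductID": 1, "Name": "Test Product 1", "Description": "Test Description 1"}},
    {"op": "u",
     "before": {"ProductID": 1, "Name": "Test Product 1", "Description": None},
     "after": {"ProductID": 1, "Name": "Updated Test Product 1", "Description": "Test Description 1"}},
    {"op": "d",
     "before": {"ProductID": 1, "Name": "Updated Test Product 1", "Description": "Test Description 1"},
     "after": None},
]

state = {}
snapshots = []
for payload in events:
    apply_event(state, payload)
    snapshots.append(dict(state))  # copy so each step can be inspected later

print(snapshots[1][1]["Name"])  # Updated Test Product 1
print(snapshots[2])  # {}
```

Writing the full after row on every create and update keeps the handler idempotent, so replaying the same event twice leaves the copy unchanged.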
Wrapping up
In this article, we’ve set up a complete CDC pipeline from SQL Server to Azure Event Hub using Debezium. This foundation enables real-time data streaming, which can be consumed by services like Azure Stream Analytics or Azure AI Search.
Next, we’ll explore how to process these events and index them efficiently for search.