Real-Time Data Streaming and Search Indexing with Azure - Part 3 - Setting up CDC

In the previous part of this series, we did the initial work for the system infrastructure using Azure Bicep. Today, we'll get deeper into setting up a Change Data Capture (CDC) pipeline to Azure Event Hub. CDC allows us to capture and stream real-time changes from our database, making it a powerful tool for real-time analytics, data integration, and, most importantly for this lab, enriching search indexes.

This guide walks you through the key steps required to enable CDC on a SQL Server database, configure Debezium for change tracking, and stream the resulting events to Azure Event Hub.

Setup Essentials

To set up the CDC pipeline, we’ll need the following:

  1. A simple SQL data schema for demonstration purposes.
  2. A Debezium user with permissions to connect to the database and read CDC data.
  3. CDC enabled at both the database level and on the specific tables we want to track.
  4. A properly configured Debezium connector to stream changes to Azure Event Hub.

Data structure

For this example, we’re using a schema that demonstrates the basics of CDC with Debezium. The schema includes Products and Reviews tables, each of which can have multiple generic tags associated with them.

While the schema isn’t optimized for efficient search operations (it relies on multiple joins), this setup provides an opportunity to experiment with Azure AI Search and explore its potential for improving query performance.

SQL Schema

IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[Products]') AND type in (N'U'))
BEGIN
    CREATE TABLE Products (
        ProductID INT PRIMARY KEY IDENTITY(1,1),
        Name NVARCHAR(255),
        Description NVARCHAR(MAX)
    );
END;

IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[Reviews]') AND type in (N'U'))
BEGIN
    CREATE TABLE Reviews (
        ReviewID INT PRIMARY KEY IDENTITY(1,1),
        ProductID INT FOREIGN KEY REFERENCES Products(ProductID),
        Rating INT CHECK (Rating >= 1 AND Rating <= 5),
        Comment NVARCHAR(MAX)
    );
END;

IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[Tags]') AND type in (N'U'))
BEGIN
    CREATE TABLE Tags (
        TagID INT PRIMARY KEY IDENTITY(1,1),
        Name NVARCHAR(50)
    );
END;

IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[ProductTags]') AND type in (N'U'))
BEGIN
    CREATE TABLE ProductTags (
        ProductID INT FOREIGN KEY REFERENCES Products(ProductID),
        TagID INT FOREIGN KEY REFERENCES Tags(TagID),
        PRIMARY KEY (ProductID, TagID)
    );
END;

IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[ReviewTags]') AND type in (N'U'))
BEGIN
    CREATE TABLE ReviewTags (
        ReviewID INT FOREIGN KEY REFERENCES Reviews(ReviewID),
        TagID INT FOREIGN KEY REFERENCES Tags(TagID),
        PRIMARY KEY (ReviewID, TagID)
    );
END;
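
To make the cost of those joins concrete, here is a minimal Python sketch of how rows from the five tables could be flattened into one nested document per product, which is roughly the shape a search index prefers. The function name, the output field names, and the sample rows are illustrative assumptions, not part of the lab's code.

```python
from collections import defaultdict


def build_product_documents(products, reviews, tags, product_tags, review_tags):
    """Flatten rows from the five tables into one nested document per product."""
    tag_names = {t["TagID"]: t["Name"] for t in tags}

    # Resolve the ProductTags link table into tag names per product.
    tags_by_product = defaultdict(list)
    for link in product_tags:
        tags_by_product[link["ProductID"]].append(tag_names[link["TagID"]])

    # Resolve the ReviewTags link table into tag names per review.
    tags_by_review = defaultdict(list)
    for link in review_tags:
        tags_by_review[link["ReviewID"]].append(tag_names[link["TagID"]])

    # Group reviews under their product, embedding the review tags.
    reviews_by_product = defaultdict(list)
    for r in reviews:
        reviews_by_product[r["ProductID"]].append({
            "rating": r["Rating"],
            "comment": r["Comment"],
            "tags": tags_by_review[r["ReviewID"]],
        })

    return [
        {
            "id": str(p["ProductID"]),
            "name": p["Name"],
            "description": p["Description"],
            "tags": tags_by_product[p["ProductID"]],
            "reviews": reviews_by_product[p["ProductID"]],
        }
        for p in products
    ]


if __name__ == "__main__":
    # Made-up sample rows, one per table.
    docs = build_product_documents(
        products=[{"ProductID": 1, "Name": "Kettle", "Description": "Steel kettle"}],
        reviews=[{"ReviewID": 10, "ProductID": 1, "Rating": 5, "Comment": "Great"}],
        tags=[{"TagID": 7, "Name": "kitchen"}, {"TagID": 8, "Name": "verified"}],
        product_tags=[{"ProductID": 1, "TagID": 7}],
        review_tags=[{"ReviewID": 10, "TagID": 8}],
    )
    print(docs)
```

Reading one such document replaces the four joins needed to assemble the same view in SQL, which is the trade-off this series explores.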

Debezium user

To allow Debezium to connect and read from the database, we’ll create a dedicated SQL user. This user must have access to CDC tables and permissions to track changes.

Edit the placeholders #{USER}# and #{PASSWORD}# in the following script to set your desired username and password:

SQL Debezium user

IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = '#{USER}#')
BEGIN
   CREATE USER [#{USER}#] WITH PASSWORD = '#{PASSWORD}#'
   PRINT 'User #{USER}# created.'
END
ELSE
BEGIN
   PRINT 'User #{USER}# already exists.'
END
GO

IF NOT EXISTS (SELECT 1 
            FROM sys.database_role_members drm
            JOIN sys.database_principals dp ON drm.member_principal_id = dp.principal_id
            WHERE dp.name = '#{USER}#' AND drm.role_principal_id = DATABASE_PRINCIPAL_ID('db_owner'))
BEGIN
   ALTER ROLE [db_owner] ADD MEMBER [#{USER}#]
   PRINT 'User #{USER}# added to db_owner role.'
END
ELSE
BEGIN
   PRINT 'User #{USER}# is already a member of db_owner role.'
END
GO

Note: Granting db_owner is more than CDC access requires and may pose security risks. For production environments, consider defining a custom role with the minimum permissions needed for Debezium.

Enable CDC

To enable CDC, we need to activate it at the database level and configure it for specific tables.

Enable CDC

EXEC sys.sp_cdc_enable_db
GO

Repeat the following script for each table you want to track. Replace #{SCHEMA}# and #{TABLE}# with the schema and table names:

Table CDC

IF NOT EXISTS (
   SELECT 
      name AS table_name,
      OBJECT_SCHEMA_NAME(object_id) AS table_schema,
      is_tracked_by_cdc
   FROM sys.tables
   WHERE is_tracked_by_cdc = 1 
     AND OBJECT_SCHEMA_NAME(object_id) = '#{SCHEMA}#' 
     AND name = '#{TABLE}#'
)
BEGIN
   -- Enable CDC on selected table
   EXEC sys.sp_cdc_enable_table 
      @source_schema = N'#{SCHEMA}#', 
      @source_name = N'#{TABLE}#', 
      @role_name = null, 
      @supports_net_changes = 0
END
GO

Setup Debezium connector

The Debezium connector enables Change Data Capture (CDC) by streaming real-time changes from your database tables to downstream systems. Each connector is designed for a specific database, such as MySQL, PostgreSQL, or SQL Server, and runs on Kafka Connect, which publishes the change events to a Kafka-compatible endpoint (here, the Kafka endpoint of Azure Event Hub). To set up a connector, we simply send a POST request containing a configuration file to the running Debezium instance.

The PowerShell snippet below posts the configuration file, and the JSON that follows it is an example configuration for SQL Server in Azure. Before using it, replace each #{KEY}# placeholder with a real value.

Connector Setup

# Assumes #{DEBEZIUM_ENDPOINT}# is a host name without the scheme
$debeziumEndpoint = "#{DEBEZIUM_ENDPOINT}#"
# -Raw reads the whole file as a single string
$JSON = Get-Content 'sqlserver-connector-config.json' -Raw
Invoke-RestMethod "https://$debeziumEndpoint/connectors/" -Method POST -Body $JSON -ContentType "application/json" -AllowInsecureRedirect

Json Configuration

{
  "name": "#{EVENTHUBNAMESPACE_HUB_NAME}#",
  "config": {
    "snapshot.mode": "schema_only",
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "#{AZURE_SQL_SERVER_NAME}#.database.windows.net",
    "database.port": "1433",
    "database.user": "#{AZURE_SQL_DATABASE_USER}#",
    "database.password": "#{AZURE_SQL_DATABASE_PASSWORD}#",
    "database.names": "#{AZURE_SQL_DATABASE_NAME}#",
    "driver.encrypt": "false",
    "driver.trustServerCertificate": "true",
    "schema.history.internal.kafka.bootstrap.servers": "#{EVENTHUBNAMESPACE_NAME}#.servicebus.windows.net:9093",
    "schema.history.internal.kafka.topic": "#{EVENTHUBNAMESPACE_HUB_NAME}#",
    "schema.history.internal.consumer.security.protocol": "SASL_SSL",
    "schema.history.internal.consumer.sasl.mechanism": "PLAIN",
    "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"#{EVENTHUBNAMESPACE_CONNECTIONSTRING}#\";",
    "schema.history.internal.producer.security.protocol": "SASL_SSL",
    "schema.history.internal.producer.sasl.mechanism": "PLAIN",
    "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"#{EVENTHUBNAMESPACE_CONNECTIONSTRING}#\";",
    "table.include.list": "#{CDC_DATABASE_TABLES}#",
    "tombstones.on.delete": "false",
    "topic.prefix": "SQLAzure",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)",
    "transforms.Reroute.topic.replacement": "#{EVENTHUBNAMESPACE_HUB_NAME}#"
  }
}
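
To see why every change event lands in the same Event Hub, it can help to mimic what the Reroute transform does with a topic name. The following is a rough Python approximation, not Debezium's actual implementation: it only handles a constant replacement (no group references), and the topic names passed in are illustrative assumptions rather than values captured from a real run.

```python
import re

# Values mirrored from the connector configuration above.
TOPIC_REGEX = r"(.*)"
# Stands in for #{EVENTHUBNAMESPACE_HUB_NAME}#; "evh-product-dev" is an example value.
TOPIC_REPLACEMENT = "evh-product-dev"


def reroute(topic: str) -> str:
    """Approximate the routing decision for one source topic name."""
    # The regex has to match the whole topic name for the record to be rerouted.
    if re.fullmatch(TOPIC_REGEX, topic):
        return TOPIC_REPLACEMENT
    return topic


if __name__ == "__main__":
    # Illustrative topic names (assumptions, not taken from a real connector).
    for topic in ("SQLAzure", "SQLAzure.sqldbproductsdev.dbo.Products"):
        print(f"{topic} -> {reroute(topic)}")
```

Because "(.*)" matches any topic name, both schema change events and row-level events end up in the single hub named by the replacement.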

The configuration file specifies several key properties that must be defined to set up the Debezium connector. For a detailed explanation of all available properties, refer to the Debezium documentation for SQL Server connectors.

Here’s a summary of the placeholders you need to replace with your specific values:

#{EVENTHUBNAMESPACE_HUB_NAME}#: The name of the Event Hub (not the namespace) where CDC changes will be published.
#{AZURE_SQL_SERVER_NAME}#: The name of the Azure SQL Server that Debezium will connect to.
#{AZURE_SQL_DATABASE_USER}#: The username created earlier, used by Debezium to authenticate with the database.
#{AZURE_SQL_DATABASE_PASSWORD}#: The password for the Debezium username.
#{AZURE_SQL_DATABASE_NAME}#: The name of the database that Debezium will monitor for changes.
#{EVENTHUBNAMESPACE_NAME}#: The name of the Event Hub namespace.
#{EVENTHUBNAMESPACE_CONNECTIONSTRING}#: The connection string for the Event Hub namespace, used for authentication and connectivity.
#{CDC_DATABASE_TABLES}#: A comma-separated list of the tables to be monitored in the CDC process.

Each of these fields plays a role in ensuring the Debezium connector can capture and transmit change events reliably. Ensure all values are correctly set before deploying the configuration.
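
One way to catch a forgotten value before deploying is to render the template in code and fail when a placeholder has no value. The sketch below is a hypothetical Python helper (render_config is not part of Debezium or of the deployment script); it assumes placeholders always follow the #{UPPER_CASE_NAME}# pattern used in this article and that all values are strings.

```python
import json
import re

# Matches placeholders such as #{AZURE_SQL_DATABASE_NAME}#.
PLACEHOLDER = re.compile(r"#\{([A-Z_]+)\}#")


def render_config(template: str, values: dict) -> dict:
    """Fill every placeholder in the template and return the parsed JSON."""
    missing = sorted({name for name in PLACEHOLDER.findall(template) if name not in values})
    if missing:
        raise KeyError(f"No value supplied for: {', '.join(missing)}")

    def substitute(match):
        # json.dumps escapes quotes and backslashes; [1:-1] drops the outer quotes
        # because the placeholder already sits inside a JSON string.
        return json.dumps(values[match.group(1)])[1:-1]

    # Parsing the result confirms the rendered text is still valid JSON.
    return json.loads(PLACEHOLDER.sub(substitute, template))
```

Calling render_config with the contents of sqlserver-connector-config.json and a dictionary of the eight values listed above would return the final configuration, or raise a KeyError naming whatever is still missing.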

Updated Deployment script

For my specific solution, I created a deployment script that I mentioned in the previous article. The script keeps the experiment simple and focused, since I don't want to mix GitHub Actions and deployment methodologies into the article.

The updated script has improved considerably and no longer only deploys the infrastructure. After a successful deployment, it queries the deployment outputs to set up firewall rules, create the database structure, enable CDC, and configure the Debezium connector.

Deployment script

param(
   [string]$environmentName = "dev",
   [string]$sqlServerDbPassword = "p@ssW0rd",
   [string]$sqlServerDebeziumUser = "debezium-usr",
   [string]$sqlServerDebeziumPassword = "p@ssW0rd"
)

function Ensure-SqlServerModule {
   if (-not (Get-Module -ListAvailable -Name SqlServer)) {
      Write-Host "SqlServer module not found. Installing..."
      Install-Module -Name SqlServer -Force -Scope CurrentUser
   } else {
      Write-Host "SqlServer module is already installed."
   }
}

function Get-MyIp {
   return (Invoke-RestMethod http://ipinfo.io/json).ip
}

function Get-FirewallIps {
   param (
      [Parameter(Mandatory=$true)]
      [object]$deployOutput
   )
   $myIp = Get-MyIp
   return @($myIp) + $deployOutput.outputs.debeziumOutboundIps.value
}

function Get-CurrentFirewallRules {
   param (
      [Parameter(Mandatory=$true)]
      [string]$sqlServerId
   )
   return az sql server firewall-rule list --ids $sqlServerId --query "[].{id:id, name:name, startIpAddress:startIpAddress, endIpAddress:endIpAddress}" -o json | ConvertFrom-Json
}

function Remove-UnwantedSqlFirewallRules {
   param (
      [Parameter(Mandatory=$true)]
      [array]$currentFirewallRules,
      [Parameter(Mandatory=$true)]
      [array]$fireWallIps
   )
   $currentFirewallRules | ForEach-Object {
      if ($fireWallIps -notcontains $_.startIpAddress) {
         Write-Host "Deleting firewall rule: $($_.name)"
         az sql server firewall-rule delete --ids $_.id
      }
   }
}

function Add-MissingSqlFirewallRules {
   param (
      [Parameter(Mandatory=$true)]
      [array]$missingFirewallIps,
      [Parameter(Mandatory=$true)]
      [object]$deployOutput
   )
   $missingCount = $missingFirewallIps.Count
   $missingFirewallIps | ForEach-Object -Begin { $i = 0 } -Process {
      $i++
      Write-Host "Creating firewall rule for IP: $_ ($i of $missingCount)"
      az sql server firewall-rule create -g $deployOutput.outputs.resourceGroupName.value -s $deployOutput.outputs.sqlServerName.value -n "ClientIPAddress_$($_)_$($_)" --start-ip-address $_ --end-ip-address $_ -o none
   }
}

function Deploy-SqlFirewallRules {
   param (
      [Parameter(Mandatory=$true)]
      [object]$deployOutput
   )
   Write-Host ""
   Write-Host "Deploying firewall rules..."
   $fireWallIps = Get-FirewallIps -deployOutput $deployOutput
   $currentFirewallRules = Get-CurrentFirewallRules -sqlServerId $deployOutput.outputs.sqlServerId.value
   if ($null -ne $currentFirewallRules -and $currentFirewallRules.Count -gt 0) {
      Remove-UnwantedSqlFirewallRules -currentFirewallRules $currentFirewallRules -fireWallIps $fireWallIps
   }
   $missingFirewallIps = $fireWallIps | Where-Object { $currentFirewallRules.startIpAddress -notcontains $_ }
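   # Put $null on the left: with a collection on the left, -ne filters elements instead of testing for null
   if ($null -ne $missingFirewallIps -and @($missingFirewallIps).Count -gt 0) {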
      Add-MissingSqlFirewallRules -missingFirewallIps $missingFirewallIps -deployOutput $deployOutput
   } else {
      Write-Host "No missing firewall IPs to add."
   }
}

function Deploy-DebeziumConnector {
   param (
      [Parameter(Mandatory=$true)]
      [object]$deployOutput,
      [Parameter(Mandatory=$true)]
      [string]$sqlServerPassword,
      [Parameter(Mandatory=$true)]
      [string]$debeziumUser,
      [Parameter(Mandatory=$true)]
      [string]$debeziumPassword
   )
   Write-Host ""
   Write-Host "Deploying Debezium connector..."
   $debeziumEndpoint = $deployOutput.outputs.debeziumEndpoint.value
   $existingConnectors = (Invoke-RestMethod "$debeziumEndpoint/connectors")
   $existingConnectors | ForEach-Object -Begin { $i = 0 } -Process {
      $i++
      Write-Host "Deleting existing connector: $_ ($i of $($existingConnectors.Count))"
      Invoke-RestMethod -Uri "$debeziumEndpoint/connectors/$($_)" -Method DELETE
   }

   $eventhubConnectionString = az eventhubs namespace authorization-rule keys list --resource-group $deployOutput.outputs.resourceGroupName.value --name RootManageSharedAccessKey --namespace-name $deployOutput.outputs.eventhubNamespaceName.value --output tsv --query 'primaryConnectionString'
   $tables = Get-Content '.\sql\setup-cdc\.cdc-tables' | Join-String -Separator ','
   $JSON = (Get-Content '.\debezium\config\sqlserver-connector-config.json' -Raw) `
      -replace '#{EVENTHUBNAMESPACE_HUB_NAME}#', $deployOutput.outputs.eventhubName.value `
      -replace '#{EVENTHUBNAMESPACE_NAME}#', $deployOutput.outputs.eventhubNamespaceName.value `
      -replace '#{AZURE_SQL_SERVER_NAME}#', $deployOutput.outputs.sqlServerName.value `
      -replace '#{AZURE_SQL_DATABASE_USER}#', $debeziumUser `
      -replace '#{AZURE_SQL_DATABASE_PASSWORD}#', $debeziumPassword `
      -replace '#{AZURE_SQL_DATABASE_NAME}#', $deployOutput.outputs.sqlDatabaseName.value `
      -replace '#{EVENTHUBNAMESPACE_CONNECTIONSTRING}#', $eventhubConnectionString `
      -replace '#{CDC_DATABASE_TABLES}#', $tables 

   Write-Host $JSON
   Write-Host "Creating new connector..."
   Invoke-RestMethod "$debeziumEndpoint/connectors/" -Method POST -Body $JSON -ContentType "application/json" -AllowInsecureRedirect
}

function Deploy-Database {
   param (
      [Parameter(Mandatory=$true)]
      [object]$deployOutput,
      [Parameter(Mandatory=$true)]
      [string]$sqlServerPassword
   )
   Write-Host ""
   Write-Host "Deploying Database..."   
   $setupDbScript = Get-Content '.\sql\seed\setup-db.sql' -Raw   
   Invoke-Sqlcmd -ServerInstance "$($deployOutput.outputs.sqlServerName.value).database.windows.net" -Database "$($deployOutput.outputs.sqlDatabaseName.value)" -Username "$($deployOutput.outputs.sqlServerUser.value)" -Password "$sqlServerPassword" -Query $setupDbScript
}

function Deploy-CDC {
   param (
      [Parameter(Mandatory=$true)]
      [object]$deployOutput,
      [Parameter(Mandatory=$true)]
      [string]$sqlServerPassword,
      [Parameter(Mandatory=$true)]
      [string]$debeziumUser,
      [Parameter(Mandatory=$true)]
      [string]$debeziumPassword
   )
   Write-Host ""
   Write-Host "Deploying CDC..."
   
   #prepare cdc user
   $cdcUserScript = (Get-Content '.\sql\setup-cdc\01-setup-cdc-user.sql' -Raw) `
      -replace '#{USER}#', $debeziumUser `
      -replace '#{PASSWORD}#', $debeziumPassword
   
   Invoke-Sqlcmd -ServerInstance "$($deployOutput.outputs.sqlServerName.value).database.windows.net" -Database "$($deployOutput.outputs.sqlDatabaseName.value)" -Username "$($deployOutput.outputs.sqlServerUser.value)" -Password "$sqlServerPassword" -Query $cdcUserScript

   $enableCdcScript = (Get-Content '.\sql\setup-cdc\02-enable-cdc.sql' -Raw)
   Invoke-Sqlcmd -ServerInstance "$($deployOutput.outputs.sqlServerName.value).database.windows.net" -Database "$($deployOutput.outputs.sqlDatabaseName.value)" -Username "$($deployOutput.outputs.sqlServerUser.value)" -Password "$sqlServerPassword" -Query $enableCdcScript

   $tables = Get-Content '.\sql\setup-cdc\.cdc-tables'
   $tables | ForEach-Object {
      $schema = ($_.Split('.'))[0]
      $table = ($_.Split('.'))[1]
      
      Write-Host "Deploying CDC for $schema.$table..."
      $cdcTableScript = (Get-Content '.\sql\setup-cdc\03-enable-cdc-table.sql' -Raw) `
         -replace '#{SCHEMA}#', $schema `
         -replace '#{TABLE}#', $table
      Invoke-Sqlcmd -ServerInstance "$($deployOutput.outputs.sqlServerName.value).database.windows.net" -Database "$($deployOutput.outputs.sqlDatabaseName.value)" -Username "$($deployOutput.outputs.sqlServerUser.value)" -Password "$sqlServerPassword" -Query $cdcTableScript
   }
}

Ensure-SqlServerModule
$deployOutput = az stack sub create --name ChangeCapture --location swedencentral --template-file ".\infrastructure\main.bicep" --parameters ".\infrastructure\main.bicepparam" sqlServerPassword=$sqlServerDbPassword --dm none --aou detachAll --yes | ConvertFrom-Json
if($null -ne $deployOutput){
   Deploy-SqlFirewallRules -deployOutput $deployOutput
   Deploy-Database -deployOutput $deployOutput -sqlServerPassword $sqlServerDbPassword
   Deploy-CDC -deployOutput $deployOutput -sqlServerPassword $sqlServerDbPassword -debeziumUser $sqlServerDebeziumUser -debeziumPassword $sqlServerDebeziumPassword
   Deploy-DebeziumConnector -deployOutput $deployOutput -sqlServerPassword $sqlServerDbPassword -debeziumUser $sqlServerDebeziumUser -debeziumPassword $sqlServerDebeziumPassword
   Write-Host "Deployment completed successfully."
}

Debezium event

Once Debezium is configured, we can inspect the incoming messages in the Event Hub and observe the different message types and payloads. With our configuration, Debezium captures both schema changes and row-level changes for the specified tables. When the connector first starts, it emits schema change events that describe the structure of the tables. After that, we can insert a row into the Products table to see how a change is captured and represented as an event.

Insert SQL

INSERT INTO [dbo].[Products]
           ([Name]
           ,[Description])
     VALUES
           ('Test Product 1'
           ,'Test Description 1')

Insert Event

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "ProductID"
          },
          {
            "type": "string",
            "optional": true,
            "field": "Name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "Description"
          }
        ],
        "optional": true,
        "name": "evh-product-dev.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "ProductID"
          },
          {
            "type": "string",
            "optional": true,
            "field": "Name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "Description"
          }
        ],
        "optional": true,
        "name": "evh-product-dev.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_us"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ns"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "event_serial_no"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.sqlserver.Source",
        "field": "source"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_us"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ns"
      }
    ],
    "optional": false,
    "name": "evh-product-dev.Envelope",
    "version": 2
  },
  "payload": {
    "before": null,
    "after": {
      "ProductID": 1,
      "Name": "Test Product 1",
      "Description": "Test Description 1"
    },
    "source": {
      "version": "2.7.3.Final",
      "connector": "sqlserver",
      "name": "SQLAzure",
      "ts_ms": 1732648704753,
      "snapshot": "false",
      "db": "sqldbproductsdev",
      "sequence": null,
      "ts_us": 1732648704753000,
      "ts_ns": 1732648704753000000,
      "schema": "dbo",
      "table": "Products",
      "change_lsn": "00000030:000019c0:001f",
      "commit_lsn": "00000030:000019c0:0021",
      "event_serial_no": 1
    },
    "transaction": null,
    "op": "c",
    "ts_ms": 1732648710033,
    "ts_us": 1732648710033250,
    "ts_ns": 1732648710033250786
  }
}

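The event has two top-level nodes. The schema node describes the structure of the payload, including the column types of the before and after row images. The payload node carries the actual data: the row before and after the change, as well as some additional properties:

op: The operation type: c for create, u for update, and d for delete.
ts_ms: The time at which Debezium processed the event. The source node has its own ts_ms, which holds the time of the change in the database.
source.schema: The schema of the table that changed.
source.table: The table that changed.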
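
As a rough illustration of how a consumer might read these properties, the following Python sketch parses one event and pulls out the operation, the table, and the two timestamps. The function name summarize_event and the shape of the returned dictionary are assumptions made for this example; only the field names inside the event come from the message shown above.

```python
import json
from datetime import datetime, timezone

# Operation codes used in the op field; "r" is emitted for snapshot reads.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def summarize_event(raw: str) -> dict:
    """Extract the commonly used properties from one Debezium change event."""
    payload = json.loads(raw)["payload"]
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f'{source["schema"]}.{source["table"]}',
        # source.ts_ms: when the change was made in the database (epoch milliseconds).
        "changed_at": datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc),
        # payload.ts_ms: when the connector processed the event, so the
        # difference is the capture delay in milliseconds.
        "lag_ms": payload["ts_ms"] - source["ts_ms"],
        "before": payload["before"],
        "after": payload["after"],
    }
```

For the insert event above, lag_ms is 1732648710033 - 1732648704753 = 5280, so the change took a little over five seconds to travel from the database to the connector.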

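We can also look at an update event, where the payload contains a before node with the previous data and an after node with the row as it looked when the event was generated. Note that Description is null in the before node even though the row had a description: SQL Server CDC stores NULL for nvarchar(max) columns in the before image of an update unless that column itself was changed.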

Update SQL

UPDATE Products
SET Name = 'Updated Test Product 1'
WHERE ProductID = 1

Update Event

{
  "schema": {    
    //removed for brevity
  },
  "payload": {
    "before": {
      "ProductID": 1,
      "Name": "Test Product 1",
      "Description": null
    },
    "after": {
      "ProductID": 1,
      "Name": "Updated Test Product 1",
      "Description": "Test Description 1"
    },
    "source": {
      "version": "2.7.3.Final",
      "connector": "sqlserver",
      "name": "SQLAzure",
      "ts_ms": 1732648858363,
      "snapshot": "false",
      "db": "sqldbproductsdev",
      "sequence": null,
      "ts_us": 1732648858363000,
      "ts_ns": 1732648858363000000,
      "schema": "dbo",
      "table": "Products",
      "change_lsn": "00000030:00001dc8:0005",
      "commit_lsn": "00000030:00001dc8:0007",
      "event_serial_no": 2
    },
    "transaction": null,
    "op": "u",
    "ts_ms": 1732648860467,
    "ts_us": 1732648860467240,
    "ts_ns": 1732648860467240123
  }
}
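
A consumer that only cares about what changed can compare the two row images. The helper below is a small Python sketch under that assumption; changed_fields is a hypothetical name, and it takes the already-parsed payload node as a dictionary.

```python
def changed_fields(payload: dict) -> dict:
    """Return {column: (old, new)} for every column whose value differs."""
    # before is null for inserts and after is null for deletes.
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        name: (before.get(name), after.get(name))
        for name in sorted(before.keys() | after.keys())
        if before.get(name) != after.get(name)
    }
```

Applied to the update event above, this reports Name as changed, but it also reports Description, because the before image in that event carries null for the column. A naive diff therefore needs some care for columns like this one.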

A delete event mirrors an insert: the before node holds the deleted row, and the after node is null.

Delete SQL

DELETE Products
WHERE ProductID = 1

Delete Event

{
  "schema": {
    //removed for brevity
  },
  "payload": {
    "before": {
      "ProductID": 1,
      "Name": "Updated Test Product 1",
      "Description": "Test Description 1"
    },
    "after": null,
    "source": {
      "version": "2.7.3.Final",
      "connector": "sqlserver",
      "name": "SQLAzure",
      "ts_ms": 1732648955610,
      "snapshot": "false",
      "db": "sqldbproductsdev",
      "sequence": null,
      "ts_us": 1732648955610000,
      "ts_ns": 1732648955610000000,
      "schema": "dbo",
      "table": "Products",
      "change_lsn": "00000030:00001e70:0003",
      "commit_lsn": "00000030:00001e70:0007",
      "event_serial_no": 1
    },
    "transaction": null,
    "op": "d",
    "ts_ms": 1732648960465,
    "ts_us": 1732648960465589,
    "ts_ns": 1732648960465589876
  }
}
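
Taken together, the three event types are enough to keep a copy of the table in sync. The following Python sketch replays events against an in-memory dictionary that stands in for a real search index; apply_event and the choice of ProductID as the key are assumptions for illustration, not the approach used later in this series.

```python
def apply_event(index: dict, payload: dict, key_field: str = "ProductID") -> None:
    """Apply one change event payload to an in-memory copy of the table."""
    op = payload["op"]
    if op in ("c", "u", "r"):
        # Creates, updates, and snapshot reads all carry the current row in after.
        row = payload["after"]
        index[row[key_field]] = row
    elif op == "d":
        # Deletes only carry the old row in before; ignore keys already removed.
        index.pop(payload["before"][key_field], None)
    else:
        raise ValueError(f"Unsupported operation: {op!r}")


if __name__ == "__main__":
    index = {}
    row = {"ProductID": 1, "Name": "Test Product 1", "Description": "Test Description 1"}
    apply_event(index, {"op": "c", "before": None, "after": row})
    apply_event(index, {"op": "d", "before": row, "after": None})
    print(index)  # the dictionary is empty again after the delete
```

Treating create and update identically as an upsert keeps the consumer tolerant of replayed events, since applying the same event twice leaves the copy unchanged.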

Wrapping up

In this article, we’ve set up a complete CDC pipeline from SQL Server to Azure Event Hub using Debezium. This foundation enables real-time data streaming, which downstream consumers can process with services like Azure Stream Analytics or use to keep an Azure AI Search index up to date.

Next, we’ll explore how to process these events and index them efficiently for search.