In today’s fast-paced world, real-time analytics has become crucial for organizations to gain valuable insights and make data-driven decisions. One of the key components that enables real-time analytics is Change Data Capture (CDC). CDC allows us to identify and capture changes made to data in a database, providing a reliable and efficient way to keep track of data modifications.
CDC plays a vital role in real-time analytics by enabling data engineers to stay up-to-date with the latest changes in the database. By capturing and streaming these changes, CDC ensures the accuracy and reliability of the data used for analytics. It allows organizations to synchronize data, build data warehouses, perform real-time analytics, and maintain data consistency across different systems.
In this blog, we will explore the significance of CDC in real-time analytics and how it can be implemented using Debezium, an open-source distributed platform for capturing and streaming changes in databases. We will dive into the concept of CDC, its purpose in data engineering, and the benefits it brings to organizations. Additionally, we will discuss Debezium and its seamless integration with various databases, such as MySQL, PostgreSQL, MongoDB, Oracle, and SQL Server, making it a powerful tool for CDC implementation.
By leveraging Debezium’s capabilities, organizations can process CDC from Apache Kafka and utilize the power of Apache Spark for real-time analytics. We will explore the steps involved in enabling CDC in different databases, deploying Debezium using Docker or Kubernetes, and configuring the Debezium connector to capture and stream CDC into Apache Kafka. Furthermore, we will demonstrate how to process CDC using Apache Spark, enabling organizations to analyze streaming data and drive insights in real time.
Let’s get started with the blog…
What is CDC?
CDC, or Change Data Capture, is a method used to identify and capture changes made to the data in a database. A change record comprises the old values of the row, the new values of the row, the database operation, a timestamp, and so on. These details are stored in separate log tables, which maintain a record of all the historical modifications made to the database. However, before the changed data can be tracked, CDC must be enabled for the database or for a particular table. There are various ways to enable CDC.
Purpose of CDC in Data Engineering
We can use CDC for various purposes, such as data synchronization, data warehousing, real-time analytics, and maintaining data consistency across different systems. With CDC, data engineers can stay up-to-date with the latest changes in the database and ensure the accuracy and reliability of the data they work with.
What is Debezium?
Debezium is an open-source distributed platform for capturing and streaming changes in databases. It provides a reliable and scalable Change Data Capture (CDC) solution by seamlessly integrating with different databases, such as MySQL, PostgreSQL, MongoDB, Oracle, and SQL Server. Debezium captures the changes made to the database and converts them into events that other applications or systems can consume in real time.
Debezium is built on the Apache Kafka Connect APIs. It can connect to databases, capture change events, and stream them to an Apache Kafka topic. To know more about Debezium, head over to the official site.
Enable CDC in different databases
- To enable CDC in SQL Server, follow this blog.
- To enable CDC in MySQL, follow this blog.
- To enable CDC in PostgreSQL, follow this blog.
Deploy Debezium
Using Docker images
Debezium requires a Kafka cluster. Deploy Kafka using this docker image. We also need ZooKeeper; deploy it using this docker image.
We can deploy Debezium using its official docker image. Here is the link to the docker image.
I recommend you deploy Zookeeper, Kafka, and Debezium on cloud VMs like EC2.
Configuring Debezium connector
Once we have Debezium running, we need to configure it to connect to the database so that it can start pulling CDC events and publishing them to a Kafka topic. Debezium exposes the Kafka Connect REST API, which we can use to configure the connector.
URL to call ⇒ POST http://<Debezium host>:<Kafka Connect REST port, 8083 by default>/connectors
To know more about the Kafka connect endpoints, head over to the official documentation.
The JSON below is the request body for the above-mentioned REST API endpoint.
{
  "name": "<name of the connector>",
  "config": {
    "topic.prefix": "<Kafka topic>",
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector | io.debezium.connector.mysql.MySqlConnector | io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<Database Server URL>",
    "database.port": "<Database Server Port>",
    "database.user": "<Database User>",
    "database.encrypt": "<true | false>",
    "database.password": "<Database Password>",
    "database.names": "<Name of the Database>",
    "database.server.name": "<Database Identifier>",
    "table.include.list": "<Comma-separated names of the tables we want to track>",
    "schema.history.internal.kafka.bootstrap.servers": "<Kafka bootstrap server to track schema changes>",
    "schema.history.internal.kafka.topic": "<Kafka topic to store schema changes>",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)",
    "transforms.Reroute.topic.replacement": "<Kafka topic>",
    "column.exclude.list": "<Comma-separated names of the columns we don't track>",
    "skip.messages.without.change": true,
    "decimal.handling.mode": "double",
    "snapshot.mode": "schema_only"
  }
}
Use Postman or any API testing tool to call the Debezium URL and POST the JSON payload to create the Debezium connector.
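If you prefer to script this step, here is a minimal sketch using Python’s requests library. The Connect host and port and the connector.json file name are placeholders for this example; replace them with your own values.

import json
import requests

# Hypothetical address of the Kafka Connect (Debezium) REST API.
connect_url = "http://localhost:8083/connectors"

# Load the connector configuration shown above from a local file.
with open("connector.json") as config_file:
    connector_config = json.load(config_file)

# POST the payload to create the Debezium connector.
response = requests.post(connect_url, json=connector_config)
print(response.status_code, response.text)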
Bonus tip: When you set up a Debezium connector, the changes from each table get their own Kafka topic. Reading from separate Kafka topics is tedious, so I used an SMT, or single message transformation. Here I used the Reroute SMT, which takes messages from all the tracked tables and reroutes them to a single topic.
Processing CDC from Apache Kafka
Before processing the CDC messages from Kafka, ensure you have the following in place.
- You have an Apache Kafka cluster running on either Docker or K8s.
- You have a Database with CDC enabled.
- You have configured the Debezium connector.
Bonus tip: Use a Kafka queue visualizer to visually confirm the Kafka topics and the messages in them. When you set up Debezium properly, as described above, you will see the CDC as JSON messages in the Kafka topic. For this blog, I used UI for Apache Kafka.
If you have checked the above three points, you should be able to see the CDC as JSON messages once you make any changes to a table.
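If you would rather confirm this from code instead of a UI, below is a rough sketch using the kafka-python package; the topic name and broker address are placeholders.

from kafka import KafkaConsumer

# Placeholder topic name and broker address; replace with your own.
consumer = KafkaConsumer(
    "cdc-topic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop iterating after 10 seconds with no new messages
)

# Print the raw CDC JSON messages that Debezium published to the topic.
for message in consumer:
    print(message.value.decode("utf-8"))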
Running Apache Spark
Now that we are getting a stream of table changes into a Kafka topic, let’s focus on Apache Spark. We can run Apache Spark on the host machine as a single-node cluster, but I recommend using Amazon EMR or the Databricks Community Edition for this example. If you are using Amazon EMR, you need to add the following packages to the spark-submit command (keep the package versions in line with your cluster’s Spark and Scala versions).
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0 [script_file_name.py]
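The snippets below assume a SparkSession named spark already exists, which is the case in Databricks and EMR notebooks. If you run a standalone script through spark-submit, you can create one along these lines:

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession for the streaming job.
spark = SparkSession.builder.appName("cdc-streaming").getOrCreate()

With the session in place, we can subscribe to the Kafka topic as a streaming DataFrame.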
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "<Kafka bootstrap server address>")
    .option("subscribe", "<Kafka topic name>")
    .option("startingOffsets", "latest")
    .load()
)
Each CDC message has a key and a value, and we only need the value part of the message. Since the value is JSON, we can cast it to a string as shown below.
df = df.selectExpr("CAST(value AS STRING)")
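As a quick sanity check, you can also pull individual fields straight out of the JSON string with Spark’s get_json_object function; the JSON paths below assume the Debezium payload structure shown later in this post.

from pyspark.sql.functions import col, get_json_object

# Extract a few fields from the Debezium payload for a quick look.
preview_df = df.select(
    get_json_object(col("value"), "$.payload.op").alias("op"),
    get_json_object(col("value"), "$.payload.source.table").alias("table"),
    get_json_object(col("value"), "$.payload.after").alias("after"),
)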
Finally, we process and write the CDC JSON messages as follows.
query = (
    df.writeStream.outputMode("append")
    .format("memory")
    .queryName("<identifier>")
    .foreachBatch(process_batch)  # Python function to process the Kafka messages
    .start()
)
query.awaitTermination()
Spark processes streaming data in micro-batches: a compromise between near real-time processing, ease of programming, and the fault-tolerance benefits of batch processing. We need to provide a Python function to the .foreachBatch method to process the messages; the implementation of such a function varies based on the business rules we want to apply to the streaming data. Notice that the code snippet writes the messages to cluster memory instead of to any table. Since writing to and reading from tables adds latency to the whole process, we want to avoid it in real-time analytics. In the batch-processing function, we should add code to write the processed micro-batch into a data warehouse that can power real-time analytics and a dashboard.
The above image demonstrates the processing of CDC using Debezium, Kafka, and Apache Spark.
Example CDC JSON messages
Let’s take a look at the JSON messages we get in a Kafka topic. The JSON structure is long and has many keys, but we should focus on the important part of the message.
Insert Event
...
"payload": {
  "before": null,
  "after": {
    "Column 1": "Value 1",
    "Column 2": "Value 2",
    "Column 3": "Value 3",
    "Column 4": "Value 4",
    "Column 5": "Value 5"
  },
  "source": {
    "version": "2.2.0.Alpha3",
    "connector": "<type of connector>",
    "name": "<name of the connector>",
    "ts_ms": 1705323942783,
    "snapshot": "false",
    "db": "<name of the database>",
    "sequence": null,
    "schema": "<name of the schema>",
    "table": "<name of the table>",
    "change_lsn": "0000c0cf:00004838:0011",
    "commit_lsn": "0000c0cf:00004838:0049",
    "event_serial_no": 1
  },
  "op": "c",
  "ts_ms": 1705323944519,
  "transaction": null
}
Update Event
...
"payload": {
  "before": {
    "Column 1": "Value 1",
    "Column 2": "Value 2",
    "Column 3": "Value 3",
    "Column 4": "Value 4",
    "Column 5": "Value 5"
  },
  "after": {
    "Column 1": "new Value 1",
    "Column 2": "new Value 2",
    "Column 3": "new Value 3",
    "Column 4": "new Value 4",
    "Column 5": "new Value 5"
  },
  "source": {
    "version": "2.2.0.Alpha3",
    "connector": "<type of connector>",
    "name": "<name of the connector>",
    "ts_ms": 1705323942783,
    "snapshot": "false",
    "db": "<name of the database>",
    "sequence": null,
    "schema": "<name of the schema>",
    "table": "<name of the table>",
    "change_lsn": "0000c0cf:00004838:0011",
    "commit_lsn": "0000c0cf:00004838:0049",
    "event_serial_no": 1
  },
  "op": "u",
  "ts_ms": 1705323944519,
  "transaction": null
}
Delete Event
...
"payload": {
  "before": {
    "Column 1": "Value 1",
    "Column 2": "Value 2",
    "Column 3": "Value 3",
    "Column 4": "Value 4",
    "Column 5": "Value 5"
  },
  "after": null,
  "source": {
    "version": "2.2.0.Alpha3",
    "connector": "<type of connector>",
    "name": "<name of the connector>",
    "ts_ms": 1705323942783,
    "snapshot": "false",
    "db": "<name of the database>",
    "sequence": null,
    "schema": "<name of the schema>",
    "table": "<name of the table>",
    "change_lsn": "0000c0cf:00004838:0011",
    "commit_lsn": "0000c0cf:00004838:0049",
    "event_serial_no": 1
  },
  "op": "d",
  "ts_ms": 1705323944519,
  "transaction": null
}
The shortened version of the JSON message shows the change made to the data in a table. We can use the values of the “before”, “after”, “op”, “table”, and “schema” keys to facilitate real-time analytics and data synchronization. To know more about the payload, head over to the official documentation.
Processing each micro-batch
Knowing the JSON event structure, let’s concentrate on message processing, that is, implementing the function we must supply to the aforementioned foreachBatch call. The function should ideally include the following steps.
- Get the micro-batch and un-nest the JSON messages from it.
- Apply business rules and transformations while processing JSON messages.
- Lastly, write the processed JSON message into the data warehouse. A sketch of such a function is shown after this list.
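As a rough sketch, and assuming the Debezium payload structure shown above, such a function could look like the following. The write_to_warehouse helper is hypothetical, collecting the micro-batch to the driver is a simplification that only suits small batches, and the SQL generation ignores details such as key columns and quoting, so treat this as an outline rather than production code.

import json

def process_batch(batch_df, batch_id):
    """Un-nest the Debezium JSON messages in one micro-batch and turn them into SQL."""
    for row in batch_df.collect():  # simplification: fine for small micro-batches
        payload = json.loads(row["value"])["payload"]
        op = payload["op"]  # "c" = insert, "u" = update, "d" = delete
        before = payload["before"]
        after = payload["after"]
        table = payload["source"]["table"]

        # Prepare a SQL statement based on the database operation.
        if op == "c":
            columns = ", ".join(after.keys())
            values = ", ".join(f"'{v}'" for v in after.values())
            sql = f"INSERT INTO {table} ({columns}) VALUES ({values})"
        elif op == "u":
            changed = {k: v for k, v in after.items() if before.get(k) != v}
            assignments = ", ".join(f"{k} = '{v}'" for k, v in changed.items())
            sql = f"UPDATE {table} SET {assignments}"  # add a WHERE clause on your key columns
        elif op == "d":
            conditions = " AND ".join(f"{k} = '{v}'" for k, v in before.items())
            sql = f"DELETE FROM {table} WHERE {conditions}"
        else:
            continue  # ignore snapshot reads and other operations

        # Hypothetical helper that runs the statement against the data warehouse.
        write_to_warehouse(sql)

This is the function you would pass to .foreachBatch in the writeStream snippet shown earlier.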
I compared the values of the before and after keys and used the database operation to prepare a SQL statement, which I then applied to the data warehouse table. For this blog, I used Power BI to create an interactive, live dashboard from the data warehouse. This is how I used CDC, Debezium, Apache Kafka, and Spark to perform real-time analytics. P.S. Creating dashboards with Power BI is out of the scope of this blog.