What makes Apache Spark + Delta Tables Nifty?
Delta Lake, a Linux Foundation project, is an open-source table format used to store tables and their underlying data files in a data lake. Delta tables can be thought of as Parquet files on steroids: they are built on the Parquet file format and extend it with features like ACID transactional guarantees, time travel, etc.
In this blog I would like to discuss a handful of features that delta tables provide that felt pretty amazing to me. This is not an exhaustive list, just the ones that I found really handy.
Time Travelling
This is a feature of delta tables that lets you sneak a peek at the state of your tables at an earlier point in time. You can query and retrieve data, as well as do point-in-time restores/recovery of your data, for any version within the last 30 days by default, provided the underlying data files have not been removed by a vacuum in the meantime. This makes reverting changes in your tables, such as accidental deletes, a matter of just querying the data from the right version or point in time in the table’s life.
How is this possible, you ask? Whenever you create a table with the delta format, a _delta_log directory is also created within the table’s root directory.
Here, logs of all transactions committed against a delta table, including data inserts, updates, and deletes, are recorded. The following query allows us to see the transactional history of our table:
describe history <table-name>
In this case, you can see I have recently deleted a record from my table. This change is denoted by the DELETE operation, which created version 5 of the table. I can always see how the table looked in version 4 using the following query:
select * from <table-name>@v4
If, however, your objective is to see how the table was at a given point in time, you can do that as well. The following query restores the table as it was at 2024-10-13T08:02:23.000+00:00 into a new table:
create or replace table <table-name>_restored as (
select * from <table-name> timestamp as of
'2024-10-13T08:02:23.000+00:00'
)
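To make the role of the transaction log a bit more concrete, here is a minimal Python sketch of how replaying a log up to a given version yields the set of data files that were live at that version. The Commit class, its fields, and the file names are hypothetical simplifications for illustration and do not mirror the real Delta log schema.

```python
from dataclasses import dataclass, field


@dataclass
class Commit:
    """One simplified log entry (hypothetical shape, not the real Delta schema)."""
    version: int
    operation: str
    added: set = field(default_factory=set)    # data files this commit adds
    removed: set = field(default_factory=set)  # data files this commit logically removes


def files_at_version(log, version):
    """Replay commits 0..version and return the data files live at that version."""
    live = set()
    for commit in sorted(log, key=lambda c: c.version):
        if commit.version > version:
            break
        live -= commit.removed
        live |= commit.added
    return live


log = [
    Commit(0, "WRITE", added={"part-0.parquet"}),
    Commit(1, "WRITE", added={"part-1.parquet"}),
    # A delete rewrites the affected file instead of editing it in place.
    Commit(2, "DELETE", added={"part-2.parquet"}, removed={"part-1.parquet"}),
]

print(sorted(files_at_version(log, 1)))  # ['part-0.parquet', 'part-1.parquet']
print(sorted(files_at_version(log, 2)))  # ['part-0.parquet', 'part-2.parquet']
```

Because the old file is only removed logically, version 1 stays queryable for as long as part-1.parquet physically exists, which is why a vacuum limits how far back you can travel.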
Data Skipping
This is a feature of delta tables that works under the hood. It allows skipping some of the files that collectively form the delta table when the data in them isn’t relevant to the query at hand. This helps prevent unnecessary file reads, thus increasing performance as well as reducing compute and I/O costs.
It entails the delta transaction logs maintaining statistical data, on a per-file basis, about the first 32 columns of a delta table by default (the table property delta.dataSkippingNumIndexedCols controls this number). This works best with numerical columns and is pretty unhelpful when it comes to string columns of high cardinality. The statistical information being stored includes, but is not limited to, the count, min, and max of each of the 32 columns. It should however be understood that if one among these 32 columns is a nested column (a struct, such as parsed JSON), each field of the struct is counted against the limit of 32. Therefore, if the second column in your table is a struct with 31 fields, statistics will only be calculated for the first 2 columns.
Suppose the 3rd column of a table, let’s call it ratings, is of type integer with values ranging from 1 to 10 and, hypothetically, among the 5 parquet files that make up the delta table under the hood, only the last two have records with a value greater than 5. When the following query is issued against that table, only the last 2 files will be read to get the result:
select * from <table-name> where ratings = 7
This is because the statistical information for the first 3 files, as stored by the delta logs, informs Spark that the range between the min and max values of the ratings column in each of those files cannot contain the value in the where clause of the query, thus rendering them irrelevant to the result.
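The pruning decision described above can be sketched in a few lines of plain Python. This is only an illustration of the min/max idea, not Spark's actual implementation; the FileStats class, the file names, and the statistics values are all made up to match the hypothetical five-file table.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    """Per-file statistics for one column (illustrative subset)."""
    path: str
    min_value: int
    max_value: int


def files_to_read(stats, value):
    """Keep only files whose [min, max] range could contain `value`."""
    return [s.path for s in stats if s.min_value <= value <= s.max_value]


# Hypothetical stats for the `ratings` column of the five files.
ratings_stats = [
    FileStats("file-1.parquet", 1, 4),
    FileStats("file-2.parquet", 2, 5),
    FileStats("file-3.parquet", 1, 5),
    FileStats("file-4.parquet", 3, 9),
    FileStats("file-5.parquet", 6, 10),
]

print(files_to_read(ratings_stats, 7))  # ['file-4.parquet', 'file-5.parquet']
```

Note that a file surviving this check is not guaranteed to contain a matching row. The statistics can only prove that a file is irrelevant, never that it is relevant.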
Change Data Feed (CDF)
This is a feature of delta tables that can optionally be enabled on individual tables or set as the default for all new tables that will be created. Once enabled, a _change_data directory is created under the table’s root directory, which stores information about the MERGE, UPDATE, and DELETE operations that constitute a change in the table’s data. The _change_data directory along with the delta transaction logs are used to compute the CDF. This can be used to create a replayable stream of changes to the data in a table, from any given version available within the data retention policy, which is 7 days by default.
CDF can be enabled using the following command during table creation:
create table <table-name> (
column1 type, column2 type .... column type
) tblproperties (
delta.enableChangeDataFeed = true
)
Or on existing tables by using:
alter table <table-name>
set tblproperties (
delta.enableChangeDataFeed = true
)
With CDF enabled, the way we propagate changes to downstream tables can be changed to only affect the records in question, thus reducing costs as well as simplifying the implementation. The option readChangeFeed can be set to true while reading the table, either as a batch or as a stream, to consume the change feed. You can also use the startingVersion or startingTimestamp options to specify the starting point from where you would like the change feed to be read; a batch read needs one of them, while for a stream they are optional.
Upon being read, you are provided with the entire change feed from the specified starting version, with each row tagged in the _change_type column as insert, delete, update_preimage, or update_postimage. In batch mode, all the available changes up to the latest version are returned. When using spark.readStream, however, it returns all the changes from the starting version up until the latest one and then continues to listen for any further changes made to the table, thus providing us with a stream of data changes for real-time processing.
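As a rough PySpark sketch of the batch read described above: the two helper functions below are my own naming, and spark is assumed to be an active SparkSession with Delta Lake available and CDF enabled on the table. Only the readChangeFeed and startingVersion options and the _change_type column come from Delta itself.

```python
def read_change_feed(spark, table_name, starting_version):
    """Return the change feed of `table_name` as a batch DataFrame.

    `spark` is assumed to be an active SparkSession with Delta Lake available.
    """
    return (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", starting_version)
        .table(table_name)
    )


def without_preimages(changes):
    """Drop the pre-update images, which downstream upserts usually do not need."""
    return changes.filter("_change_type != 'update_preimage'")
```

A call such as without_preimages(read_change_feed(spark, "<table-name>", 2)) would then return every insert, delete, and post-update row from version 2 onwards. For the streaming variant, the same options can be applied to spark.readStream instead of spark.read.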
One thing to be mindful of while using CDF is that it isn’t meant to replace a long-term solution for retaining changes to a table, especially for compliance reasons. A Slowly Changing Dimensions (SCD) Type-2 table might be a better solution in such a scenario. Also, since the files in the _change_data directory are subject to the same data retention policy as the table, which is 7 days by default, a vacuum command, if run, will delete any older files.
Delta Lake AutoOptimize
This helps you solve the “too many small files” problem, which can degrade the performance of your delta tables. When each executor writes small portions of data to different files, over time, you end up with a situation where too many files have to be read just to get the required result for a query. The OPTIMIZE command, coupled with Z-ordering on columns that are frequently used in filters (Z-ordering cannot be applied to partition columns), serves to combat this issue when run periodically.
Delta Lake AutoOptimize, on the other hand, uses 2 features, optimizeWrite and autoCompact, under the hood to solve the above-mentioned problem in an automated way. Once enabled, every time a write operation happens, optimizeWrite will try to ensure that, wherever possible, data is combined into a smaller number of larger files rather than a large number of smaller files. autoCompact, in turn, is triggered once a partition or table hits a set number of small files and performs a compact operation with a target of 128 MB per file by default. When set, these two configurations work in tandem to solve the problem of too many small files.
The following configs can be set to true, either at the table level or at the session level, to enable these:
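To get a feel for what compaction towards a 128 MB target means, here is a small, purely illustrative Python sketch that greedily groups small files into bins of at most 128 MB. It is not Delta Lake's actual algorithm; the function name, the greedy strategy, and the file sizes are assumptions made for the example.

```python
TARGET_BYTES = 128 * 1024 * 1024  # 128 MB target taken from the text above


def plan_compaction(file_sizes, target=TARGET_BYTES):
    """Greedily group small files into bins of at most `target` bytes.

    Illustrative only: this is not Delta Lake's actual bin-packing logic.
    """
    bins, current, current_size = [], [], 0
    for size in sorted(file_sizes, reverse=True):
        # Close the current bin when the next file would push it past the target.
        if current and current_size + size > target:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins


MB = 1024 * 1024
small_files = [40 * MB, 30 * MB, 60 * MB, 20 * MB, 50 * MB, 10 * MB]
plan = plan_compaction(small_files)
print(len(small_files), "files ->", len(plan), "files")  # 6 files -> 2 files
```

Each bin stands for one larger output file, so a query that previously had to open six files would only need to open two.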
Table level:
delta.autoOptimize.autoCompact
delta.autoOptimize.optimizeWrite
Session level:
spark.databricks.delta.autoCompact.enabled
spark.databricks.delta.optimizeWrite.enabled
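For reference, this is roughly how those settings could be applied from PySpark. The helper function names are my own, and spark is assumed to be an active SparkSession on a runtime that supports these properties. Treat it as a sketch rather than the one right way to do it.

```python
def enable_auto_optimize_for_table(spark, table_name):
    """Turn on optimizeWrite and autoCompact for a single table."""
    spark.sql(
        f"ALTER TABLE {table_name} SET TBLPROPERTIES ("
        "delta.autoOptimize.optimizeWrite = true, "
        "delta.autoOptimize.autoCompact = true)"
    )


def enable_auto_optimize_for_session(spark):
    """Turn on both features for Delta writes made in the current session."""
    spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
    spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
```

The table-level properties travel with the table, so every writer picks them up, whereas the session-level configs only affect writes made from the session in which they are set.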
Conclusion
Thus, the features that delta tables bring to the table, pun intended, when coupled with Apache Spark for data processing and data lakes in general, are fascinating enough to make them a crucial tool to have in your arsenal for efficient data management. Leveraging these capabilities can significantly enhance performance, streamline workflows, and reduce costs.