SCD Type 3 in SQL and Python
After recently designing a few Slowly Changing Dimensions with a client, I thought it would be good to revisit an earlier post theme and expand on the SCD Types.
For more information on this blog series and Slowly Changing Dimensions with Databricks and Delta Lake, check out SCD Type 1 & SCD Type 2 in part 1 of the ‘From Warehouse to Lakehouse’ series:
All code examples are available in SQL and Python (PySpark) from my GitHub repo so you can follow along:
Notebook – ‘SQL\SCD-Type3’ & ‘Python\SCD-Type3’
SCD Type 3 – Adding a Previous Value Column
A Type 3 Slowly Changing Dimension is not one of the more common dimension designs; to be honest, I’ve rarely seen it used in production systems. This is because it only allows for a limited history of change, in the form of a ‘Previous Value’ column. When a record is amended, the existing value moves to a ‘Previous’ column and the primary value is updated. We can therefore only preserve the immediately previous iteration of each record.
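The rule can be sketched in a few lines of plain Python. This is an illustration of the logic only, not the notebook code; the `scd3_update` helper and sample row are invented for this example.

```python
# Illustrative sketch of the Type 3 rule on a single dimension row (a plain
# Python dict); the helper and sample values are invented for this example.
def scd3_update(row, new_value, column="address_country"):
    """When the tracked column changes, keep the old value in a
    'previous_' column and overwrite the current value."""
    if row.get(column) != new_value:
        row["previous_" + column] = row.get(column)  # one level of history only
        row[column] = new_value
    return row

employee = {"employee_id": 1, "address_country": "United Kingdom"}
scd3_update(employee, "Spain")
# employee now holds address_country="Spain" and
# previous_address_country="United Kingdom"
```

Note that a second change would overwrite the ‘previous’ value, which is exactly the limitation described above: only one prior iteration survives.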
Let’s examine how that would look using the employee dataset from the previous examples.
Scenario 3: For this example, three of our employees are relocating to offices in different countries. The following people require a new value for the [address_country] column; however, we also want to preserve the previous value as history. The data also includes one entirely new record to insert.
The values to UPSERT are featured in this dataset:
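For illustration, the incoming rows might look something like the following. The names and countries here are invented placeholders; the real values are in the sample dataset in the repo.

```python
# Hypothetical stand-ins for the incoming UPSERT rows: three relocations
# plus one brand-new employee (all values invented for illustration).
upsert_rows = [
    {"employee_id": 2, "first_name": "Dorothy", "last_name": "Gale",
     "gender": "F", "address_country": "Spain"},
    {"employee_id": 5, "first_name": "Arthur", "last_name": "Dent",
     "gender": "M", "address_country": "Germany"},
    {"employee_id": 8, "first_name": "Ellen", "last_name": "Ripley",
     "gender": "F", "address_country": "Australia"},
    # entirely new record to insert
    {"employee_id": 11, "first_name": "Sam", "last_name": "Lowry",
     "gender": "M", "address_country": "France"},
]
```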
We want to MERGE this incoming dataset with our existing records, following a similar operation to those we have used in the past (see the posts for SCD Types 1 & 2). This time, however, if a record matches we need to write the current value of [address_country] into a new [previous_country] column and then update the current value. As before, any entirely new records in the incoming dataset are simply inserted.
Something along the lines of…
-- Merge.....
-- ...
-- ...
-- based on the following column(s)
ON scdType3.employee_id = scdType3NEW.employee_id
-- if there is a match do this...
WHEN MATCHED THEN
  UPDATE SET
    scdType3.previous_country = scdType3.address_country,
    scdType3.address_country = scdType3NEW.address_country
-- if there is no match insert new row
WHEN NOT MATCHED THEN
  INSERT *
Great! It’s as easy as that right?…
AnalysisException: cannot resolve `scdType3.previous_country` in UPDATE clause given columns scdType3.`employee_id`, scdType3.`first_name`, scdType3.`last_name`, scdType3.`gender`, scdType3.`address_country`
…ah, the schema of the dataset has now been changed. We don’t want that… or do we?
Schema Enforcement or Schema Evolution
Schema Enforcement (or Schema Validation) is a way to ensure the quality of our data is maintained. We define data types and set data quality rules (for example, expecting a telephone number to contain a certain number of digits) to preserve that integrity and avoid errors in our datasets.
There are many occasions where we need these rigid rules for our incoming data. An ETL (or ELT) pipeline with dependent tables is one example: incoming datasets containing incorrect columns or data types may break processes further downstream. We obviously want to avoid that.
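As a toy illustration of that kind of enforcement (an invented helper, not a Delta API), a pipeline might reject rows whose values don’t match the expected shape before they ever reach a dependent table:

```python
import re

# Toy schema-enforcement check (invented for illustration): reject rows
# whose employee_id is not an integer or whose telephone number is not
# exactly 11 digits.
PHONE_RE = re.compile(r"^\d{11}$")

def validate(row):
    if not isinstance(row.get("employee_id"), int):
        raise TypeError("employee_id must be an integer")
    if not PHONE_RE.match(row.get("telephone", "")):
        raise ValueError("telephone must be exactly 11 digits")
    return row

validate({"employee_id": 1, "telephone": "01234567890"})  # passes
```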
What about when a dataset changes? What if we are expecting some incremental change and want to allow it to happen?
So how do we allow Schema Evolution in Databricks and Delta tables?
In a standard DataFrame write operation there are two options for adapting a table’s schema as part of the write: mergeSchema and overwriteSchema.
With the mergeSchema option, the existing schema is merged with that of the incoming dataset. The following actions are accepted:
- Adding new columns (this is the most common scenario)
- Changing of data types from NullType -> any other type, or upcasts from ByteType -> ShortType -> IntegerType
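Those rules can be sketched as a tiny compatibility check. This is plain Python and purely illustrative; Delta implements this internally and supports more types than shown here.

```python
# Illustrative sketch of the mergeSchema rules above: new columns are
# allowed, and an existing column's type may only change via a safe upcast
# (or from NullType to any other type). Type names are simplified strings.
SAFE_UPCASTS = {("ByteType", "ShortType"), ("ByteType", "IntegerType"),
                ("ShortType", "IntegerType")}

def merge_schema(existing, incoming):
    """Merge an incoming {column: type} schema into an existing one."""
    merged = dict(existing)
    for col, typ in incoming.items():
        if col not in merged:
            merged[col] = typ                      # adding a new column
        elif merged[col] == typ or merged[col] == "NullType":
            merged[col] = typ                      # NullType -> any type
        elif (merged[col], typ) in SAFE_UPCASTS:
            merged[col] = typ                      # safe upcast
        else:
            raise TypeError(f"incompatible change for column '{col}'")
    return merged

merged = merge_schema(
    {"employee_id": "IntegerType", "age": "ByteType"},
    {"age": "ShortType", "previous_country": "StringType"},
)
# merged == {"employee_id": "IntegerType", "age": "ShortType",
#            "previous_country": "StringType"}
```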
Sometimes a change to the dataset’s schema is more significant and an overwrite of the data is required.
A schema overwrite (the overwriteSchema option), however, can only be applied for the following actions:
- Dropping a column
- Changing an existing column’s data type (in place)
- Renaming column names that differ only by case (e.g. “Foo” and “foo”)
OK, so the ideal scenario would be to create a MERGE pattern that follows some of the previous SCD examples but is dynamic and reusable: we should be able to select a chosen column to update, with a way to create a ‘Previous’ version of that column to maintain the history.
MERGE schema evolution
As we saw in the earlier example, we encountered an error when attempting to add a new ‘previous_country’ column as part of the MERGE.
According to the Automatic Schema Evolution section of the Delta documentation, schema evolution in a MERGE only takes place when the statement uses updateAll() and insertAll() (i.e. UPDATE SET * and INSERT *) rather than explicit column assignments.
So, to achieve this pattern, we first need to build a ‘ChangeRows’ table that already contains the new column, and then let the updateAll() and insertAll() methods do the rest.
SCD Type 3 Example
To keep our variable (the name of the chosen column) dynamic in Databricks, we can use WIDGETS:
-- Create WIDGET to pass in column name variable and keep it dynamic
CREATE WIDGET TEXT changingColumn DEFAULT 'address_country';
# Create WIDGET to pass in column name variable and keep it dynamic
dbutils.widgets.text("changingColumn", "address_country")
changingColumn = dbutils.widgets.get("changingColumn")
The WIDGET can now be used in our queries to form an incoming dataset of the changed rows (with a JOINed ‘previous’ column from the existing set). This can then be unioned with any entirely new rows which do not exist in the original dataset.
-- Create ChangeRows table (union of rows to amend and new rows to insert)
CREATE OR REPLACE TEMP VIEW scd3ChangeRows AS
SELECT scdType3New.*, scdType3.$changingColumn AS previous_$changingColumn
FROM scdType3New
INNER JOIN scdType3
  ON scdType3.employee_id = scdType3New.employee_id
  AND scdType3.$changingColumn <> scdType3New.$changingColumn

UNION

-- Union join any new rows to be inserted
SELECT scdType3New.*, null AS previous_$changingColumn
FROM scdType3New
LEFT JOIN scdType3
  ON scdType3.employee_id = scdType3New.employee_id
WHERE scdType3.employee_id IS NULL;
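The same change-row logic can be sketched in plain Python on lists of dicts. This is an illustration only; the notebook does this with the SQL view above (or its PySpark equivalent), and the helper name here is invented.

```python
# Illustrative sketch of the ChangeRows logic: rows whose tracked column
# changed get a 'previous_' value; entirely new rows get None.
def build_change_rows(existing, incoming, key="employee_id",
                      changing_column="address_country"):
    current = {row[key]: row for row in existing}
    change_rows = []
    for row in incoming:
        old = current.get(row[key])
        if old is None:
            # brand-new row to insert (no previous value)
            change_rows.append({**row, "previous_" + changing_column: None})
        elif old[changing_column] != row[changing_column]:
            # changed row: carry the old value into the 'previous_' column
            change_rows.append(
                {**row, "previous_" + changing_column: old[changing_column]})
    return change_rows

existing = [{"employee_id": 1, "address_country": "United Kingdom"}]
incoming = [{"employee_id": 1, "address_country": "Spain"},
            {"employee_id": 2, "address_country": "France"}]
rows = build_change_rows(existing, incoming)
# rows == [{"employee_id": 1, "address_country": "Spain",
#           "previous_address_country": "United Kingdom"},
#          {"employee_id": 2, "address_country": "France",
#           "previous_address_country": None}]
```

Unchanged matching rows are filtered out entirely, mirroring the `<>` condition in the SQL join.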
Now that we have the rows in our chosen structure, a simple MERGE completes the pattern.
-- Set autoMerge to True
SET spark.databricks.delta.schema.autoMerge.enabled = true;

-- Merge scdType3NEW dataset into existing
MERGE INTO scdType3
USING scd3ChangeRows
-- based on the following column(s)
ON scdType3.employee_id = scd3ChangeRows.employee_id
-- if there is a match do this...
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
# Set autoMerge to True
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)

from delta.tables import DeltaTable

# Reference the existing Delta table
deltaTable = DeltaTable.forName(spark, "scdType3")

# Merge Delta table with new dataset
(
    deltaTable.alias("original3")
    # Merge using the following conditions
    .merge(
        changeRowsDF.alias("updates3"),
        "original3.employee_id = updates3.employee_id"
    )
    # When matched UPDATE ALL values
    .whenMatchedUpdateAll()
    # When not matched INSERT ALL rows
    .whenNotMatchedInsertAll()
    # Execute
    .execute()
)
Finally, checking our rows, we can see that the table has been updated as expected. Great!
Please remember to grab code examples in SQL and Python (PySpark) from my GitHub repo: