From Warehouse to Lakehouse Pt.3 – Slowly Changing Dimensions (SCD) with Delta

SCD Type 3 in SQL and Python


After recently designing a few Slowly Changing Dimensions with a client, I thought it would be good to revisit an earlier post theme and expand on the SCD Types.

For more information on this blog series and Slowly Changing Dimensions with Databricks and Delta Lake, check out SCD Type 1 & SCD Type 2 from part 1 of the ‘From Warehouse to Lakehouse’ series:

All code examples are available in SQL and Python (PySpark) from my GitHub repo so you can follow along:

Notebooks – ‘SQL\SCD-Type3’ & ‘Python\SCD-Type3’

SCD Type 3 – Adding a Previous Value Column

A Type 3 Slowly Changing Dimension is definitely not one of the most common forms of dimension design; to be honest, I’ve not actually seen it used much in production systems. This is because it only allows for a limited history of change, in the form of a ‘Previous Value’ column. Once a record is amended, the existing value moves to a ‘Previous’ column and the primary value is updated. We would therefore only be able to retain the previous iteration of that record.
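The core of the pattern can be sketched in a few lines of plain Python before we look at the dataset (an illustration of the logic only, not Spark code; the record and helper name are made up for the example):

```python
# A minimal sketch of a Type 3 change on a single record (plain Python,
# not Spark): the current value shifts into a 'previous_<column>' field,
# then the current value is overwritten.
def scd_type3_update(record, column, new_value):
    updated = dict(record)
    updated[f"previous_{column}"] = record[column]  # preserve old value
    updated[column] = new_value                     # apply new value
    return updated

employee = {"employee_id": 1, "address_country": "UK"}
employee = scd_type3_update(employee, "address_country", "Spain")
# employee now holds address_country = "Spain" and
# previous_address_country = "UK"
```

A second update would overwrite `previous_address_country` again, which is exactly the one-step history limitation described above.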

Let’s examine how that would look based on the employee dataset from the previous examples.

Employee Sample Dataset

Scenario 3: For this example, three of our employees are relocating to offices in different countries. The following people require a new value for the [address_country] column, however we want to preserve a previous value history. There is also a new record to insert included in the data.

The values to UPSERT are featured in this dataset:

New Values to UPSERT

We want to MERGE this incoming dataset with our existing records, following a similar operation to those we have used in the past (see the posts for SCD Types 1 & 2). This time, however, if a record matches we need to write the current value of [address_country] into a new [previous_country] column and then update the current value. Using this method once again means that if the new dataset contains any new records, we simply insert those rows.

Something along the lines of…

-- Merge scdType3NEW dataset into existing
MERGE INTO scdType3
USING scdType3NEW

-- based on the following column(s)
ON scdType3.employee_id = scdType3NEW.employee_id

-- if there is a match do this...
WHEN MATCHED THEN UPDATE SET
  scdType3.previous_country = scdType3.address_country,
  scdType3.address_country = scdType3NEW.address_country

-- if there is no match insert new row
WHEN NOT MATCHED THEN INSERT *

Great! It’s as easy as that right?…

AnalysisException: cannot resolve `scdType3.previous_country` in UPDATE clause given columns scdType3.`employee_id`, scdType3.`first_name`, scdType3.`last_name`, scdType3.`gender`, scdType3.`address_country`

…ah, the merge is attempting to change the schema of the dataset by writing to a column that doesn’t yet exist. We don’t want that… or do we?

Schema Enforcement or Schema Evolution

Schema Enforcement (or Schema Validation) is a way to ensure the quality of our data is maintained. The reason we define data types, or set data quality rules such as a telephone number expecting a certain number of digits, is to preserve that integrity and avoid errors in our datasets.

There are many occasions where we need these rigid sets of rules for our incoming data. An ETL (or ELT) pipeline with dependent tables is one example: incoming datasets which contain incorrect columns or data types may break processes further downstream. We obviously want to avoid that.

What about when a dataset changes? What about if we are expecting some incremental change and we want to allow that to happen?

In Databricks and Delta dataframes, how do we allow Schema Evolution?

Models showing evolution

In a standard DataFrame WRITE operation there are two options for appending to a dataset whilst adapting its schema at the same time:

.option("mergeSchema", "true")

The existing schema is merged with the incoming dataset. The following actions are accepted:

  • Adding new columns (this is the most common scenario)
  • Changing data types from NullType to any other type, or upcasts from ByteType -> ShortType -> IntegerType
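Those rules can be illustrated with a small plain-Python sketch (a toy model of the merge rules only, not Delta’s actual implementation; the type names and helper function are made up for the example):

```python
# Toy model of the mergeSchema rules described above: new columns are
# accepted, NullType can become any type, and a small set of upcasts
# is allowed; anything else is rejected.
UPCASTS = {("byte", "short"), ("byte", "int"), ("short", "int")}

def merge_schema(existing, incoming):
    merged = dict(existing)
    for col, dtype in incoming.items():
        if col not in merged:
            merged[col] = dtype             # new column: accepted
        elif merged[col] == "null":
            merged[col] = dtype             # NullType -> any other type
        elif (merged[col], dtype) in UPCASTS:
            merged[col] = dtype             # allowed upcast
        elif merged[col] != dtype:
            raise ValueError(f"incompatible schema change for '{col}'")
    return merged

merged = merge_schema({"id": "int", "x": "byte"},
                      {"x": "short", "new": "string"})
# merged -> {'id': 'int', 'x': 'short', 'new': 'string'}
```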

Sometimes a change to the dataset’s schema is more significant, so an overwrite is required.

.option("overwriteSchema", "true")

A schema overwrite, however, can only be applied for the following actions:

  • Dropping a column
  • Changing an existing column’s data type (in place)
  • Renaming column names that differ only by case (e.g. “Foo” and “foo”)

Ok, so the ideal scenario would be to create a MERGE pattern that follows some of the previous SCD examples but is dynamic and reusable. We should be able to select a chosen column to be updated, with a way to create a ‘Previous’ version of that column to maintain the history.

MERGE schema evolution

As attempted in the earlier example, we encountered an error when trying to add a new ‘previous_country’ column as part of the MERGE.

According to the Automatic Schema Evolution section of the Delta documentation, schema evolution during a MERGE requires the spark.databricks.delta.schema.autoMerge.enabled configuration to be set, and new columns are only added when the matched and not-matched clauses use updateAll()/insertAll() (UPDATE SET * / INSERT * in SQL) rather than explicit column assignments.

So in order to achieve this pattern we need to approach it in a way where we first build a ‘ChangeRows’ table, so that the MERGE itself can use the updateAll() and insertAll() methods.

SCD Type 3 Example

To keep our variables (the name of a chosen column) agnostic in Databricks we can use WIDGETS:


-- Create WIDGET to pass in column name variable and keep it dynamic
CREATE WIDGET TEXT changingColumn DEFAULT 'address_country';


# Create WIDGET to pass in column name variable and keep it dynamic 
dbutils.widgets.text("changingColumn", "address_country")
changingColumn = dbutils.widgets.get("changingColumn")

The WIDGETs can now be used in our queries to form an incoming dataset of the new changes (with a joined column from the existing set). This can then be unioned with any entirely new rows which do not exist in the original dataset.

-- Create ChangeRows view (union of rows to amend and new rows to insert)
CREATE OR REPLACE TEMP VIEW scd3ChangeRows AS
SELECT scdType3New.*, scdType3.$changingColumn AS previous_$changingColumn
FROM scdType3New
INNER JOIN scdType3
ON scdType3.employee_id = scdType3New.employee_id
AND scdType3.$changingColumn <> scdType3New.$changingColumn
UNION
-- Union join any new rows to be inserted
SELECT scdType3New.*, null AS previous_$changingColumn
FROM scdType3New
LEFT JOIN scdType3
ON scdType3.employee_id = scdType3New.employee_id
WHERE scdType3.employee_id IS NULL;
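The same ChangeRows logic can be sketched in plain Python, with lists of dicts standing in for the two tables (an illustration only; the helper name and sample values are made up):

```python
# Build the ChangeRows set: rows whose chosen column changed carry the
# old value in a 'previous_<column>' field; brand-new rows carry None;
# unchanged rows are filtered out entirely.
def build_change_rows(existing, incoming, key, changing_column):
    by_key = {row[key]: row for row in existing}
    change_rows = []
    for row in incoming:
        match = by_key.get(row[key])
        if match is None:
            # new row to insert: no previous value
            change_rows.append({**row, f"previous_{changing_column}": None})
        elif match[changing_column] != row[changing_column]:
            # changed row: keep the old value as the 'previous' column
            change_rows.append(
                {**row, f"previous_{changing_column}": match[changing_column]})
    return change_rows

existing = [
    {"employee_id": 1, "address_country": "UK"},
    {"employee_id": 2, "address_country": "France"},
]
incoming = [
    {"employee_id": 1, "address_country": "Spain"},    # changed
    {"employee_id": 2, "address_country": "France"},   # unchanged: dropped
    {"employee_id": 3, "address_country": "Germany"},  # new insert
]
rows = build_change_rows(existing, incoming, "employee_id", "address_country")
# rows contains the changed employee 1 (previous = "UK") and the new
# employee 3 (previous = None); employee 2 is filtered out
```

Filtering out unchanged rows mirrors the `<>` condition in the SQL join above, so the MERGE only touches records that actually need updating or inserting.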

Now we have the rows sorted into our chosen structure, this can be completed with a simple MERGE.


-- Set autoMerge to True
SET spark.databricks.delta.schema.autoMerge.enabled = true;

-- Merge scd3ChangeRows dataset into existing
MERGE INTO scdType3
USING scd3ChangeRows

-- based on the following column(s)
ON scdType3.employee_id = scd3ChangeRows.employee_id

-- if there is a match update all the values
WHEN MATCHED THEN
  UPDATE SET *
-- if there is no match insert new row
WHEN NOT MATCHED THEN
  INSERT *;

# Set autoMerge to True
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)

# Reference the existing table as a Delta table
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, "scdType3")

# Merge Delta table with new dataset
(deltaTable.alias("original3")
    # Merge using the following conditions
    .merge(
        scd3ChangeRows.alias("updates3"),
        "original3.employee_id = updates3.employee_id")
    # When matched UPDATE ALL values
    .whenMatchedUpdateAll()
    # When not matched INSERT ALL rows
    .whenNotMatchedInsertAll()
    # Execute
    .execute())

Finally checking our rows we can see that it has updated as expected. Great!

Please remember to grab code examples in SQL and Python (PySpark) from my GitHub repo:
