SCD Type 1 in SQL and Python
With the move to cloud based Data Lake platforms there has often been criticism from the more traditional Data Warehousing community. A Data Lake, offering cheap, almost endlessly scalable storage in the cloud is hugely appealing to a platform administrator however over the number of years that this has been promoted some adopters have often fallen victim to the infamous Data Swamp. A poorly governed ‘free for all’ with a lack of data quality measures and ownership. It’s easy to imagine how this can happen. Sometimes just a change in personnel can begin the chain reaction of orphaned datasets and forgotten definitions.
“By its definition, a data lake accepts any data, without oversight or governance. Without descriptive metadata and a mechanism to maintain it, the data lake risks turning into a data swamp.” – Gartner 2014 – https://www.gartner.com/en/newsroom/press-releases/2014-07-28-gartner-says-beware-of-the-data-lake-fallacy
So here is where these next few blog articles come in… How can we provide structure to the once unstructured, how can we use tried and tested data warehouse patterns to provide consistency to our ever growing data lake. I’m not going to use this article to explain how the Lakehouse pattern works or the Delta Lake used with Apache Spark. There are plenty of articles on the subject already and with just a simple Google search you will be flooded with blogs, articles and videos on the topic:
There are however, a few of those accomplished warehouse techniques that maybe shouldn’t just be overlooked, now that we have greater power in our tech.
Slowly Changing Dimensions
Anyone that has contributed towards a Data Warehouse or a dimensional model in Power BI will know the distinction made between the time-series metrics of a Fact Table and the categorised attributes of a Dimension Table. These dimensions are also affected by the passage of time and require revised descriptions periodically which is why they are known as Slowly Changing Dimensions (SCD). See The Data Warehouse Toolkit – Kimball & Ross for more information.
Here is where the Delta Lake comes in. Using its many features such as support for ACID transactions (Atomicity, Consistency, Isolation and Durability) and schema enforcement we can create the same durable SCD’s. This may have required a series of complicated SQL statements in the past to achieve this. I will now discuss a few of the most common SCD’s and show how they can be easily achieved using a few Databricks Notebooks, which are available from my GitHub repo so you can download and have a go:
Note: Before using any of the following notebooks, first ensure that the ‘SCD-Start’ notebook has been run initially to load dependencies and create datasets.
SCD Type 1 – Overwrite
Notebook – ‘SQL\SCD-Type1 & Python\SCD-Type1
The dataset that we are using in these examples is a generated sample Employee table. A simple set of people working within a company with common attributes such as name, address, email and job title.
A SCD Type 1 is essentially just a simple overwrite of a selected value or values. There may be an occurrence when a record contains a error which requires immediate correction so therefore we do not need to keep any previous instances of those value(s).
Scenario 1: The employee Stu Sand has changed their job title from Paralegal to Solicitor. In order to create a reproducible action to use in our Data Lake/Lakehouse we can apply a procedure to check the row exists in the dataset, overwrite the record if it exists and insert as a new row if it does not. We can form this action using the
MERGE function in Spark SQL to incorporate the incoming row in to our existing dataset:
MERGE INTO scdType1 USING scdType1NEW ON scdType1.employee_id = scdType1NEW.employee_id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *
# Convert table to Delta deltaTable = DeltaTable.forName(spark, "scdType1") # Merge Delta table with new dataset ( deltaTable .alias("original1") # Merge using the following conditions .merge( scd1Temp.alias("updates1"), "original1.employee_id = updates1.employee_id" ) # When matched UPDATE ALL values .whenMatchedUpdateAll() # When not matched INSERT ALL rows .whenNotMatchedInsertAll() # Execute merge .execute() )
This operation checks that the [employee_id] of the incoming dataframe matches the [employee_id] of the existing (scdType1) , performs an
UPDATE action for all fields (*) and if the row matches, an
INSERT action is performed.
A query you may find useful that can be performed at this stage is the
DESCRIBE HISTORY statement. One of Delta format’s significant features is its Transaction Log, an ordered record of every transaction performed on a Delta Table. The evolution of a table can be shown with this query:
DESCRIBE HISTORY scdType1
display( deltaTable.history() )
There we go, that was pretty simple wasn’t it. In the next post we’ll look at SCD Type 2!
Please use the notebooks provided from my GitHub repo for a more in depth example.