From Warehouse to Lakehouse Pt.3 – Slowly Changing Dimensions (SCD) with Delta

SCD Type 3 in SQL and Python

Introduction

After recently designing a few Slowly Changing Dimensions with a client, I thought it would be good to revisit an earlier post theme and expand on the SCD Types.

For more information on this blog series and Slowly Changing Dimensions with Databricks and Delta Lakes check out SCD Type 1 & SCD Type 2 from part 1 of the ‘From Warehouse to Lakehouse’ series:

https://headinthecloud.blog/2021/08/17/from-warehouse-to-lakehouse-slowly-changing-dimensions-scd-with-delta-and-sql/

https://headinthecloud.blog/2021/08/24/from-warehouse-to-lakehouse-pt-2-slowly-changing-dimensions-scd-with-delta/

All code examples are available in SQL and Python (PySpark) from my GitHub repo so you can follow along:
https://github.com/cwilliams87/Blog-SCDs

Notebook – ‘SQL\SCD-Type3’ & ‘Python\SCD-Type3’

SCD Type 3 – Adding a Previous Value Column

A Type 3 Slowly Changing Dimension is definitely not one of the most common forms of dimension design; to be honest, I’ve not actually seen it used much in production systems. This is because it only allows for a limited history of change, in the form of a ‘Previous Value’ column. Once a record is amended, the existing value moves to a ‘Previous’ column and the primary value is updated. We would therefore only preserve the previous iteration of that record.

Let’s examine how that would look, based on the employee dataset from the previous examples.

Employee Sample Dataset

Scenario 3: For this example, three of our employees are relocating to offices in different countries. The following people require a new value for the [address_country] column, however we want to preserve a previous value history. There is also a new record to insert included in the data.

The values to UPSERT are featured in this dataset:

New Values to UPSERT

We want to MERGE this incoming dataset with our existing records, following a similar operation to those we have used in the past (see the posts for SCD Types 1 & 2). This time, however, if a record matches we need to write the existing value of [address_country] into a new [previous_country] column and then update the current value. Using this method once again means that if the new dataset contains any new records, we simply insert those rows.

Something along the lines of…

-- Merge.....
-- ...
-- ...
-- based on the following column(s)
ON scdType3.employee_id = scdType3NEW.employee_id

-- if there is a match do this...
WHEN MATCHED THEN
  UPDATE SET
    scdType3.previous_country = scdType3.address_country,
    scdType3.address_country = scdType3NEW.address_country

-- if there is no match insert new row
WHEN NOT MATCHED THEN INSERT *

Great! It’s as easy as that right?…

AnalysisException: cannot resolve `scdType3.previous_country` in UPDATE clause given columns scdType3.`employee_id`, scdType3.`first_name`, scdType3.`last_name`, scdType3.`gender`, scdType3.`address_country`

…ah, the schema of the dataset has now been changed. We don’t want that… or do we?

Schema Enforcement or Schema Evolution

Schema Enforcement or Schema Validation is a way to ensure the quality of our data is maintained. The reason why we define data types or set data quality rules like a telephone number expecting a certain number of digits, is to preserve that integrity and avoid errors in our datasets.

There are many occasions where we need these rigid sets of rules for our incoming data. An ETL (or ELT) pipeline with dependent tables would be an example of this. Incoming datasets which contain incorrect columns or data types may break processes further downstream. We obviously want to avoid that.

What about when a dataset changes? What about if we are expecting some incremental change and we want to allow that to happen?

In Databricks and Delta dataframes, how do we allow Schema Evolution?


In a standard dataframe WRITE operation there are two methods for appending to a dataset whilst adapting its schema at the same time:

.option("mergeSchema", "true")

The existing schema is merged with the incoming dataset. The following actions are accepted:

  • Adding new columns (this is the most common scenario)
  • Changing of data types from NullType -> any other type, or upcasts from ByteType -> ShortType -> IntegerType

Sometimes, a change to the dataset’s schema might be more significant, so an overwrite is required.

.option("overwriteSchema", "true")

A schema Overwrite however, can only be applied for the following actions:

  • Dropping a column
  • Changing an existing column’s data type (in place)
  • Renaming column names that differ only by case (e.g. “Foo” and “foo”)

Ok, so the ideal scenario would be to create a MERGE pattern that follows some of the previous SCD examples but is dynamic and reusable. We should be able to select a chosen column to be updated, with a way to create a ‘Previous’ version of that column to maintain the history.

MERGE schema evolution

As attempted in the earlier example, we encountered an error when trying to add a new ‘previous_country’ column as part of the MERGE.

According to the Automatic Schema Evolution section of the Delta documentation:

https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution

So, in order to achieve this pattern, we need to approach it in a way where we can create a ‘ChangeRows’ table, allowing us to use the updateAll() or insertAll() methods (UPDATE SET * / INSERT * in SQL).

SCD Type 3 Example

To keep our variables (the name of a chosen column) agnostic in Databricks we can use WIDGETS:

SQL

-- Create WIDGET to pass in column name variable and keep it dynamic
CREATE WIDGET TEXT changingColumn DEFAULT 'address_country';

Python

# Create WIDGET to pass in column name variable and keep it dynamic 
dbutils.widgets.text("changingColumn", "address_country")
changingColumn = dbutils.widgets.get("changingColumn")

The WIDGETs can now be used in our queries to form an incoming dataset for the new changes (with a JOINed column from the existing set). This can then be UNIONed with any entirely new rows which do not exist in the original dataset.

-- Create ChangeRows table (union of rows to amend and new rows to insert)
CREATE OR REPLACE TEMP VIEW scd3ChangeRows AS
SELECT scdType3New.*, scdType3.$changingColumn AS previous_$changingColumn FROM scdType3New
INNER JOIN scdType3
ON scdType3.employee_id = scdType3New.employee_id
AND scdType3.$changingColumn <> scdType3New.$changingColumn
UNION
-- Union join any new rows to be inserted
SELECT scdType3New.*, null AS previous_$changingColumn FROM scdType3New
LEFT JOIN scdType3
ON scdType3.employee_id = scdType3New.employee_id
WHERE scdType3.employee_id IS NULL;

Now that we have the rows sorted into our chosen structure, this can be followed with a simple MERGE.

SQL

-- Set autoMerge to True
SET spark.databricks.delta.schema.autoMerge.enabled=true;

-- Merge scdType3NEW dataset into existing
MERGE INTO scdType3
USING scd3ChangeRows

-- based on the following column(s)
ON scdType3.employee_id = scd3ChangeRows.employee_id

-- if there is a match do this...
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED THEN 
  INSERT *

Python

# Set autoMerge to True
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)

# Get a reference to the Delta table
deltaTable = DeltaTable.forName(spark, "scdType3")

# Merge Delta table with new dataset
(
  deltaTable
    .alias("original3")
    # Merge using the following conditions
    .merge( 
      changeRowsDF.alias("updates3"),
      "original3.employee_id = updates3.employee_id"
    )
    # When matched UPDATE these values
    .whenMatchedUpdateAll()
    # When not matched INSERT ALL rows
    .whenNotMatchedInsertAll()
    # Execute
    .execute()
)

Finally checking our rows we can see that it has updated as expected. Great!

Please remember to grab code examples in SQL and Python (PySpark) from my GitHub repo:
https://github.com/cwilliams87/Blog-SCDs

Tips for the Databricks Certified Associate Developer for Apache Spark 3.0 – Python – Pt.2

Following on from my previous post, I wanted to cover some more key topics that can really help your understanding of Spark before diving into the Databricks Certified Associate Developer for Apache Spark 3.0 exam.

For more information on general assessment tips, great practice exams to take and other core topics, please see part 1:

https://headinthecloud.blog/2022/01/05/tips-for-the-databricks-certified-associate-developer-for-apache-spark-3-0-python-pt-1/

More Topics to Remember
Broadcast Variables & Accumulator Variables

Both of these are shared variables across the cluster, but what are the uses and differences?


Broadcast Variables

On a top level, Broadcast Variables are immutable (read-only) and shared amongst all worker nodes in a cluster. As they are accessible within every node this can improve read time.

Say we have a large dataset partitioned across the cluster which contains a RegionCode as one of the columns.
We want to lookup the RegionName from Table 1 to incorporate it in to the main dataset.

Table 1 broadcasted to the Executors

Using a Broadcast Variable, the lookup Table 1 can be made available for each Executor to read rather than be read from the driver multiple times. This is really useful to reduce network traffic.

Accumulator Variables

Whereas Broadcast Variables are used for reading across your Worker Nodes in a cluster, Accumulators are used for writing across the Nodes back to the driver.

Accumulator 2

Accumulators are used to gather together partial values across many tasks across the executors. They can be added to by the executors but only read by the driver (see Accumulator 2 in the diagram above). These are often used for Counters or Sums for example.

Sample

This is a function which featured frequently across numerous questions in the practice exams. If you are someone from a more Data Engineering background performing Dataframe transformations, you may not have considered the use of it.
The syntax for this method is as follows:

sample(withReplacement, fraction, seed=None)

withReplacement: bool   #Can rows appear more than once?
fraction: float         #Fraction of rows to generate
seed: int               #Seed for sampling (to lock random number generation)

Looking at the following sample question as an example:

What would be the correct code block for returning 250 rows of a 1000 row Dataframe, assuming that the returned rows are distinct and the sample is reproducible?

df.sample(True, 0.25, 3456)  # Incorrect: Multiple rows can be selected

df.sample(False, 0.25, 3456) # --- CORRECT ---

df.sample(False, 250, 3456)  # Incorrect: There should be a fraction of the whole dataset

df.sample(False, 0.25)       # Incorrect: No Seed is selected, not reproducible
Cluster Execution Modes

There are three valid execution modes in Spark: Cluster, Client and Local.

A Cluster Driver and Worker – Spark: The Definitive Guide

These different modes dictate where the driver or resources are located when you run your application.

Cluster Mode

In Cluster Mode, the job is submitted to the Cluster Manager which then launches the Driver and the Executors on Worker Nodes within the Cluster.

Cluster Mode – Spark: The Definitive Guide

Client Mode

Client Mode is where the Spark Driver is located on the machine that is submitting the application and is not located with the cluster. This is often referred to as a ‘gateway machine’ or ‘edge node’.

Client Mode – Spark: The Definitive Guide

Local Mode

Local Mode is different from the previous two as it runs the entire Spark Application on a single JVM (Java Virtual Machine).

See Spark: The Definitive Guide (Chambers, Zaharia 2018) – pg.254-256 ‘Execution Modes’

Persist & Cache

Persist and Cache are methods which are often used to optimise operations on the cluster. If a computation of an RDD, Dataset or Dataframe is going to be used multiple times, it may be more efficient to save it to memory. There are differences that are important to know when considering this approach.

cache() vs persist()

Cache

Caching is performed across the workers but is lazily evaluated, so it will only happen once an action has been performed.

df.cache()
df.count()     # An action needs to be performed

Persist

Persisting, however, offers options around the type of Storage Level you want.

df.persist(StorageLevel.MEMORY_ONLY)     
# The type of Storage Level can be specified

If you want to remove a Dataframe, remove all blocks for it from memory and disk by using unpersist().

When to Cache and Persist?
If a Dataframe is commonly accessed for frequent transformations during ETL pipelines.

When Not To Cache and Persist?
If there is an inexpensive transformation on a Dataframe not requiring frequent use, regardless of size.

Learning Spark (Damji, Wenig, Das, Lee 2020) – pg.187 ‘When to Cache and Persist’

Storage Levels

Following on from the previous point, Persist() allows for multiple Storage Levels to suit requirements. There may be times when you need to manually manage the memory options to optimise your applications.


Here are the Storage Levels:

  • MEMORY_ONLY – Data is stored only in memory.
  • MEMORY_ONLY_SER – Data is serialised into a compact form and stored in memory. Deserialisation comes at a cost.
  • MEMORY_AND_DISK – Data is stored directly in memory. If memory is full, the rest is stored on disk.
  • MEMORY_AND_DISK_SER – Like MEMORY_AND_DISK, but data is serialised when stored in memory. Data is always serialised when stored on disk.
  • DISK_ONLY – Data is serialised and stored on disk.
  • OFF_HEAP – Data is stored off-heap, meaning it is not managed by the JVM and can be handled explicitly by the application, which allows for greater control over memory management.

Extra Note: Each StorageLevel also has an equivalent ‘LEVEL_NAME_2‘ (e.g. MEMORY_ONLY_2), which means the data is replicated twice on different Spark Executors. This can be great for fault tolerance but can also be expensive.

Good Luck

Hopefully these last two articles have helped in explaining some of the core components that may feature in your exam. As I said in the previous post, I completely recommend practice exams to really get a sense of what topics you need to revise.

So when you feel ready, register to sit your exam from the Databricks Academy and good luck!

Tips for the Databricks Certified Associate Developer for Apache Spark 3.0 – Python – Pt.1

After recently diving in to (and passing!) the Associate Developer for Apache Spark 3.0 exam certification from Databricks, I thought it would be useful to go over some quick points to remember and some potential ‘gotcha’ topics for anyone considering the challenge.

The majority of the exam (72% in fact) features the use of the Dataframe API and if you are a person who uses Databricks/Spark regularly, you probably will feel pretty optimistic about these questions. Where I experienced the difficulty often came from the other categories of the exam:

Spark Architecture: Conceptual understanding – 17%

Spark Architecture: Applied understanding – 11%

These sections which I’m going to collectively refer to as just Spark Architecture, are aspects that you may have overlooked especially if you are exclusively using Spark in the context of the Databricks platform.

Firstly, I would recommend you run through a few practice papers before booking your examination, just to really get a feel for the questions and the overall requirements. I tried a few and aside from the Practice Exam provided by Databricks Academy (https://tinyurl.com/5dvjnbkz), my favourite set of papers were written by Florian Roscheck (Senior Data Scientist at Henkel), available in Python or Scala.

https://www.udemy.com/course/databricks-certified-developer-for-apache-spark-30-practice-exams/

Check out these great practice exams on Udemy! Not only are all the answers fully explained on review of the completed paper but also included is an additional third exam which is designed specifically to be more challenging than the actual exam. It was a great chance to really expand my knowledge in preparation for it. If you can achieve passing marks (70% or higher) in paper 3 then you should feel really confident to book in for the actual assessment.

The second aspect to definitely practice before your exam is the use of Spark Docs. You are provided with a copy within the assessment terminal but search is deactivated! The documentation is extensive and the viewing window is super small.

https://www.webassessor.com/zz/DATABRICKS/Python_v2.html

Here is a link to the docs so you can have a try to retrieve the information for various functions using just the scroll bar. This can be hugely important as a number of questions are designed to show function arguments in an incorrect order or with a slightly different name. Having the chance to check if the expression is expecting a String, a Column or a List can be crucial.

Topics to Remember

Over the course of my preparation for the exam I made a few notes on key areas from the Spark Architecture and Dataframe API which I noticed made a significant appearance. So perhaps this pseudo-cheat-sheet may help you to get a better understanding of these components…

Narrow & Wide Transformations

Transformations are interpreted lazily: a list of these processing steps is compiled but will not return an output until they have been initiated by an Action. Transformations come in two distinct categories, known as Narrow and Wide.

Narrow Transformations

This is when a change to an input partition will only contribute to one output partition.

Narrow Transformation
# An example of a Narrow Transformation is Filter
df.filter(col("chosenCol") >= 3)

Wide Transformations

This is when a transformation will have input partitions that can contribute to many output partitions.

Wide Transformation

As this operation can span across partitions, it will trigger a shuffle (the reorganisation of partitions of data across nodes in the cluster). This is important to remember if there is a question relating to execution performance.

# An example of a Wide Transformation is Join
df.join(newdf, df.id == newdf.id, "inner")

Actions

…And Actions are the triggered computation of our Transformation. This occurs with the initiation of operators like Count() or Collect().

# An example of an Action is Count
df.count()

For more information see Spark: The Definitive Guide (Chambers, Zaharia 2018) – pg.25 ‘Transformations’

Spark Execution Hierarchy

So, when an Action has been triggered, the Spark application formulates all of the lazily evaluated Transformations into an Execution Plan to be divided amongst its cluster resources. How is the plan arranged, you ask?
Well, it’s simple if you remember just these three parts:

Job – Stage – Task

Spark Execution Hierarchy (JOB – STAGE – TASK)
  • The Application triggers a Job to fulfil the action.
  • Components of the plan are assembled into Stages, which depend on the shuffles required.
  • The stages are collections of Tasks to run transformations on the divisions of the data which are then sent to the Executors.

For more information see Spark: The Definitive Guide (Chambers, Zaharia 2018) – pg.263 ‘A Spark Job’

Repartition & Coalesce
coalesce()
# vs
repartition()

These are interesting operations because, on the surface, they sound like they do the same thing. Collectively they are used to change the number of partitions of an RDD, Dataframe or Dataset, however there are some noticeable differences:

Coalesce – Used to decrease the number of partitions quickly, avoiding shuffles.

# Reduce df partitions to 4 with Coalesce
df.coalesce(4)

Repartition – Can increase and decrease the number of partitions and organises them to an even size. This will result in slower performance due to the full shuffle.

# Reduce df partitions to 4 with Repartition
df.repartition(4)    # Optional partition columns can also be specified
Catalyst Optimizer

In order to allow Spark SQL to be as effective as possible the Catalyst Optimizer was created to automatically analyse and rewrite queries to execute more efficiently.

Catalyst Optimizer Diagram

The Catalyst Optimizer takes a computational query and converts it into an execution plan which goes through four transformational phases:

  • Analysis

Spark SQL generates an abstract syntax tree (a representation of the structure of the text) for the query and converts this to an Unresolved Logical Plan. An internal Catalog (a repository of all table and DataFrame information) is then consulted, and if a required table or column name does not exist in the catalog, the analyser may reject the plan.

  • Logical Optimization

The optimiser will construct a set of multiple plans, use its cost-based optimizer (CBO) to assign costs to each plan, and apply the processes of constant folding, predicate pushdown and projection pruning to simplify.

  • Physical Planning

Catalyst then organises how the most efficient logical plan will be executed on the cluster by creating physical plans.

  • Code Generation

The final phase involves the generation of efficient Java bytecode to run on each machine within the cluster.

This can be demonstrated by the joining of two dataframes in this query (shown in Scala, as presented in Learning Spark):

joinedDF = users
    .join(events, users("id") === events("uid"))
    .filter(events("date") > "2015-01-01")
The query transformation example – Learning Spark 2020

For more information see:

Spark: The Definitive Guide (Chambers, Zaharia 2018) – pg.62 ‘Overview of Structured API Execution’

Learning Spark (Damji, Wenig, Das, Lee 2020) – pg.77-81 ‘The Catalyst Optimizer’

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

In the Next Post…

Check out my next post “Tips for the Databricks Certified Associate Developer for Apache Spark 3.0 – Python – Pt.2” where I’ll cover topics such as Cluster Execution Modes, Broadcast Variables and Accumulators!

From Warehouse to Lakehouse Pt.2 – Slowly Changing Dimensions (SCD) with Delta

SCD Type 2 in SQL and Python

Introduction

For more information on this blog series and Slowly Changing Dimensions with Databricks and Delta Lakes check out SCD Type 1 from part 1 of the ‘From Warehouse to Lakehouse’ series:
https://headinthecloud.blog/2021/08/17/from-warehouse-to-lakehouse-slowly-changing-dimensions-scd-with-delta-and-sql/

All code examples are available in SQL and Python (PySpark) from my GitHub repo so you can follow along:
https://github.com/cwilliams87/Blog-SCDs

Notebook – ‘SQL\SCD-Type2’ & ‘Python\SCD-Type2’

SCD Type 2 – Add a new row (with active row indicators or dates)

A Type 2 SCD is probably one of the most common ways to preserve history in a dimension table and is used throughout any Data Warehousing/Modelling architecture. Active rows can be indicated with a boolean flag or a start and end date. In this example, all active rows can be displayed simply by returning a query where the end date is null.

For example:

SELECT * FROM type2Table WHERE end_date IS NULL

Or in Python:

type2TableDF.where("end_date IS NULL")

In order to perform this type of SCD we need to add a number of columns to the existing table. Firstly, a [start_date] and an [end_date] are required to act as active row indicators, along with a surrogate key denoted as [id]. The surrogate key is needed because of the duplicates that will occur in the [employee_id] column when record changes are added as new rows.

For more information on Type 2 SCD’s as proposed by the Kimball Group:

https://www.kimballgroup.com/data-warehouse-business-intelligence-resources/kimball-techniques/dimensional-modeling-techniques/type-2/

Scenario 2: Using the employee dataset (as seen in part 1). The employees Fred Flintoff and Hilary Casillis have changes to be made.
Fred needs a change of address whereas Hilary has recently got married and will be changing her [last_name].

New rows to MERGE into SCD Type 2 table

We can once again perform this operation using the MERGE function from the previous example (Part 1); however, as there are essentially two events happening with each row (amend the existing version and insert the new one), we need to create a composite insertion table to highlight the two operations:

SQL

-- Example ChangeRows table
SELECT
null AS id, employee_id, first_name, last_name, gender, address_street, address_city, address_country, email, job_title, current_date AS start_date, null AS end_date
FROM scdType2NEW
UNION ALL
SELECT
id, employee_id, first_name, last_name, gender, address_street, address_city, address_country, email, job_title, start_date, end_date
FROM scdType2
WHERE employee_id IN
(SELECT employee_id FROM scdType2NEW)

Python

# Create list of selected employee_id's
empList = scd2Temp.select(
    collect_list(scd2Temp['employee_id'])
).collect()[0][0]

# Select columns in new dataframe to merge
scd2Temp = scd2Temp.selectExpr(
    "null AS id", "employee_id", "first_name", "last_name", "gender",
    "address_street", "address_city", "address_country", "email",
    "job_title", "current_date AS start_date", "null AS end_date"
)

# Union join queries to match incoming rows with existing
scd2Temp = scd2Temp.unionByName(
    scdType2DF.where(col("employee_id").isin(empList)),
    allowMissingColumns=True
)

# Preview results
display(scd2Temp)
Query showing new records with their existing counterparts

As you can see in the example above, we have been able to UNION the new rows to insert with their existing counterparts showing the original records. Notice that the [id] values are blank for these new records, because this will be used to trigger the varying behaviours in the MERGE process. So, if we put these points together with the MERGE operation…

SQL

-- Merge scdType2NEW dataset into existing
MERGE INTO scdType2
USING

-- Update table with rows to match (new and old referenced as seen in example above)
( SELECT
null AS id, employee_id, first_name, last_name, gender, address_street, address_city, address_country, email, job_title, current_date AS start_date, null AS end_date
FROM scdType2NEW
UNION ALL
SELECT
id, employee_id, first_name, last_name, gender, address_street, address_city, address_country, email, job_title, start_date, end_date
FROM scdType2
WHERE employee_id IN
(SELECT employee_id FROM scdType2NEW)
) scdChangeRows

-- based on the following column(s)
ON scdType2.id = scdChangeRows.id

-- if there is a match do this…
WHEN MATCHED THEN
UPDATE SET scdType2.end_date = current_date()

-- if there is no match insert new row
WHEN NOT MATCHED THEN INSERT *

Python

# Get a reference to the Delta table
deltaTable = DeltaTable.forName(spark, "scdType2")

# Merge Delta table with new dataset
(
  deltaTable
    .alias("original2")
    # Merge using the following conditions
    .merge(
      scdChangeRows.alias("updates2"),
      "original2.id = updates2.id"
    )
    # When matched UPDATE the end_date
    .whenMatchedUpdate(
      set={ "original2.end_date" : current_date() }
    )
    # When not matched INSERT ALL rows
    .whenNotMatchedInsertAll()
    # Execute
    .execute()
)

As you can see in the code example above, if there is a match with [id] column from the scdChangeRows we can simply update the [end_date] with the current date, thus marking the row as expired. The new rows are then inserted from within the scdChangeRows table.
See Databricks notebook for further clarification.

What about the [id] column, I hear you ask? Yes, you are correct to point out that this method doesn’t add new values for those additional rows. This is sadly due to Delta not currently (as of Aug 2021) supporting auto-increment, a classic SQL column attribute which would be used to easily populate an ID for incoming rows. We can, however, replace that functionality with a ROW_NUMBER window function to add that sequential integer. For the purposes of this example, a quick INSERT statement reintroduces those rows with populated IDs. This sadly must be performed separately, as it is not supported from within the MERGE operation itself.

SQL

-- Order nulls in DF to the end and recreate row numbers (as delta does not currently support auto incrementals)
INSERT OVERWRITE scdType2
SELECT ROW_NUMBER() OVER (ORDER BY id NULLS LAST) - 1 AS id, 
employee_id, first_name, last_name, gender, address_street, 
address_city, address_country, email, job_title, start_date, end_date
FROM scdType2

Python

scdType2DF.selectExpr(
    "ROW_NUMBER() OVER (ORDER BY id NULLS LAST) - 1 AS id",
    "employee_id", "first_name", "last_name", "gender", "address_street",
    "address_city", "address_country", "email", "job_title", "start_date", "end_date"
).write.insertInto(tableName="scdType2", overwrite=True)

So there we are; in the final SELECT query shown above (limited to just the 4 affected rows) we can see that there are new versions with a null [end_date] and the amended values!

I hope this helps, next time we’ll look at an approach to perform SCD Type 3!

Please use the notebooks provided from my GitHub repo for a more in depth example.

From Warehouse to Lakehouse Pt.1 – Slowly Changing Dimensions (SCD) with Delta

SCD Type 1 in SQL and Python

Introduction

With the move to cloud-based Data Lake platforms there has often been criticism from the more traditional Data Warehousing community. A Data Lake, offering cheap, almost endlessly scalable storage in the cloud, is hugely appealing to a platform administrator. However, over the years that this has been promoted, some adopters have fallen victim to the infamous Data Swamp: a poorly governed ‘free for all’ with a lack of data quality measures and ownership. It’s easy to imagine how this can happen; sometimes just a change in personnel can begin the chain reaction of orphaned datasets and forgotten definitions.

“By its definition, a data lake accepts any data, without oversight or governance. Without descriptive metadata and a mechanism to maintain it, the data lake risks turning into a data swamp.” – Gartner, 2014
https://www.gartner.com/en/newsroom/press-releases/2014-07-28-gartner-says-beware-of-the-data-lake-fallacy

So here is where these next few blog articles come in… How can we provide structure to the once unstructured? How can we use tried and tested data warehouse patterns to provide consistency to our ever-growing data lake? I’m not going to use this article to explain how the Lakehouse pattern works or the Delta Lake used with Apache Spark. There are plenty of articles on the subject already, and with just a simple Google search you will be flooded with blogs, articles and videos on the topic:

https://databricks.com/blog/2020/09/10/diving-deep-into-the-inner-workings-of-the-lakehouse-and-delta-lake.html

There are however, a few of those accomplished warehouse techniques that maybe shouldn’t just be overlooked, now that we have greater power in our tech.

Slowly Changing Dimensions

Anyone who has contributed towards a Data Warehouse or a dimensional model in Power BI will know the distinction made between the time-series metrics of a Fact Table and the categorised attributes of a Dimension Table. These dimensions are also affected by the passage of time and periodically require revised descriptions, which is why they are known as Slowly Changing Dimensions (SCDs). See The Data Warehouse Toolkit (Kimball & Ross) for more information.

Here is where the Delta Lake comes in. Using its many features, such as support for ACID transactions (Atomicity, Consistency, Isolation and Durability) and schema enforcement, we can create the same durable SCDs, which in the past may have required a series of complicated SQL statements. I will now discuss a few of the most common SCDs and show how they can be easily achieved using a few Databricks notebooks, which are available from my GitHub repo so you can download them and have a go:

https://github.com/cwilliams87/Blog-SCDs

Note: Before using any of the following notebooks, first ensure that the ‘SCD-Start’ notebook has been run initially to load dependencies and create datasets.

SCD Type 1 – Overwrite

Notebook – ‘SQL\SCD-Type1’ & ‘Python\SCD-Type1’

The dataset that we are using in these examples is a generated sample Employee table. A simple set of people working within a company with common attributes such as name, address, email and job title.

Sample Employee dataset

A SCD Type 1 is essentially just a simple overwrite of a selected value or values. There may be an occurrence when a record contains an error which requires immediate correction, in which case we do not need to keep any previous instances of those value(s).

Scenario 1: The employee Stu Sand has changed their job title from Paralegal to Solicitor. In order to create a reproducible action to use in our Data Lake/Lakehouse we can apply a procedure to check the row exists in the dataset, overwrite the record if it exists and insert as a new row if it does not. We can form this action using the MERGE function in Spark SQL to incorporate the incoming row in to our existing dataset:

New row to amend or insert

SQL

MERGE INTO scdType1
USING scdType1NEW

ON scdType1.employee_id = scdType1NEW.employee_id

WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Python

# Get a reference to the Delta table
deltaTable = DeltaTable.forName(spark, "scdType1")

# Merge Delta table with new dataset
(
  deltaTable
    .alias("original1")
    # Merge using the following conditions
    .merge(
      scd1Temp.alias("updates1"),
      "original1.employee_id = updates1.employee_id"
    )
    # When matched UPDATE ALL values
    .whenMatchedUpdateAll()
    # When not matched INSERT ALL rows
    .whenNotMatchedInsertAll()
    # Execute merge
    .execute()
)

This operation checks whether the [employee_id] of the incoming dataframe matches the [employee_id] of the existing table (scdType1). If the row matches, an UPDATE action is performed for all fields (*); if it does not, an INSERT action is performed.

A query you may find useful that can be performed at this stage is the DESCRIBE HISTORY statement. One of Delta format’s significant features is its Transaction Log, an ordered record of every transaction performed on a Delta Table. The evolution of a table can be shown with this query:

SQL

DESCRIBE HISTORY scdType1

Python

display( deltaTable.history() )
Displaying the Delta Transaction log

There we go, that was pretty simple wasn’t it? In the next post we’ll look at SCD Type 2!

Please use the notebooks provided from my GitHub repo for a more in depth example.

First one…

So here it is, the first post in what I hope is the first of many, but only time will tell.

Chris by a stream in North Yorkshire

I wanted to use this as a potential avenue in to certain cloud concepts and technologies in the hope that it could potentially help others to develop these kind of solutions.

In my history working in the Business Intelligence space, I met a number of colleagues more comfortable with the traditional Microsoft BI Stack (MS SQL Server, SSIS, SSRS, Excel) and as the technology has moved at such a rapid pace, were sometimes unsure which would be the correct path to follow. If I am able to assist even a couple of those people, then I will consider this a success.

From a personal point of view, I would also like this to be a place where I can share accounts of my career as well as any potential insight that I could share. If you are new to the field or are hoping to work in Data Engineering, perhaps, this could help…

…but only time will tell.