Sparkling SCD2

Transform data into a valid_from/valid_to SCD2 style using Apache Spark

In a data lake, there are usually three kinds of data sets:

  • raw event data
  • daily full copies of small to medium-sized metadata (including history)
  • change data capture (= a stream of edits)

Especially for the last two types, data is usually only valid for a certain period of time. For example, the address of a customer might be changed, thus invalidating the existing record.

When performing joins on multiple fully denormalized tables, this can become very costly due to the heavy shuffle IO (e.g. when the same events are copied over and over again in full-copy ingests).

In the classical Hadoop ecosystem it was not easy to update data; only dropping a full partition and re-loading its data afterwards was simple. As a result, state mutation, i.e. the invalidation of records, was a complex operation. Nowadays, ACID tooling like Delta Lake, Hudi, Iceberg or even Hive ACID is available to make this easy. They all have their strengths and weaknesses, but all of them support the standard SQL MERGE INTO command to some extent.
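For illustration, here is a minimal sketch of such an upsert issued through Spark SQL. It assumes an ACID-capable table format (Delta Lake in this case) and purely hypothetical table and column names; the exact MERGE dialect differs slightly between the tools:

// hedged sketch: `spark` is an active SparkSession; `customer` (target) and
// `customer_updates` (staged changes) are hypothetical catalog tables in an ACID-capable format
spark.sql("""
  MERGE INTO customer AS target
  USING customer_updates AS updates
  ON target.customer_id = updates.customer_id
  WHEN MATCHED THEN
    UPDATE SET target.address = updates.address
  WHEN NOT MATCHED THEN
    INSERT (customer_id, address) VALUES (updates.customer_id, updates.address)
""")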

However, you might be in the situation (as a data scientist) that your data engineers are not fond of these tools just yet, due to a potential lack of vendor support or other reasons. Or maybe, as a data engineer, you do not want to expose such a special table to clients, but only a regular one (which still reflects the updated state). Then the following might be of interest to you:

As long as you are storing small to medium-sized data, it should be possible to historize all of it from the beginning of time. This allows you to easily re-create such a stateful SCD2-style (valid_from, valid_to) table for the metadata: simply overwrite this view from the raw data whenever you want to perform updates.

Obviously, this is not ACID, and you should not have any queries running against it during the update.
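A minimal sketch of such a rebuild, using the deduplicateScd2 helper defined further below; the table and column names (customer_metadata_raw, customer_scd2, customer_id, load_date) are hypothetical:

// hedged sketch: rebuild the SCD2 view from the full raw history and overwrite the exposed table
spark
  .table("customer_metadata_raw")
  .transform(deduplicateScd2(Seq("customer_id"), Seq("load_date"), "load_date", Seq()))
  .write
  .mode("overwrite") // full rewrite, not ACID: no concurrent readers during the update
  .saveAsTable("customer_scd2")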

See the example below:

val df = Seq(("k1","foo", "2020-01-01"), ("k1","foo", "2020-02-01"), ("k1","baz", "2020-02-01"),
("k2","foo", "2019-01-01"), ("k2","foo", "2019-02-01"), ("k2","baz", "2019-02-01")).toDF("key", "value_1", "date").withColumn("date", to_date(col("date")))
df.show
+---+-------+----------+
|key|value_1|      date|
+---+-------+----------+
| k1|    foo|2020-01-01|
| k1|    foo|2020-02-01|
| k1|    baz|2020-02-01|
| k2|    foo|2019-01-01|
| k2|    foo|2019-02-01|
| k2|    baz|2019-02-01|
+---+-------+----------+
df.printSchema
root
 |-- key: string (nullable = true)
 |-- value_1: string (nullable = true)
 |-- date: date (nullable = true)
df.transform(deduplicateScd2(Seq("key"), Seq("date"), "date", Seq())).show
+---+-------+----------+----------+
|key|value_1|valid_from|  valid_to|
+---+-------+----------+----------+
| k1|    foo|2020-01-01|2020-02-01|
| k1|    baz|2020-02-01|2020-11-18|
| k2|    foo|2019-01-01|2019-02-01|
| k2|    baz|2019-02-01|2020-11-18|
+---+-------+----------+----------+

The code which makes this transformation happen follows below. You only need to make sure not to feed it any column that is NULL for every row; such columns can be excluded by listing them in columnsToIgnore:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.functions.lead
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.functions.current_date

def deduplicateScd2(
      key: Seq[String],
      sortChangingIgnored: Seq[String],
      timeColumn: String,
      columnsToIgnore: Seq[String]
  )(df: DataFrame): DataFrame = {
    // one window per natural key, ordered by the event-time column(s)
    val windowPrimaryKey = Window
      .partitionBy(key.map(col): _*)
      .orderBy(sortChangingIgnored.map(col): _*)
    // columns whose value changes open or close an SCD2 validity interval
    val columnsToCompare =
      df.drop(key ++ sortChangingIgnored: _*).drop(columnsToIgnore: _*).columns
    // event time of the next record within the same key
    val nextDataChange = lead(timeColumn, 1).over(windowPrimaryKey)
    val deduplicated = df
      .withColumn(
        "data_changes_start",
        columnsToCompare
          .map(e => {
            val previous = lag(col(e), 1).over(windowPrimaryKey)
            val self = col(e)
            // 3 cases: 1. start (previous is NULL), 2. in between (try to collapse), 3. end (next is NULL)
            // first, filter to only start & end events (= updates/invalidations of records)
            // self =!= previous or self =!= next or previous.isNull or next.isNull
            self =!= previous or previous.isNull
          })
          .reduce(_ or _)
      )
      .withColumn(
        "data_changes_end",
        columnsToCompare
          .map(e => {
            val next = lead(col(e), 1).over(windowPrimaryKey)
            val self = col(e)
            // 3 cases: 1. start (previous is NULL), 2. in between (try to collapse), 3. end (next is NULL)
            // first, filter to only start & end events (= updates/invalidations of records)
            self =!= next or next.isNull
          })
          .reduce(_ or _)
      )
      .filter(col("data_changes_start") or col("data_changes_end"))
      .drop("data_changes")
    deduplicated //.withColumn("valid_to", nextDataChange)
      .withColumn(
        "valid_to",
        when(col("data_changes_end") === true, col(timeColumn))
          .otherwise(nextDataChange)
      )
      .filter(col("data_changes_start") === true)
      .withColumn(
        "valid_to",
        when(nextDataChange.isNull, current_date()).otherwise(col("valid_to"))
      )
      .withColumnRenamed(timeColumn, "valid_from")
      .drop("data_changes_end", "data_changes_start")
  }
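To illustrate the columnsToIgnore parameter: a column that is NULL on every row would flag each record as changed (its lagged value is always NULL), so nothing would get collapsed. Below is a small sketch, reusing the example df from above and adding a hypothetical, entirely NULL comment column which is then excluded from the comparison:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

// `comment` is a hypothetical column that is NULL for every row
val withAllNullColumn = df.withColumn("comment", lit(null).cast(StringType))

// excluding it via columnsToIgnore restores the collapsing behaviour of the original example;
// the ignored column is simply carried along in the output
withAllNullColumn
  .transform(deduplicateScd2(Seq("key"), Seq("date"), "date", Seq("comment")))
  .show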