Deterministic scale-out for Spark jobs under increased load

Worry-free Spark jobs when processing more data, using dynamic iteration.

A lot of ETL jobs run on a daily basis. For these jobs, the trick below is not relevant. However, if you need to backfill data or want to compute analytical queries over more than one day (the atomic unit of execution), things can get tricky.

When functions like repartition or wide transformations such as a join or an aggregation are used, they depend heavily on the parallelism, which in turn depends on the amount of data being processed.

When the input data is not static, for example when backfilling or running analytical queries over multiple time frames, time-consuming hand tuning of various Spark configuration parameters is required.

Additionally, the shuffle IO can be unnecessarily large when a job simply processes more data than expected.

Instead, breaking the job down into iterated parts (e.g. one per day) can limit the required IO. A naive implementation would use Oozie to perform the iteration. However:

  • This would spin up an additional master container for each job.
  • An additional tool would need to be learnt.
  • Any pre-setup or post-completion tasks, such as computing a DataFrame that is reused in every iteration step, would need to be performed again for each iteration.

This is inefficient.

It is more efficient to use Spark itself for this task: iterate inside the job and achieve parallelism on a different level. One more benefit is that you can control the size of the output partitions exactly. A repartition(numberPartitions) now works on an atomic unit of data, and thus has a reduced shuffle IO load and is easier to configure.

This results in a job which scales more deterministically with increased amounts of data.
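As a rough sketch of what a single iteration step might look like: the code below assumes daily input stored as Parquet under a hypothetical `day=<yyyyMMdd>` directory layout and a shared `lookup` DataFrame joined on a hypothetical `id` column. The names `processDay`, `backfill`, `inputRoot` and `outputRoot` are purely illustrative and not taken from a real job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object DailyIterationSketch {

  // Hypothetical per-day step. The path layout, the "id" join column and the
  // Parquet format are assumptions made for illustration only.
  def processDay(spark: SparkSession, lookup: DataFrame, inputRoot: String,
                 outputRoot: String, numberPartitions: Int)(day: String): Unit = {
    val daily = spark.read.parquet(s"$inputRoot/day=$day") // one atomic unit only
    daily
      .join(lookup, Seq("id"))       // wide transformation over a single day of data
      .repartition(numberPartitions) // partition count is chosen per day, not per backfill
      .write
      .mode("overwrite")
      .parquet(s"$outputRoot/day=$day")
  }

  // The shared DataFrame is marked for caching once; Spark materializes it on
  // first use and reuses it in the following iteration steps.
  def backfill(spark: SparkSession, lookup: DataFrame, days: Seq[String], inputRoot: String,
               outputRoot: String, numberPartitions: Int): Unit = {
    val shared = lookup.cache()
    try days.foreach(day => processDay(spark, shared, inputRoot, outputRoot, numberPartitions)(day))
    finally shared.unpersist()
  }
}
```

Because every step reads and shuffles only one day, `numberPartitions` can be tuned once for a typical day and stays valid whether the backfill covers two days or two hundred.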

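Consider a function like:

// Applies functionToApply to every item, sequentially or via a parallel collection.
// Note: on Scala 2.13+ `.par` needs the scala-parallel-collections module and
// `import scala.collection.parallel.CollectionConverters._`.
def iterationModule[A](iterationItems: Seq[A], functionToApply: A => Any, parallel: Boolean = false): Unit =
  if (parallel) {
    iterationItems.par.foreach(functionToApply)
  } else {
    iterationItems.foreach(functionToApply)
  }

It can be called with a functionToApply that computes the desired result for one partition (e.g. one day) and writes the transformed data for that partition back to a storage system.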
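`.par` runs on a default pool that is typically sized to the number of cores on the driver, which may or may not match the number of Spark jobs you want in flight at once. If an explicit limit is preferred, one possible variation uses Scala futures on a fixed-size thread pool. This is a sketch and not part of the original function; `iterateBounded` and `maxConcurrent` are hypothetical names.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object BoundedIteration {

  // Runs functionToApply for every item, with at most maxConcurrent calls active at once.
  // Blocks until all items are done and rethrows a failure if any call failed.
  def iterateBounded[A](iterationItems: Seq[A], functionToApply: A => Any, maxConcurrent: Int): Unit = {
    val pool = Executors.newFixedThreadPool(maxConcurrent)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val running = iterationItems.map(item => Future(functionToApply(item)))
      Await.result(Future.sequence(running), Duration.Inf)
      ()
    } finally {
      pool.shutdown() // stop accepting work; already submitted tasks may still finish
    }
  }
}
```

Spark accepts jobs submitted from several driver threads, so each call of `functionToApply` can trigger its own Spark action while the pool size caps how many run concurrently.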

An example of how to apply the iteration module:

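// functionToApply for one partition: here it only prints, a real job would compute and write the result
def doStuff(inputPartition: String, reusedStuff: String): Unit = println(inputPartition + "__" + reusedStuff)

// prints 20190101__foo, then 20190102__foo
iterationModule[String](Seq("20190101", "20190102"), doStuff(_, "foo"))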
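
For a longer backfill, the list of days does not have to be typed by hand. A minimal sketch, assuming partitions are keyed by `yyyyMMdd` strings as in the example above (`BackfillDays` and `dailyPartitions` are hypothetical names):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object BackfillDays {
  // BASIC_ISO_DATE renders a LocalDate as yyyyMMdd, e.g. 20190101
  private val dayFormat = DateTimeFormatter.BASIC_ISO_DATE

  // All days from start to endInclusive as partition keys; empty if start is after endInclusive.
  def dailyPartitions(start: LocalDate, endInclusive: LocalDate): Seq[String] =
    Iterator.iterate(start)(_.plusDays(1))
      .takeWhile(day => !day.isAfter(endInclusive))
      .map(_.format(dayFormat))
      .toList
}

// Possible use with the iteration module from above:
// iterationModule[String](
//   BackfillDays.dailyPartitions(LocalDate.of(2019, 1, 1), LocalDate.of(2019, 1, 31)),
//   doStuff(_, "foo"))
```

Month and year boundaries are handled by `java.time`, so the same call works for any date range.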
Georg Heiler
PhD candidate & data scientist

My research interests include large geo-spatial time and network data analytics.