# Solve data skew issues for array columns in Spark

Handling data skew for complex types and array-based columns.

Many frameworks that implement the map-reduce paradigm show degraded performance when the data is skewed. When outlier keys carry more than an order of magnitude more events than regular keys, the tasks processing them take much longer to compute, quite possibly hours longer.

Unfortunately, Spark is a data-parallel framework that only handles simple rows well. It does not consider complex types, and especially not arrays of complex types, when performing a shuffle. So a regular repartition will not resolve data skew for arrays with unequal numbers of elements.
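To make the problem concrete, here is a minimal sketch using plain Scala collections (no Spark session needed). The array sizes and the round-robin assignment are assumptions for illustration only; Spark's actual row placement differs in detail, but the effect is the same: balancing rows does not balance array elements.

```scala
object RowVsElementBalance {
  // Hypothetical array sizes per row: nine small arrays and one large outlier.
  val arraySizes: Seq[Int] = Seq(10, 12, 9, 11, 1000, 10, 8, 12, 11, 9)

  // Assign rows to partitions round-robin by row index (an assumed stand-in
  // for an even row-count repartition) and sum the array elements that end
  // up in each partition.
  def elementsPerPartition(sizes: Seq[Int], numPartitions: Int): Seq[Int] =
    sizes.zipWithIndex
      .groupBy { case (_, idx) => idx % numPartitions }
      .toSeq
      .sortBy(_._1)
      .map { case (_, rows) => rows.map(_._1).sum }

  def main(args: Array[String]): Unit = {
    val load = elementsPerPartition(arraySizes, 2)
    println(s"elements per partition: $load")
  }
}
```

With these made-up sizes both partitions hold five rows, yet one carries 1038 elements and the other only 54.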

## example

For my real use case I need to apply a fairly expensive function which requires data locality and parallelizes over all the keys. I obtained its input using:

```scala
realDf.groupBy("key", "date")
  // collect the rows of each group as structs, then sort the resulting array
  .agg(sort_array(collect_list(struct('time, 'foo, 'bar, 'others, ...))))
```


So there I have a sorted array. Unfortunately, some keys have arrays that are far larger than the average. As my function iterates over the array a couple of times, computing it takes a lot more time for a larger input.

Now to the example. The following code returns a similar dataset exhibiting similar issues (https://stackoverflow.com/questions/46240688/how-to-equally-partition-array-data-in-spark-dataframe):

```scala
import scala.util.Random
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

// 100 rows, each with an array whose length follows the absolute value of a Gaussian
val localData = (1 to 100).map(i => (i, Seq.fill(Math.abs(Random.nextGaussian() * 100).toInt)(Random.nextDouble())))
val df = sc.parallelize(localData).toDF("id", "data").withColumn("data_size", size($"data"))

df.printSchema
```

```
root
 |-- id: integer (nullable = false)
 |-- data: array (nullable = true)
 |    |-- element: double (containsNull = false)
 |-- data_size: integer (nullable = false)
```

```scala
df.show(3)
```

```
+---+--------------------+---------+
| id|                data|data_size|
+---+--------------------+---------+
|  1|[0.15397263238637...|      131|
|  2|[0.97736166537028...|       42|
|  3|[0.09454220834717...|      112|
+---+--------------------+---------+
```

In my real dataset the issues are even more severe, though, as the function applied to the array is more expensive to compute for large arrays.

## dynamic sharding

Dynamic sharding to the rescue. Facebook already faced a similar problem. They iteratively applied dynamic sharding. Using a simple example I will outline what a simple implementation of dynamic sharding could look like. Let’s define a function.

```scala
def myFunction(input: Seq[Double]): Option[Double] = {
  // stand-in for a function that gets expensive when the array is overly long
  if (null == input) {
    None
  } else {
    Some(input.max)
  }
}
val myUdf = udf(myFunction _)
```

It can be executed like:

```scala
df.withColumn("max", myUdf(col("data"))).show
```

To make it a bit more user-friendly and reusable:

```scala
def myTransform(column: String)(df: DataFrame) = {
  df.withColumn("max", myUdf(col(column)))
}
```

There are various ways to derive the cutoff. Some experiments on my data suggest that a basic mean and stdev are faster than approximate percentiles. But this might not be the case for you and your dataset.

```scala
def deriveCutoff(column: String, df: DataFrame, useMean: Boolean = true): Int = {
  import df.sparkSession.implicits._
  if (useMean) {
    // mean plus one standard deviation of the column
    df.select((avg(col(column)) + stddev(col(column))).cast(IntegerType).as[Int]).head
  } else {
    // approximate 90th percentile with 1% relative error
    val percentile = 0.9
    val relError = 0.01
    val approxQuantile = df.stat.approxQuantile(column, Array(percentile), relError).head
    println(s"Column:$column, quantile: $percentile, value:$approxQuantile")
    approxQuantile.toInt
  }
}
```
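To see what the mean + stdev rule evaluates to, here is a minimal sketch on a local collection, without Spark. The object and method names are hypothetical. It assumes the same semantics as the Spark expression: a sample standard deviation (Spark's `stddev` is an alias for `stddev_samp`) and a cast to integer that truncates.

```scala
object CutoffSketch {
  // Mean plus one sample standard deviation, truncated to an Int.
  def meanPlusStddev(sizes: Seq[Int]): Int = {
    require(sizes.size > 1, "sample standard deviation needs at least two values")
    val n = sizes.size
    val mean = sizes.sum.toDouble / n
    // sample variance: divide by n - 1
    val variance = sizes.map(s => math.pow(s - mean, 2)).sum / (n - 1)
    (mean + math.sqrt(variance)).toInt
  }

  def main(args: Array[String]): Unit = {
    val sizes = Seq(2, 4, 4, 4, 5, 5, 7, 9) // made-up array sizes
    val cutoff = meanPlusStddev(sizes)
    println(s"cutoff = $cutoff, outliers = ${sizes.filter(_ > cutoff)}")
  }
}
```

For the made-up sizes the mean is 5 and the sample standard deviation is about 2.14, so the cutoff is 7 and only the array of size 9 counts as an outlier.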


Both variants are implemented in `deriveCutoff`. For now, though, I only treat arrays with more elements than mean + stdev as outliers. My compute function first derives the size of each array and then the cutoff. The cutoff is used to separate the data into regular-sized arrays and outliers. The expensive function can then be computed separately for each group, and the results are combined using the union operator.

The important piece is the repartition operation, especially for the outliers. Because the regular arrays are separated from the outliers before repartitioning, each group should be distributed more evenly across its partitions. At least better than before.

```scala
val tmpSizeCol = "tmp_size_column"

def compute(function: DataFrame => DataFrame, arrayColumn: String, parallelismRegular: Int, parallelismIncreased: Int)(df: DataFrame): DataFrame = {
  val withSize = df.withColumn(tmpSizeCol, size(col(arrayColumn)))
  // The input is read three times (cutoff, regular, outlier), so persist it before the
  // aggregation. In case of iterative refinement a checkpoint might be useful.
  // If not enough RAM is available, use .persist(StorageLevel.DISK_ONLY) instead, and
  // consider repartitioning so that less than 2GB falls into a single partition.
  df.cache
  val cutoff = deriveCutoff(tmpSizeCol, withSize, useMean = true)

  // Split on the cutoff and give the outliers their own, higher parallelism.
  val regular = withSize.filter(col(tmpSizeCol) <= cutoff).repartition(parallelismRegular).transform(function)
  val outlier = withSize.filter(col(tmpSizeCol) > cutoff).repartition(parallelismIncreased).transform(function)

  regular.union(outlier)
}
```


Then, it can be applied like:

```scala
df.transform(compute(myTransform("data"), "data", 4, 40)).show
```
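Why splitting before repartitioning helps can be sketched on plain Scala collections. Everything here is an assumption for illustration: the sizes, the fixed cutoff of 500, and the round-robin placement (a rough stand-in for `repartition(n)`). The sketch compares the heaviest partition with and without the split.

```scala
object SplitThenRepartition {
  // Distribute rows round-robin and return the summed array sizes per partition.
  def partitionLoads(sizes: Seq[Int], numPartitions: Int): Seq[Int] = {
    val loads = Array.fill(numPartitions)(0)
    sizes.zipWithIndex.foreach { case (s, i) => loads(i % numPartitions) += s }
    loads.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sizes = Seq(10, 900, 12, 11, 950, 9, 10, 1000, 8, 12) // made-up array sizes
    val cutoff = 500 // assumed; in the post it comes from deriveCutoff

    // Same split as in compute: regular arrays on one side, outliers on the other.
    val (regular, outliers) = sizes.partition(_ <= cutoff)

    println(s"no split, 2 partitions: ${partitionLoads(sizes, 2)}")
    println(s"regular,  2 partitions: ${partitionLoads(regular, 2)}")
    println(s"outliers, 3 partitions: ${partitionLoads(outliers, 3)}")
  }
}
```

Without the split the heaviest partition holds 1932 elements. After the split, the outliers get their own, higher parallelism and the heaviest partition holds 1000, which is the size of the single largest array and therefore the best this data allows.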


## conclusion

Using dynamic sharding, data skew for complex types and arrays can be handled in Spark. If this is not enough for your use case (as it was not for Facebook), you need to apply this function iteratively to spread the records out evenly.
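What an iterative application might look like is sketched below on plain Scala collections. This is only one possible reading, not Facebook's implementation: it assumes that each round re-applies the mean + stdev cutoff to the remaining outlier group, and all names and sizes are hypothetical.

```scala
object IterativeSharding {
  // Mean plus one sample standard deviation, truncated to an Int.
  def cutoff(sizes: Seq[Int]): Int = {
    val n = sizes.size
    val mean = sizes.sum.toDouble / n
    val variance = sizes.map(s => math.pow(s - mean, 2)).sum / (n - 1)
    (mean + math.sqrt(variance)).toInt
  }

  // Peel off the regular-sized group, then repeat on the outliers until no
  // outliers remain, fewer than two values are left, or maxRounds is used up.
  @annotation.tailrec
  def shard(sizes: Seq[Int], maxRounds: Int, acc: Vector[Seq[Int]] = Vector.empty): Vector[Seq[Int]] =
    if (maxRounds == 0 || sizes.size < 2) acc :+ sizes
    else {
      val (regular, outliers) = sizes.partition(_ <= cutoff(sizes))
      if (outliers.isEmpty) acc :+ regular
      else shard(outliers, maxRounds - 1, acc :+ regular)
    }

  def main(args: Array[String]): Unit = {
    // made-up sizes: many small arrays, a few medium ones, two large ones
    val sizes = Seq.fill(20)(10) ++ Seq(100, 110, 120, 130, 1000, 1100)
    shard(sizes, maxRounds = 3).zipWithIndex.foreach { case (group, round) =>
      println(s"group $round: ${group.size} arrays, max size ${group.max}")
    }
  }
}
```

Each resulting group could then be repartitioned with its own parallelism, in the same way `compute` treats its regular and outlier groups.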

##### Georg Heiler
###### PhD candidate & data scientist

My research interests include large geo-spatial time and network data analytics.