Exact percentiles in Spark

Fast and exact robust distribution chracteristics

Calculating exploratory statistical characteristics of a distribution such as its percentiles are important when analyzing data. Spark has supported the calcualtion of approximative percentiles for quite some while now using the algorithm of Greenwald-Khanna, https://databricks.com/de/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html . Their implementation is very scalable.

However, not all datasets are created equal. Sometimes you want to calculate such statistics for each key in the dataframe. Indeed, this is possible. But the number of elements per key (especially for a high cardinality key) might decrease rapidly. This leads to two problems with the approximative approach:

  • it is slow: the distributed combine step is slower (but more scalable)
  • it is not exact. This is particularly problematic with a small number of elements for a key
  • following out of the last point: not exact also means not deterministic. Especially for keys with a small number of elements the result will vary even more.

Therefore I implemented a naive but exact algorithm to compute the percentiles. It solves all these three pain points of mine. And as I use it to alculate the statistics per key it is scalable enough for my use-cases.

Recently, I started to work on more pySpark based scalable-datascience pipelines. In the past I had mainly used the Scala-based API. For this blog post I originally wanted to explain how I converted the Scala percentiles over to pyspark. But when researching some more for this blog post (after already having successfully implemented it) I stumbled upon: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala and figured out that spark apparently meanwhile already supports the calculation of exact percentiles.

As they are natively supported for catalyst they are even a bit faster to compute. Probably this means I should read more of the manual and API documentation - but it is hard to keep up with all the changes for such a fast moving project like spark - and the added benefit of my custom implementation is that I really control the percentiles. My implementation is matching the default pandas values (i.e. linear interpolation).

But I think the story of how to perform this implementation is still worthwhile - that`s why I will share it with you now - especially if you might need percentiles which must be exactly equal with the pandas way.

custom percentiles in Scala

The main percentiels function:

/**
    * Based on https://github.com/scalanlp/breeze/blob/master/math/src/main/scala/breeze/stats/DescriptiveStats.scala#L537
    * and on https://stackoverflow.com/questions/61508232/computation-of-approximative-percentiles
    * Returns the estimate of a pre-sorted array at p * it.size, where p in [0,1].
    * Same result as pandas/numpy using linear interpolation (=do nothing).
    * https://stackoverflow.com/questions/61776689
    * <p>
    * Note:
    * Result is invalid if the input array is not already sorted.
    * </p>
    *
    * But the calling function already applies spark native sorting using `sort_array`
    */
  def calculatePercentile(arr: Seq[Double], p: Double) = {
    val rank = (arr.length - 1) * p
    val i = arr(math.floor(rank).toInt)
    val j = arr(math.ceil(rank).toInt)
    val fraction = rank - math.floor(rank)
    i + (j - i) * fraction
  }

is wrapped in quite some boilerplate to make it available to Spark. The good thing is it will then also support handling of multiple columns and percentiles. As an array with all the percentiles is returned a handy function is added to unpack these to separate columns.

case class PercentileResult(percentile: Double, value: Double)

def getPercentileListName(s: String): String = s"${s}_percentiles"

/**
    * PREDONDITION: one key does not require an overly large amount of memory. Otherwise this version of percentiles
    * will OOM.
    * `df.groupby().agg(sort_array(collect_list(col(''))))` has already been calculated, we get the percentiles
    * and list of columns with the array values as the input.
    *
    * @param percentiles list of [0,1] percentiles
    * @param columns columns to be percentiled
    * @return columns of tupled percentiles
    */
  def exactPercentiles(
      percentiles: Seq[Double],
      columns: Seq[String]
  ): Seq[Column] = {
    percentiles.foreach(p => {
      if (p > 1 || p < 0)
        throw new IllegalArgumentException("p must be in [0,1]")
    })
    def calcPercentiles(
        arr: Seq[Double]
    ): Seq[PercentileResult] = {
      if (arr.isEmpty) {
        percentiles.map(p => {
          PercentileResult(percentile = p, value = Double.NaN)
        })
      } else {
        percentiles.map(p => {
          PercentileResult(percentile = p, value = calculatePercentile(arr, p))
        })
      }
    }
    val percentileUDF = udf(calcPercentiles _).asNondeterministic()
    columns.map(co => {
      percentileUDF(
        sort_array(collect_list(col(co).cast(DoubleType)))
      ).alias(getPercentileListName(co))
    })
  }

def unpackExactPercentilesToColumns(
      percentiles: Seq[Double],
      columns: Seq[String]
  )(df: DataFrame): DataFrame = {
    columns.foldLeft(df) { (tmpDfColumn, co) =>
      {
        percentiles.zipWithIndex
          .foldLeft(tmpDfColumn) { (tmpDfInnerPerc, percentile) =>
            {
              tmpDfInnerPerc.withColumn(
                s"${getPercentileListName(co)}_${percentile._1.toString.replace(".", "")}",
                col(getPercentileListName(co))
                  .getItem(percentile._2)
                  .getItem("value")
              )
            }
          }
          .drop(getPercentileListName(co))
      }
    }

  }
The .asNondeterministic() is very deliberate! Otherwise, when extracting the percentiles into separate columns catalyst would assume the operation is cheap and would recompute it for each one! As this is a very costly operation it should only be triggered once and only once.

pySpark integration

As I already experienced that the vectorized pandas UDF are still quite a bit slower than JVM native UDFs I decided to not use pandas.quantile but rather make my custom Scala UDF available to the python API.

This requires some additional code in Scala:

object ExactPercentiles {

  def exactPercentilesDirect(
      values: Seq[Double],
      percentiles: Seq[Double]
  ): Seq[PercentileResult] = {
    if (values.isEmpty) {
      percentiles.map(p => {
        PercentileResult(percentile = p, value = Double.MinValue)
      })
    } else {
      percentiles.map(p => {
        PercentileResult(percentile = p, value = calculatePercentile(values, p))
      })
    }
  }

  def exactPercentilesUDF =
    udf(exactPercentilesDirect _).asNondeterministic()

  def registerUdf = {
    val spark = SparkSession.builder().getOrCreate()
    spark.udf.register(
      "percentiles_exact",
      (values: Seq[Double], percentiles: Seq[Double]) =>
        exactPercentilesDirect(values, percentiles)
    )
  }
}

To use the Scala code in pyspark you must create a JAR out of it and generate JVM bytecode. I choose gradle to compile the code.

Finally we can actually start using it:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, expr, rand, when, count, col

spark = SparkSession.builder.appName("anomlydetection") \
        .master("local[4]") \
        .config("spark.jars", "/path/to/my/jar-percentiles.jar") \
        .config("spark.driver.memory", "5G") \
        .enableHiveSupport()  \
        .getOrCreate()

import pandas as pd
import numpy as np

pdf = pd.DataFrame({'foo':[1.0,2.0,3.0, np.NaN], 'bar':[2.0,3.0,4.0, np.NaN], 'category':[1,1,1,2]})

sdf = spark.createDataFrame(pdf)

pdf.quantile([0.25, 0.5, 0.75])

from pyspark.sql.functions import array, lit, sort_array, collect_list
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.column import Column, _to_java_column, _to_seq 

def percentiles_exact_scala_udf(values, percentiles): 
    exactPercentilesUDF = spark._jvm.ExactPercentiles.exactPercentilesUDF() 
    return Column(exactPercentilesUDF.apply(_to_seq(spark.sparkContext, [values, percentiles], _to_java_column)))

perc_df  = sdf.select(percentiles_exact_scala_udf(sort_array(collect_list(col("foo"))), array([lit(i) for i in [0.25, 0.5, 0.75]])).alias("foo_percentiles"))
perc_df.show(200, False)

This nicely demonstrates the execution of arbitrary scala code using py4j - and wrapping of the required objects. Lastly, the output needs to be extracted to be available in separate columns:

from pyspark.sql import DataFrame

def unpack_percentiles_exact(percentiles, columns = None):
    """
    Extracts the output of `exactPercentilesUDF` (i.e. Array of percentile value and percentile) to separate columns for better downstream handling
    Based on the idea of https://github.com/apache/spark/pull/23877 and https://mungingdata.com/pyspark/chaining-dataframe-transformations/
    """
    def _(df: DataFrame):
        if columns is None:
            columns_with_percentiles = [c for c in df.columns if '_percentiles' in c]
        else:
            columns_with_percentiles = [f'{c}_percentiles' for c in columns]
            
        for c in columns_with_percentiles:
            print(c)
            for i,p in enumerate(percentiles):
                p_string = str(p).replace(".", "")
                print(f"{c}_{p_string}")
                df = df.withColumn(f"{c}_{p_string}", col(c).getItem(i).getItem("value"))
            df = df.drop(c)
        return df
    return _
From scala the df.transform(myFuctnion()) is very handy chaining operations. This also works for pyspark!

This computes the desired percentiles and stores them into separate columns for easy downstream handling:

perc_df.transform(unpack_percentiles_exact([0.25, 0.5, 0.75])).show()

native exact percentiles

As you certainly have observed quite some amount of code is needed. Furthermore, when you execut it you will observe that a WholeStageCodegen stage is added to execute the custom JVM based UDF. This is not the case for this catalyst native implementation of exact percentiles which nowadays ships directly out of the box with spark (at least for 3.0.1 which I am working with).

percsdf = sdf.groupBy("category").agg(expr("percentile(foo, array(0.25, 0.5, 0.75))").alias("foo_percentiles"))
percsdf.show()

from pyspark.sql import DataFrame

# base on:
# perc_df.withColumn('xx', col('foo_percentiles').getItem(0).getItem('value')).show()
def unpack_percentiles_exact(percentiles, columns = None):
    """
    Extracts the output of `exactPercentilesUDF` (i.e. Array of percentile value and percentile) to separate columns for better downstream handling
    Based on the idea of https://github.com/apache/spark/pull/23877 and https://mungingdata.com/pyspark/chaining-dataframe-transformations/
    """
    def _(df: DataFrame):
        if columns is None:
            columns_with_percentiles = [c for c in df.columns if '_percentiles' in c]
        else:
            columns_with_percentiles = [f'{c}_percentiles' for c in columns]
            
        for c in columns_with_percentiles:
            print(c)
            for i,p in enumerate(percentiles):
                p_string = str(p).replace(".", "")
                print(f"{c}_{p_string}")
                df = df.withColumn(f"{c}_{p_string}", col(c).getItem(i))
            df = df.drop(c)
        return df
    return _
    
percsdf.transform(unpack_percentiles_exact([0.25, 0.5, 0.75])).show()

I like that this native implementation is even faster than my hand-rolled variant! Furthermore, the amount of code needed is dreastically reduced. Also no two separate code-bases and a custom build in scala are required. This drastically simplifes the calculation of percentiles and makes it accessible to many more people.

TLDR

Spark supports a percentile SQL function to compute exact percentiles. It is not scalable (similar to my custom implementation) and needs to fit all values (i.e. for a key) into RAM. So far, I have not found it in the python or Scala API - therefore it needs to be executed using the string based expr("percentile(my_column, array(0.25, 0.5, 0.75))") notation.

I have learnt quite some things of how to integrate pySpark with its more native Scala API to keep things fast.

Georg Heiler
Georg Heiler
PhD candidate & data scientist

My research interests include large geo-spatial time and network data analytics.

comments powered by Disqus

Related