Run the latest version of Spark

Flexible choice of Spark version in an enterprise data platform

Apache Spark

Distributors of enterprise data platforms such as Cloudera often bundle solid but mature (i.e. old) software. In the case of Cloudera in particular, all the energy is currently going into CDP, so the existing HDP releases sometimes do not receive the latest updates.

In the past I already experimented with executing a custom version of Spark regardless of the Hadoop or Hive version: Headless spark without Hadoop & Hive jars. However, that approach lacked Hive support, i.e. the tables registered in the Hive Metastore were not available in Spark.

Today, I will outline how to get Hive support working for Spark 3.x on HDP 3.1.

steps

First, download the (latest) release of Apache Spark; at the time of writing, this is 3.0.0. Then extract it to the desired directory.
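
From a notebook this could look roughly as follows; the download URL and target path are just placeholders for your environment:

# download and extract a Spark release; adjust URL and target directory to your setup
!wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar -xzf spark-3.0.0-bin-hadoop3.2.tgz -C /path/to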

I will demonstrate how to use pyspark. A few environment variables need to be set up:

import os

# point to the JVM and the cluster's Hadoop configuration
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf/'

# download your desired version of spark and unzip it
os.environ['SPARK_HOME'] = '/path/to/spark-3.0.0-bin-hadoop3.2'

# reuse the classpath of the existing HDP installation for the custom Spark build
hdp_classpath = !hadoop --config /usr/hdp/current/spark2-client/conf classpath
os.environ['SPARK_DIST_CLASSPATH'] = hdp_classpath[0]

We must specify the exact HDP version when starting Spark:

# list /usr/hdp and pick the entry holding the HDP version string (the index depends on your installation)
hdp_current_version = !ls /usr/hdp
hdp_current_version = hdp_current_version[1]

The next part of the code assumes that a kinit has already been executed successfully, i.e. Kerberos authentication is in place (a quick check is sketched below). Now, let's start Spark.
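
To verify that a valid ticket exists, the standard Kerberos client tools can be used (assuming klist is available on the machine):

# show the current Kerberos ticket cache; a missing or expired ticket will be visible here
!klist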

I was testing this on Python 3, which is not supported by Cloudera (HDP); they still use Python 2 for spark-submit.

This means that the Python 2 prelaunch script will periodically emit warnings in the logs due to the Python 2/3 incompatibility; these can be ignored.

When starting Spark we allow it to read the Hive 3 catalog, but this does not mean that ACID tables are supported.

# Required Classes
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, expr, rand, when, count, col

spark = SparkSession.builder.appName("custom spark") \
    .master("yarn") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.yarn.queue", "myqueue") \
    .config("spark.driver.extraJavaOption", f"'-Dhdp.version={hdp_current_version}'") \
    .config("spark.yarn.am.extraJavaOptions", f"'-Dhdp.version={hdp_current_version}'") \
    .config("spark.hadoop.metastore.catalog.default", "hive") \
    .config("spark.yarn.dist.files", "/usr/hdp/current/spark2-client/conf/hive-site.xml") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .enableHiveSupport()  \
    .getOrCreate()

spark.sql("show databases").show()
+--------------------+
|           namespace|
+--------------------+
|                 foo|
|                 bar|
+--------------------+
only showing top 20 rows
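
With Hive support enabled, regular Spark SQL queries against tables registered in the metastore work as usual. A minimal sketch (the database and table names below are purely illustrative):

spark.sql("select * from foo.my_table limit 5").show()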

Let’s introduce Koalas, the pandas API on Spark. I had been looking forward to using Koalas for a while, but the outdated Spark version shipped with HDP was not supported. Koalas offers a scalable, pandas-like API that executes the transformations on top of Spark, as well as a nicer (pandas-style) representation of data frames. The following is based on https://koalas.readthedocs.io/en/latest/getting_started/10min.html:

import databricks.koalas as ks
ks.DataFrame(spark.sql("show databases"))

  namespace
0       foo
1       bar

Koalas

The following is a brief example of how to go further with Koalas:

import pandas as pd 
import numpy as np

dates = pd.date_range('20130101', periods=6)
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
pdf

                   A         B         C         D
2013-01-01 -0.198131 -0.670062  0.571255 -0.986475
2013-01-02  0.330174  1.699464 -0.848992  1.025219
2013-01-03 -1.400803  0.484182  0.861485  1.106958
2013-01-04  0.765681 -1.187109 -0.604827  0.676196
2013-01-05  0.008392  1.262556  1.869428  0.149154
2013-01-06  1.261549 -1.675407 -0.134585 -1.498401
kdf = ks.from_pandas(pdf)
kdf

                   A         B         C         D
2013-01-01 -0.198131 -0.670062  0.571255 -0.986475
2013-01-03 -1.400803  0.484182  0.861485  1.106958
2013-01-05  0.008392  1.262556  1.869428  0.149154
2013-01-02  0.330174  1.699464 -0.848992  1.025219
2013-01-04  0.765681 -1.187109 -0.604827  0.676196
2013-01-06  1.261549 -1.675407 -0.134585 -1.498401
kdf.describe()

              A         B         C         D
count  6.000000  6.000000  6.000000  6.000000
mean   0.127810 -0.014396  0.285628  0.078775
std    0.915462  1.369716  1.017690  1.089681
min   -1.400803 -1.675407 -0.848992 -1.498401
25%   -0.198131 -1.187109 -0.604827 -0.986475
50%    0.008392 -0.670062 -0.134585  0.149154
75%    0.765681  1.262556  0.861485  1.025219
max    1.261549  1.699464  1.869428  1.106958
%matplotlib inline
from matplotlib import pyplot as plt
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))

kser = ks.Series(pser)
kser = kser.cummax()
kser.plot()
<AxesSubplot:>

(figure: cumulative maximum of the random series plotted over time)

pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
                   columns=['A', 'B', 'C', 'D'])
kdf = ks.from_pandas(pdf)
kdf = kdf.cummax()
kdf.plot()
<AxesSubplot:>

(figure: column-wise cumulative maximum of the four columns plotted over time)
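
Koalas data frames also interoperate with both worlds: to_spark() returns a regular (distributed) Spark DataFrame, while to_pandas() collects the data into a local pandas DataFrame, which is only advisable for small results:

# convert between the Koalas, Spark and pandas representations
sdf = kdf.to_spark()         # stays distributed
local_pdf = kdf.to_pandas()  # collects to the driver, only for small data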

summary

We have looked at how to run the latest version of Apache Spark (3.x) on the Hortonworks Data Platform HDP 3.1. Using YARN as a general layer to allocate compute resources (basically like in a cloud), we can easily execute arbitrary versions of Spark, even though they are not supported out of the box by the distributor.

Georg Heiler
Researcher & data scientist

My research interests include large geo-spatial time and network data analytics.