Typesafe data analytics

Making Spark jobs more typesafe and convenient.

Spark offers several APIs (RDD, DataFrames and Datasets) which are either not typesafe enough or incur considerable overhead due to JVM object instantiation and serialization. Frameless claims to fix this via type classes.
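
The examples below assume a Spark session with its implicits in scope plus the frameless library on the classpath. A minimal sketch of the setup (the frameless version and the local master are assumptions; adjust them to your Spark release):

// build.sbt
libraryDependencies += "org.typelevel" %% "frameless-dataset" % "0.4.0"

// common imports for the snippets below
import org.apache.spark.sql.SparkSession
import frameless.TypedDataset
import frameless.syntax._

implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._ // enables .toDS and the 'symbol column syntax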

Spark's Dataset API suggests a lot of type safety; however, it is not optimal. Here is some sample code:

case class User(id: Int, name: String)
val users = Seq(User(1, "Anna"), User(2, "Bob")).toDS
users.select('id).show

So far so good. But a typo like:

users.select('idError).show

will throw a nasty runtime error like

org.apache.spark.sql.AnalysisException: cannot resolve '`idError`' given input columns: [id, name];;
'Project ['idError]
+- LocalRelation [id#2, name#3]
at org.apache.spark.sql.catalyst.analysis.packa

But there are some alternatives:

  • using the low-level RDD API: not elegant, no Tungsten optimizations, no encoders
  • using frameless https://github.com/typelevel/frameless

Using the same selection as before, but now via frameless, properly fails at compile time (fUsers and UInfoError are defined in the next section):

fUsers.project[UInfoError]
// compile error: Cannot prove that User can be projected to UInfoError. Perhaps not all member names and types of UInfoError are the same in User?

Comparison of typesafe datasets via frameless with Spark's built-in ones

Let’s load some more data:

case class User(id: Int, name: String)
case class Event(eventId: Int, userId: Int)
case class UInfo(name: String)
case class UInfoError(nameError: String)
val events = Seq(Event(101, 1),
  Event(102, 2),
  Event(103, 1),
  Event(104, 2),
  Event(105, 1),
  Event(106, 2),
  Event(107, 1),
  Event(108, 2)).toDS
val fUsers = TypedDataset.create(users)
val fEvents = TypedDataset.create(events)
scala> users.show
+---+----+
| id|name|
+---+----+
| 1|Anna|
| 2| Bob|
+---+----+
scala> users.printSchema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
scala> users.explain
== Physical Plan ==
LocalTableScan [id#2, name#3]
scala> events.printSchema
root
|-- eventId: integer (nullable = false)
|-- userId: integer (nullable = false)
scala> events.show
+-------+------+
|eventId|userId|
+-------+------+
| 101| 1|
| 102| 2|
...

For the typed datasets the output should be the same. The DAG is also the same: type safety here does not incur additional serialization overhead.

fUsers.show().run
+---+----+
| id|name|
+---+----+
| 1|Anna|
| 2| Bob|
+---+----+
scala> fUsers.printSchema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
scala> fUsers.explain()
== Physical Plan ==
LocalTableScan [id#2, name#3]
scala> fEvents.show().run
+-------+------+
|eventId|userId|
+-------+------+
| 101| 1|
| 102| 2|
...

I see two problems with Spark:

  • Spark's string-based referencing of columns cannot be refactored nicely in an IDE
  • typing issues mostly show up at runtime; the same goes for typos when referencing columns
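
For instance, a column type mismatch compiles fine and only surfaces once Spark analyses the plan. A minimal sketch (the exact exception message depends on the Spark version):

users.select('name).as[Int].show
// compiles, but fails at runtime with an org.apache.spark.sql.AnalysisException,
// along the lines of: Cannot up cast `name` from string to int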

The code below uses the DataFrame and Dataset APIs. The DAGs are all the same.

// can't refactor 'id nicely
users.select('id).show
+---+
| id|
+---+
| 1|
| 2|
+---+
scala> users.select('id).explain
== Physical Plan ==
LocalTableScan [id#2]
scala> users.select('id).as[Int].show
+---+
| id|
+---+
| 1|
| 2|
+---+
scala> users.select('id).as[Int].explain
== Physical Plan ==
LocalTableScan [id#2]

Now compare the same operations with frameless:

fUsers.select(fUsers('name)).show().run
+----+
| _1|
+----+
|Anna|
| Bob|
+----+
scala> fUsers.select(fUsers('name)).explain()
== Physical Plan ==
LocalTableScan [_1#171]

It works just as expected and no additional overhead degrades performance. The only downside is that _1 is displayed instead of the proper string alias of the column, i.e. name in this example. However, fUsers('id) still can't be refactored nicely in an IDE. Both problems can be fixed by referencing a case class and using a projection:

fUsers.project[UInfo].show().run
+----+
|name|
+----+
|Anna|
| Bob|
+----+
scala> fUsers.project[UInfo].explain()
== Physical Plan ==
LocalTableScan [name#3]

Instead of project, drop can also be used. It is less general, but currently acts as a synonym. Both positively select (i.e. project) the dataset and do not behave like the regular Spark SQL df.drop("foo"), which removes the referenced column.

fUsers.drop[UInfo].show().run
+----+
|name|
+----+
|Anna|
| Bob|
+----+
scala> fUsers.drop[UInfo].explain()
== Physical Plan ==
LocalTableScan [name#3]

Naively, type safety can be added by resorting to Spark's RDD-like functional API. However, Catalyst and Tungsten optimizations are not present and additional serialization overhead applies:

users.map(_.id).show
users.map(_.id).explain
== Physical Plan ==
*SerializeFromObject [input[0, int, false] AS value#246]
+- *MapElements <function1>, obj#245: int
+- *DeserializeToObject newInstance(class $line10.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$User), obj#244: $line10.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$User
+- LocalTableScan [id#2, name#3]

Looking at joins, Spark offers join and joinWith, where the latter is rather similar to the joins frameless offers. Below is the code for join:

val joinedDF = events.join(users, events("userId") === users("id")).drop('id)
joinedDF.show
+-------+------+----+
|eventId|userId|name|
+-------+------+----+
| 101| 1|Anna|
...
scala> joinedDF.explain
== Physical Plan ==
*Project [eventId#7, userId#8, name#3]
+- *BroadcastHashJoin [userId#8], [id#2], Inner, BuildRight
:- LocalTableScan [eventId#7, userId#8]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [id#2, name#3]
scala> joinedDF.printSchema
root
|-- eventId: integer (nullable = false)
|-- userId: integer (nullable = false)
|-- name: string (nullable = true)

Now for joinWith, Spark's Dataset API:

val joinedDS = events.joinWith(users, events.col("userId") === users.col("id")).drop('id)
joinedDS.show
+-------+--------+
| _1| _2|
+-------+--------+
|[101,1]|[1,Anna]|
...
scala> joinedDS.printSchema
root
|-- _1: struct (nullable = false)
| |-- eventId: integer (nullable = false)
| |-- userId: integer (nullable = false)
|-- _2: struct (nullable = false)
| |-- id: integer (nullable = false)
| |-- name: string (nullable = true)
scala> joinedDS.explain
== Physical Plan ==
*BroadcastHashJoin [_1#275.userId], [_2#276.id], Inner, BuildRight
:- LocalTableScan [_1#275]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, struct<id:int,name:string>, false].id as bigint)))
+- LocalTableScan [_2#276]

The generated query plans (DAGs) are almost the same for DataFrames and Datasets. Only the resulting schema is a bit different.

The same goes for frameless:

val fJoined = fUsers.joinInner(fEvents) {
  fUsers('id) === fEvents('userId)
} // TODO: drop column id
fJoined.show().run
+--------+-------+
| _1| _2|
+--------+-------+
|[1,Anna]|[107,1]|
...
scala> fJoined.explain()
== Physical Plan ==
*Project [named_struct(id, id#2, name, name#3) AS _1#293, named_struct(eventId, eventId#7, userId, userId#8) AS _2#294]
+- *BroadcastHashJoin [id#2], [userId#8], Inner, BuildRight
:- LocalTableScan [id#2, name#3]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)))
+- LocalTableScan [eventId#7, userId#8]
fJoined.printSchema
root
|-- _1: struct (nullable = false)
| |-- id: integer (nullable = false)
| |-- name: string (nullable = true)
|-- _2: struct (nullable = false)
| |-- eventId: integer (nullable = false)
| |-- userId: integer (nullable = false)

Applying some filtering logic after the join (could be optimized by hand and moved before the join in this case):

val filteredDF = joinedDF.filter('eventId >= 105)
filteredDF.show
+-------+------+----+
|eventId|userId|name|
+-------+------+----+
| 105| 1|Anna|
| 106| 2| Bob|
| 107| 1|Anna|
| 108| 2| Bob|
+-------+------+----+
scala> filteredDF.explain
== Physical Plan ==
*Project [eventId#7, userId#8, name#3]
+- *BroadcastHashJoin [userId#8], [id#2], Inner, BuildRight
:- *Filter (eventId#7 >= 105)
: +- LocalTableScan [eventId#7, userId#8]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [id#2, name#3]

Prior to filtering the schema needs to be normalized:

// assumed definition of the target schema (not shown in the original snippet)
case class UserEvent(eventId: Int, userId: Int, name: String)

val normalized = fJoined
  .select(
    fJoined.colMany('_2, 'eventId),
    fJoined.colMany('_2, 'userId),
    fJoined.colMany('_1, 'name)
  )
  .as[UserEvent]
val fFiltered = normalized.filter(normalized('eventId) >= 105)
fFiltered.show().run
fFiltered.explain()
== Physical Plan ==
*Project [eventId#7, userId#8, name#3]
+- *BroadcastHashJoin [id#2], [userId#8], Inner, BuildRight, (named_struct(eventId, eventId#7, userId, id#2).eventId >= 105)
:- LocalTableScan [id#2, name#3]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)))
+- *Filter (named_struct(eventId, eventId#7, userId, userId#8).eventId >= 105)
+- LocalTableScan [eventId#7, userId#8]

At any time one can easily turn back to Spark's API via:

fFiltered.dataset.show
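
Since a TypedDataset merely wraps a regular Dataset, the full Spark API is available again from that point on. A small sketch (the output path is hypothetical):

val plain: org.apache.spark.sql.Dataset[UserEvent] = fFiltered.dataset // the underlying vanilla Dataset
plain.toDF.write.mode("overwrite").parquet("/tmp/user-events") // hypothetical path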

shortcomings

  • renaming in a selection is required: df.select(df('name)).show().run only returns an unnamed column (_1) instead of name
  • the semantics of drop are not intuitive; instead, a regular projection is applied
  • verbose for a large number of columns
  • IDE support (IntelliJ) does not work for all the tricks; there are some problems when normalizing the schema from a join due to shapeless & macro tricks

what I like

  • compile time type safety
  • simple operations remain simple
  • improved support for refactoring

summary

Frameless is still marked as experimental, but it already makes a lot of operations more secure and type safe without adding a performance penalty. However, it might still require some improvements to easily handle DataFrames with a large number of columns.

To follow along, install JDK 8 and a current version of sbt, then execute sbt new geoHeil/sparkMultiProjectTemplate.g8 to get the sample code.
