# Ultimate open vector geoprocessing on Spark

Combining the best of both worlds from GeoMesa and GeoSpark

The marriage of GeoMesa and GeoSpark

More and more people are starting to work with large quantities of geospatial data and consider using Spark together with one of its geospatial additions such as GeoMesa, GeoSpark or GeoTrellis. Soon they realize that whichever tool they picked lacks a function they commonly need. In the case of GeoMesa, for example, spatial joins are still a bottleneck. There is a nice alternative available: GeoSpark. However, if you go all in with GeoSpark, you have to re-invent the wheel and hand-code a lot of the geospatial functions GeoMesa already offers as Spark-native functions.

In the following I will show you how to combine the best of both worlds.

If you are new to the Hadoop world and are not sure whether your use case really requires all this mess, take a serious look at Postgres 12 and PostGIS 3 (http://blog.cleverelephant.ca/2019/05/parallel-postgis-4.html): they offer parallel spatial join capabilities.

## the first step

Using your build tool of choice, you need to put both libraries on the classpath. I like to use Gradle for this and add GeoMesa, GeoSpark and the subdependencies that are required for nice SQL access to their geospatial functions.

    ext {
        // scalaMinorV, scalaPatchV, sparkOpenV, sparkOpenVPatch, hdpV, geomesaV and geosparkV
        // are expected to be defined elsewhere, e.g. in gradle.properties
        scalaFullV = "${scalaMinorV}.$scalaPatchV"
        sparkFullV = "$sparkOpenV.$sparkOpenVPatch.$hdpV"
        deps = [
            geomesaSparkSql: "org.locationtech.geomesa:geomesa-spark-sql_${scalaMinorV}:$geomesaV",
            geospark       : "org.datasyslab:geospark:$geosparkV",
            geosparkSql    : "org.datasyslab:geospark-sql_$sparkOpenV:$geosparkV",
            sparkCore      : "org.apache.spark:spark-core_${scalaMinorV}:$sparkFullV",
            sparkSql       : "org.apache.spark:spark-sql_${scalaMinorV}:$sparkFullV",
            sparkHive      : "org.apache.spark:spark-hive_${scalaMinorV}:$sparkFullV",
            scalaLib       : "org.scala-lang:scala-library:$scalaFullV"
        ]
    }


To be able to resolve all of these artifacts, be sure to add some additional repositories to the configuration of your build tool:

    repositories {
        maven { url "https://repo.locationtech.org/content/groups/releases" }
        maven { url "http://repo.boundlessgeo.com/main" }
        maven { url "http://conjars.org/repo" }
        jcenter()
    }


Then, simply add the dependencies to your project in the implementation/compile scope:

    dependencies {

        compile deps.geomesaSparkSql
        compile deps.geospark
        compile deps.geosparkSql

        compileOnly deps.sparkCore
        compileOnly deps.sparkSql
        compileOnly deps.sparkHive
        compileOnly deps.scalaLib

    }


## optimized serialization

By default, Spark uses Java serialization, which is less efficient than a properly registered Kryo serializer. So be sure to also combine the Kryo registrators that GeoMesa and GeoSpark each provide, in a similar fashion to this:

    // fragment from the body of registerClasses(kryo: Kryo) in a custom KryoRegistrator
    val commonReg = new CommonKryoRegistrator
    val geomesaReg = new GeoMesaSparkKryoRegistrator
    val geosparkKryoReg = new GeoSparkKryoRegistrator

    commonReg.registerClasses(kryo)
    geomesaReg.registerClasses(kryo)
    geosparkKryoReg.registerClasses(kryo)

    // further required geo-spatial classes
    kryo.register(
      classOf[scala.Array[org.datasyslab.geospark.spatialRddTool.StatCalculator]])
    kryo.register(
      classOf[org.datasyslab.geospark.spatialRddTool.StatCalculator])
    // "[[B" is the JVM name of Array[Array[Byte]]
    kryo.register(Class.forName("[[B"))
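The session setup later in this post refers to a class called `SpatialKryoRegistrator`, so it may help to see how the fragment above could be wrapped into one. This is only a minimal sketch under assumptions: the class body is a guess at the structure, the package names of the two library registrators are those I know from the GeoMesa and GeoSpark releases of that time and may differ in other versions, and `CommonKryoRegistrator` is left out because its origin is not shown here.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
import org.datasyslab.geospark.spatialRddTool.StatCalculator
import org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator

// Hypothetical combined registrator; the name matches the one used in the
// session setup, the body is an assumption.
class SpatialKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // delegate to the registrators shipped with each library
    new GeoMesaSparkKryoRegistrator().registerClasses(kryo)
    new GeoSparkKryoRegistrator().registerClasses(kryo)

    // further geo-spatial classes, as in the fragment above
    kryo.register(classOf[StatCalculator])
    kryo.register(classOf[Array[StatCalculator]])
    kryo.register(classOf[Array[Array[Byte]]]) // same class as Class.forName("[[B")
  }
}
```

Spark instantiates the registrator by class name, so it needs a public no-argument constructor, which a plain Scala class like this one has.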


As both libraries provide some UDFs with the same names, the names clash, so it is important to register one set with a prefix. I chose to prefix the GeoSpark UDFs as follows:

    UdtRegistratorWrapper.registerAll()
    Catalog.expressions.foreach(f =>
      sparkSession.sessionState.functionRegistry
        .registerFunction("geospark_" + f.getClass.getSimpleName.dropRight(1), f))
    Catalog.aggregateExpressions.foreach(f =>
      sparkSession.udf.register("geospark_" + f.getClass.getSimpleName, f))
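The session setup later on calls `CustomGeosparkRegistrator.registerAll(spark)`. A minimal sketch of such an object, wrapping the registration calls above, could look as follows. The object body and the `Prefix` constant are assumptions, and the import paths are the GeoSpark SQL packages as I know them, so verify them against your version.

```scala
import org.apache.spark.sql.SparkSession
import org.datasyslab.geosparksql.UDF.Catalog
import org.datasyslab.geosparksql.UDT.UdtRegistratorWrapper

// Hypothetical wrapper object; only the name is taken from the session setup.
object CustomGeosparkRegistrator {
  val Prefix = "geospark_"

  def registerAll(sparkSession: SparkSession): Unit = {
    // register GeoSpark's user defined types
    UdtRegistratorWrapper.registerAll()

    // the expressions are companion objects, so getSimpleName ends in "$",
    // which dropRight(1) removes
    Catalog.expressions.foreach { f =>
      sparkSession.sessionState.functionRegistry
        .registerFunction(Prefix + f.getClass.getSimpleName.dropRight(1), f)
    }

    // aggregate functions are ordinary class instances, no trailing "$" to strip
    Catalog.aggregateExpressions.foreach { f =>
      sparkSession.udf.register(Prefix + f.getClass.getSimpleName, f)
    }
  }
}
```

Keeping the prefix in one place makes it easy to change later, or to prefix the GeoMesa functions instead.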


## optimized spatial joins

Unfortunately, if you look at the execution plan for a spatial join, all the GeoSpark magic seems to be lost. But it is easy to bring back efficient spatial joins:

    import org.datasyslab.geosparksql.strategy.join.JoinQueryDetector
    sparkSession.experimental.extraStrategies = JoinQueryDetector :: Nil
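To check whether the strategy is actually picked up, you can inspect the physical plan of a join. The helper below is a small sketch: `PlanCheck` and `usesGeosparkJoin` are hypothetical names, and the operator names `RangeJoin` and `DistanceJoin` are my assumption about how GeoSpark labels its plan nodes, so compare them with what `explain()` prints in your setup.

```scala
import org.apache.spark.sql.DataFrame

object PlanCheck {
  // Returns true when the physical plan of `df` mentions one of the
  // (assumed) GeoSpark join operator names.
  def usesGeosparkJoin(df: DataFrame): Boolean = {
    val plan = df.queryExecution.executedPlan.toString
    plan.contains("RangeJoin") || plan.contains("DistanceJoin")
  }
}
```

Calling `PlanCheck.usesGeosparkJoin(someJoinedDataFrame)` before and after setting `extraStrategies` should show the difference, if the operator names match.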


## using them both together

To use them together, a couple of steps are needed:

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer
    import org.apache.spark.sql.SparkSession
    import org.locationtech.geomesa.spark.jts._

    // 1. use your combined kryo registrator
    val spark = SparkSession
      .builder()
      .config(new SparkConf()
        .setAppName("geomesaGeospark")
        .setMaster("local[*]")
        .setIfMissing("spark.serializer",
          classOf[KryoSerializer].getCanonicalName)
        .setIfMissing("spark.kryo.registrator",
          classOf[SpatialKryoRegistrator].getName))
      .getOrCreate()

    // 2. register geospark functions with prefix
    CustomGeosparkRegistrator.registerAll(spark)
    // 3. register geomesa
    spark.withJTS

    // now they are all available
    spark.sessionState.functionRegistry.listFunction().foreach(println)
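The usage snippets in this post work on two DataFrames called `points` and `polygons` that are never defined here. A minimal sketch of some toy input, assuming a `x`/`y` column layout for the points and a `wkt` column for the polygons, could look like this. The values and the names `SampleData` and `create` are made up for illustration. The coordinates are cast to `Decimal(24,20)` because GeoSpark's documentation does the same before calling `ST_Point`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SampleData {
  // Returns (points, polygons) with made-up values for experimenting.
  def create(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._

    // one point inside the unit square, one outside of it
    val points = Seq((0.5, 0.5), (2.0, 2.0))
      .toDF("x", "y")
      .select(
        col("x").cast("decimal(24,20)").as("x"),
        col("y").cast("decimal(24,20)").as("y"))

    // a single polygon (the unit square) as WKT text
    val polygons = Seq("POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))").toDF("wkt")

    (points, polygons)
  }
}
```

With the session from above this is used as `val (points, polygons) = SampleData.create(spark)`.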


The functions of both libraries can then be used like this:

    import org.apache.spark.sql.functions.{col, expr}
    import org.locationtech.geomesa.spark.jts._

    // perform some spatial computations in geomesa (these functions are not available in geospark)
    val polygonsEnriched = polygons
      .withColumn("centeroid_x", st_x(st_centroid(st_geomFromWKT(col("wkt")))))
      .withColumn("centeroid_y", st_y(st_centroid(st_geomFromWKT(col("wkt")))))

    polygonsEnriched.show

    // perform a spatial join, first create spatial binary geometry types using geospark functions, then join
    val pointsGeom = points.withColumn("geom_points", expr("geospark_ST_Point(x, y)"))
    pointsGeom.show
    val polygonsGeom = polygonsEnriched.withColumn("geom_polygons", expr("geospark_ST_GeomFromWKT(wkt)"))
    polygonsGeom.show

    // TODO 1) how can I get rid of the textual SQL below and actually use DSL expressions? Or at least expr()? Seems only to work on map side operations as above
    pointsGeom.createOrReplaceTempView("points")
    polygonsGeom.createOrReplaceTempView("polygons")
    val joinedDF =
      spark.sql(
        """
          |SELECT *
          |FROM polygons, points
          |WHERE geospark_ST_Contains(polygons.geom_polygons, points.geom_points)
        """.stripMargin)

    joinedDF.show(false)

    println(joinedDF.count)
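Regarding the TODO in the snippet above: one option that might avoid the temporary views is to hand the predicate to `join` through `expr()`. This is an untested sketch, not a confirmed solution. `DslJoin` and `containsJoin` are hypothetical names, the column names are the ones used above, and whether GeoSpark's join strategy recognizes the resulting plan may depend on the version, so check the output of `explain()`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

object DslJoin {
  // Inner join of polygons and points on the prefixed GeoSpark predicate.
  // Assumes the geospark_ functions are registered in the active session and
  // that the inputs carry the geom_polygons / geom_points columns.
  def containsJoin(polygonsGeom: DataFrame, pointsGeom: DataFrame): DataFrame =
    polygonsGeom.join(
      pointsGeom,
      expr("geospark_ST_Contains(geom_polygons, geom_points)"))
}
```

Usage would be `DslJoin.containsJoin(polygonsGeom, pointsGeom).explain()`. The predicate still lives in a string, but the join itself is expressed with the DataFrame API.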


## conclusion

All the code can be found here

If you like the marriage of GeoSpark and GeoMesa, then star the draft repository: https://github.com/geoHeil/geomesa-geospark

If you are really interested in moving the community forward, help to work on the integration of GeoMesa and GeoSpark as a unified package, so that this becomes a more straightforward and integrated process in the future.

##### Georg Heiler
###### PhD candidate & data scientist

My research interests include large geo-spatial time and network data analytics.