Scaling geospatial data processing in R

Bringing sf to Spark in production

Optimized distributed spatial joins

R users traditionally turn to the sf package for geospatial operations. However, sf is limited to a single compute node, often a single thread, and to data that fits into memory.
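
For comparison, a purely local sf workflow might look like the following sketch (the file names are placeholders):

library(sf)

# everything below runs on a single machine and loads all data into memory
polygons <- st_read("polygons.shp")
points <- st_read("points.shp")

# single-node spatial join
joined <- st_join(points, polygons, join = st_within)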

Scaling this kind of analysis has traditionally involved heavy lifting on the coding side. Uber, for example, proposes a solution based on ESRI Hive UDFs and Presto to offer a scalable, SQL-based interface to geospatial operations.

But this requires a fair amount of infrastructure up front and still leaves you writing code yourself, as Uber's Presto UDFs are not open source yet.

In the Spark ecosystem, GeoSpark has evolved instead: it now offers not only a low-level code API but also SQL functionality:

SELECT *
FROM polygondf, pointdf
WHERE ST_Contains(polygondf.polygonshape, pointdf.pointshape)

This lets you execute a wide range of geospatial operations and is especially optimized for spatial joins: the data is partitioned across the cluster nodes with its spatial extent taken into account, so that only spatially nearby partitions have to be compared.
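
GeoSpark's SQL layer optimizes more than containment joins; a distance join, for instance, can be expressed with ST_Distance (the two point tables below are illustrative, mirroring the snippet above):

SELECT *
FROM pointdf1, pointdf2
WHERE ST_Distance(pointdf1.pointshape, pointdf2.pointshape) <= 2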

For many users, however, Scala or SQL is not the preferred tool for handling data; R is. sparklyr has offered an integration between R and Spark for a while, but it lacked support for GeoSpark.

Recently, the geospark R package was introduced to make handling large, distributed geospatial data as simple for R users as calling local sf functions.

Below is a minimal example which shows how to use it.

Example

Installation

devtools::install_github("harryprince/geospark")

Starting Spark

Then instantiate your Spark session, adding additional configuration to use the GeoSpark Kryo serializer, which reduces serialization overhead.

library(sparklyr)
library(geospark)
library(dplyr)

conf <- spark_config()
conf$`sparklyr.shell.driver-memory` <- "8G"
# Enable kryo to decrease serialization overhead
conf$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"
conf$spark.kryo.registrator <- "org.datasyslab.geospark.serde.GeoSparkKryoRegistrator"

spark <- spark_connect(master = "local", config = conf)
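
The same configuration also works against a real cluster; only the master URL changes, for example when a YARN cluster is reachable from the R session (an assumption here):

# connect to a cluster instead of local mode (illustrative)
spark <- spark_connect(master = "yarn", config = conf)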

Register the spatial SQL functions with Spark:

register_gis(spark)

Now load some datasets:

polygons <- read.table(system.file("examples/polygons.txt", package = "geospark"),
                       sep = "|", col.names = c("area", "geom"))
points <- read.table(system.file("examples/points.txt", package = "geospark"),
                     sep = "|", col.names = c("city", "state", "geom"))

and copy them to Spark:

polygons_wkt <- copy_to(spark, polygons)
points_wkt <- copy_to(spark, points)
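
copy_to ships the local data frames to the cluster, which is fine for this toy example. For data that does not fit into a local R session you would instead read it directly on the cluster, e.g. with sparklyr::spark_read_csv (the path below is a placeholder):

# read the raw WKT data directly on the cluster (illustrative path)
points_wkt <- spark_read_csv(
  spark,
  name = "points",
  path = "hdfs:///data/points.txt",
  delimiter = "|",
  header = FALSE,
  columns = c("city", "state", "geom")
)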

Execute a distributed spatial join

Now we can perform a distributed geospatial join. First, the well-known text (WKT) columns are parsed into geometry objects using st_geomfromwkt; then the join is expressed with the ST_Intersects predicate:

# parse the WKT columns into geometries and register the results as Spark tables
mutate(polygons_wkt, y = st_geomfromwkt(geom)) %>%
  select(-geom) %>%
  sdf_register("poly_geometry")
mutate(points_wkt, x = st_geomfromwkt(geom)) %>%
  select(-geom) %>%
  sdf_register("point_geometry")

# distributed spatial join expressed in GeoSpark SQL
result <- sdf_sql(spark, "SELECT * FROM poly_geometry, point_geometry WHERE ST_Intersects(y, x)")
result %>%
  head
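
Because sdf_sql returns an ordinary sparklyr table, the usual dplyr verbs keep working on the join result, for example to count the matched points per polygon (column names follow the sample data above):

# count points falling into each polygon, largest first
result %>%
  group_by(area) %>%
  summarise(points_in_area = n()) %>%
  arrange(desc(points_in_area))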

NOTE: some SQL is still required because the underlying R packages, namely dbplyr, do not yet support this type of spatial join and do not generate clean enough SQL for the GeoSpark optimizer to kick in.

Finally, close the Spark session again.

spark_disconnect(spark)

Summary

The geospark R package allows tidyverse users to scale out their geospatial processing with little effort. However, some SQL is still necessary to obtain the optimizations that make it work beyond minimal sample datasets.
