OSM to Spark

Large-scale processing of Open Street Map data

Open Street Map

Open Street Map (OSM) is a common provider of map data. Its raw data structure is composed of nodes, ways and relations. A classical PostGIS import of larger quantities can take fairly long, so you might want to speed it up using Spark. Your motivation could also be that you want to analyze the whole OSM community.

Initially, you need to ingest the PBF files to your cluster (HDFS, S3) in a big-data-friendly file format. The regular and compressed binary OSM PBF files cannot easily be processed on multiple nodes in parallel.

To perform this conversion you have two options:

I chose the second option, namely osm-parquetizer, because it has more GitHub stars (44 vs. 27) and because I prefer separate tables, which make the data easier to handle later on.

ingestion

As I am currently based in Austria, I will download the OSM extract of Austria from Geofabrik. Do note, though, that this conversion is not necessary if you want to download the full planet, as osm-data.skobbler.net kindly provides it daily.

Note: the idea outlined here is to perform a periodic full ingest of the OSM data.

For GDPR reasons, the Geofabrik data (https://osm-internal.download.geofabrik.de/index.html) drops the identifiers of individual Open Street Map contributors (uid, user_sid, changeset). https://download.openstreetmap.fr/extracts/europe/ would still make them available. As I do not need these user identifiers, I will drop the columns.

Initially, you need to build the parquetizer:

git clone git@github.com:adrianulbona/osm-parquetizer.git
cd osm-parquetizer
git checkout e7f126abbf7cc4170e23048b62201214ceb07bfd # maybe use a more recent commit
mvn clean package

The next step is to download the map data:

wget https://download.geofabrik.de/europe/austria-latest.osm.pbf

The conversion from PBF to Parquet is performed as follows:

java -jar target/osm-parquetizer-1.0.1-SNAPSHOT.jar austria-latest.osm.pbf

This will result in three files, one each for node, way and relation.
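As a quick sanity check before starting Spark, the following sketch reports whether the three output files exist. It assumes the naming scheme described in the osm-parquetizer README, where each output is the input file name with `.node.parquet`, `.way.parquet` or `.relation.parquet` appended; the helper name `expectedOutputs` is made up for illustration.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical helper: derive the output names osm-parquetizer is assumed
// to write next to the input file (input name + ".<entity>.parquet").
def expectedOutputs(pbf: String): Seq[String] =
  Seq("node", "way", "relation").map(entity => s"$pbf.$entity.parquet")

val outputs = expectedOutputs("austria-latest.osm.pbf")

// Report which of the expected files are present in the working directory.
outputs.foreach { name =>
  val status = if (Files.exists(Paths.get(name))) "found" else "missing"
  println(s"$name: $status")
}
```

If one of the files is reported as missing, check the directory of the input file and the console output of the converter before continuing.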

When starting your spark-shell, it is important to pass spark.sql.parquet.binaryAsString=true, as you will otherwise get nasty Array[Byte] instead of Strings:

spark-shell --driver-memory 10g --conf spark.sql.parquet.binaryAsString=true
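If the shell is already running without this flag, the option can, to my knowledge, also be set on the live session, because it is a regular runtime SQL configuration. This is a small sketch and assumes `spark` is the SparkSession that spark-shell provides:

```scala
// Assumes `spark` is the SparkSession provided by spark-shell.
// Set this before calling spark.read.parquet(...) on the OSM files,
// so that binary columns are interpreted as strings when the schema is read.
spark.conf.set("spark.sql.parquet.binaryAsString", "true")
```

Passing the flag on the command line as shown above remains the simpler choice, since it cannot be forgotten between reads.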

Read the data and add a current partitioning date. As the tags are only returned as an array of key/value structs instead of a nice Map, I manually parse them into a MapType:

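import org.apache.spark.sql._
import org.apache.spark.sql.functions._ // udf, lit, col, from_unixtime

// Convert the array of (key, value) structs into a Map; a null array becomes None (null in Spark).
def toMap(tuplesArray: Seq[Row]): Option[Map[String, String]] = {
  if (tuplesArray == null) {
    None
  } else {
    val tuples = tuplesArray.map(e => {
      (
        e.getAs[String]("key"),
        e.getAs[String]("value")
      )
    })
    Some(tuples.toMap)
  }
}

// Add the partition date, drop the contributor columns, parse the tags and convert the timestamp.
def handleCommon(currentDate: String = "20190430")(df: DataFrame): DataFrame = {
  val toMapUDF = udf(toMap _)
  df.withColumn("dt", lit(currentDate))
    .drop("uid", "user_sid", "changeset")
    .withColumn("tags", toMapUDF(col("tags")))
    // the division by 1000 converts milliseconds to the seconds from_unixtime expects
    .withColumn("timestamp", from_unixtime(col("timestamp") / 1000))
}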

val node = spark.read.parquet("<</path/to/node.parquet>>").transform(handleCommon())
val way = spark.read.parquet("<</path/to/way.parquet>>").transform(handleCommon())
val relation = spark.read.parquet("<</path/to/relation.parquet>>").transform(handleCommon())
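Once the tags are a map, individual keys can be looked up directly. The sketch below shows this on a tiny in-memory DataFrame so that it runs in spark-shell without the OSM files. Note that it is not the approach used above: instead of the UDF it uses the built-in `map_from_entries` function, which is only available from Spark 2.4 on. The case class `Tag`, the values `raw` and `parsed`, and the sample row are made up for illustration.

```scala
import org.apache.spark.sql.functions.{col, from_unixtime, map_from_entries}
import spark.implicits._ // assumes `spark` is the spark-shell session

// Made-up stand-in for the array of key/value structs produced by the converter.
case class Tag(key: String, value: String)

val raw = Seq(
  (1L, Seq(Tag("highway", "residential"), Tag("oneway", "yes")), 1556582400000L)
).toDF("id", "tags", "timestamp")

val parsed = raw
  // array<struct<key, value>> -> map<key, value> (built-in alternative to the UDF)
  .withColumn("tags", map_from_entries(col("tags")))
  // epoch milliseconds -> seconds, cast explicitly to long before formatting
  .withColumn("timestamp", from_unixtime((col("timestamp") / 1000).cast("long")))

// Look up a single tag by key, e.g. keep only residential streets.
parsed.filter(col("tags").getItem("highway") === "residential").show(false)
```

The same `getItem` lookup works on the `node`, `way` and `relation` DataFrames after `handleCommon` has been applied.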

For easier handling in Hive, I will convert them to ORC when storing the files into the metastore. Additionally, some columns are renamed, as I prefer the more explicit x_long_wgs84 instead of longitude. Only the node table has these coordinate columns; for way and relation the rename is a no-op.

spark.sql("SET spark.sql.shuffle.partitions=20") // for AT the data is rather small; keep the number of files smaller than the default of 200

node
  .withColumnRenamed("latitude", "y_lat_wgs84")
  .withColumnRenamed("longitude", "x_long_wgs84")
  .orderBy("id")
  .write.mode(SaveMode.Overwrite).partitionBy("dt").option("compression", "zlib").format("orc").saveAsTable(s"my_db.osm_node")

way
  .withColumnRenamed("latitude", "y_lat_wgs84")
  .withColumnRenamed("longitude", "x_long_wgs84")
  .orderBy("id")
  .write.mode(SaveMode.Overwrite).partitionBy("dt").option("compression", "zlib").format("orc").saveAsTable(s"my_db.osm_way")

relation
  .withColumnRenamed("latitude", "y_lat_wgs84")
  .withColumnRenamed("longitude", "x_long_wgs84")
  .orderBy("id")
  .write.mode(SaveMode.Overwrite).partitionBy("dt").option("compression", "zlib").format("orc").saveAsTable(s"my_db.osm_relation")

If you are on a Spark cluster with version 2.3 or higher, you should consider applying bucketing, as there will be many joins to retrieve geometries from the geo-spatial OSM graph. Bucketing avoids unnecessary shuffles.

Unfortunately, at least Spark 2.3 is required for a proper Hive integration (less divergence in bucketing semantics between Hive and Spark).

import org.apache.spark.sql._

node
  .withColumnRenamed("latitude", "y_lat_wgs84")
  .withColumnRenamed("longitude", "x_long_wgs84")
  .write.mode(SaveMode.Overwrite)
  .partitionBy("dt")
  .sortBy("id")
  .bucketBy(20, "id")
  .option("compression", "zlib")
  .format("orc")
  .saveAsTable(s"my_db.osm_node")

way
  .withColumnRenamed("latitude", "y_lat_wgs84")
  .withColumnRenamed("longitude", "x_long_wgs84")
  .write.mode(SaveMode.Overwrite)
  .partitionBy("dt")
  .sortBy("id")
  .bucketBy(20, "id")
  .option("compression", "zlib")
  .format("orc")
  .saveAsTable(s"my_db.osm_way")

relation
  .withColumnRenamed("latitude", "y_lat_wgs84")
  .withColumnRenamed("longitude", "x_long_wgs84")
  .write.mode(SaveMode.Overwrite)
  .partitionBy("dt")
  .sortBy("id")
  .bucketBy(20, "id")
  .option("compression", "zlib")
  .format("orc")
  .saveAsTable(s"my_db.osm_relation")
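To illustrate the kind of join that retrieves geometries, here is a minimal sketch that attaches coordinates to the node references of a way. It uses two tiny in-memory DataFrames with made-up ids and coordinates as stand-ins for my_db.osm_node and my_db.osm_way, so it runs in spark-shell without the real tables; the case class `WayNode` and all value names are assumptions for illustration.

```scala
import org.apache.spark.sql.functions.{col, explode}
import spark.implicits._ // assumes `spark` is the spark-shell session

// Made-up stand-in for the elements of the `nodes` array of a way.
case class WayNode(index: Int, nodeId: Long)

// Stand-in for my_db.osm_node (ids and coordinates are invented).
val nodes = Seq(
  (1L, 16.3700, 48.2100),
  (2L, 16.3710, 48.2105)
).toDF("id", "x_long_wgs84", "y_lat_wgs84")

// Stand-in for my_db.osm_way.
val ways = Seq(
  (100L, Seq(WayNode(0, 1L), WayNode(1, 2L)))
).toDF("id", "nodes")

// One row per (way, node reference), keeping the position within the way.
val wayNodes = ways
  .select(col("id").as("way_id"), explode(col("nodes")).as("n"))
  .select(col("way_id"), col("n.index").as("idx"), col("n.nodeId").as("node_id"))

// Attach the coordinates by joining on the node id.
val wayCoordinates = wayNodes
  .join(nodes.withColumnRenamed("id", "node_id"), Seq("node_id"))
  .orderBy("way_id", "idx")

wayCoordinates.show(false)
wayCoordinates.explain() // inspect the physical plan for Exchange (shuffle) steps
```

With the real data, `nodes` and `ways` would be replaced by `spark.table("my_db.osm_node")` and `spark.table("my_db.osm_way")`. Only the node side is bucketed on the join key here, since the exploded node references are not bucketed by `node_id`; whether a shuffle is actually avoided depends on the Spark version and the resulting plan, so it is worth checking the output of `explain()`.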

results

Congratulations, you should now have three separate tables, one each for node, way and relation. Their schema and content should look similar to mine below:

Node

root
 |-- id: long (nullable = true)
 |-- version: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- dt: string (nullable = false)

 +------+-------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+----------+--------+
|id    |version|timestamp          |tags                                                                                                                                                                                                                                                                                                                                                                                                                                                  |latitude          |longitude |dt      |
+------+-------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+----------+--------+
|69    |2      |2019-03-03 16:02:52|Map(name:ko -> 푸킹, name -> Fucking, name:lt -> Fukingas, name:el -> Φούκινγκ, population -> 111, population:date -> 2018-01-01, name:uk -> Фукінг, name:ur -> فکنگ، آسٹریا, name:he -> פוקינג, wikidata -> Q172668, name:zh -> 富金, wikipedia -> de:Fucking, name:fa -> فاکینگ, name:ja -> フッキング, name:yue -> 福擎, place -> hamlet, name:ar -> فاكينغ, name:ru -> Фуккинг, name:sr -> Фукинг, name:lv -> Fukinga, name:th -> ฟุกกิง, name:hy -> Ֆուքինգ)|48.0674106        |12.8623448|20190430|
|155843|3      |2010-02-21 12:45:22|Map(TMC:cid_58:tabcd_1:Class -> Point, TMC:cid_58:tabcd_1:LCLversion -> 9.00, TMC:cid_58:tabcd_1:LocationCode -> 60779, TMC:cid_58:tabcd_1:Direction -> positive, TMC:cid_58:tabcd_1:PrevLocationCode -> 60778, TMC:cid_58:tabcd_1:NextLocationCode -> 60780)                                                                                                                                                                                         |47.702766700000005|12.159043 |20190430|
+------+-------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+----------+--------+

Way

root
 |-- id: long (nullable = true)
 |-- version: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- nodes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- index: integer (nullable = true)
 |    |    |-- nodeId: long (nullable = true)
 |-- dt: string (nullable = false)

 +------+-------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|id    |version|timestamp          |tags                                                                                                                                        |nodes                                                                                                                                                                                                                                                                                                                                    |dt      |
+------+-------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|149673|21     |2018-12-06 16:06:16|Map(name -> Waidhausenstraße, surface -> asphalt, maxspeed -> 30, highway -> tertiary, lanes -> 2, source:maxspeed -> AT:zone, oneway -> no)|[[0,377950], [1,5875952787], [2,378435], [3,378436], [4,6121720050], [5,291535793], [6,4364942678], [7,291536343], [8,1436632383], [9,581546], [10,5942962961], [11,3581236718], [12,672214], [13,3531012147], [14,3581236693], [15,1832915324], [16,580257], [17,672211], [18,291538694], [19,1832915339], [20,3096027066], [21,378439]]|20190430|
|149694|5      |2018-02-14 20:24:23|Map(name -> Klinkowströmgasse, surface -> asphalt, maxspeed -> 30, highway -> residential, source:maxspeed -> AT:zone, oneway -> yes)       |[[0,580253], [1,580255]]                                                                                                                                                                                                                                                                                                                 |20190430|
+------+-------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+

Relation

root
 |-- id: long (nullable = true)
 |-- version: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- role: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- dt: string (nullable = false)

 +---+-------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+--------+
|id |version|timestamp          |tags                                                                                                                                                                                                                                                                              |members                                                                                                                                |dt      |
+---+-------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+--------+
|75 |18     |2018-04-17 16:59:01|Map(name -> Untere Alte Donau, natural -> water, type -> multipolygon, water -> oxbow)                                                                                                                                                                                            |[[174269668,inner,Way], [27057698,inner,Way], [543129865,outer,Way], [8129965,outer,Way], [496339945,outer,Way], [496339943,outer,Way]]|20190430|
|476|6      |2014-09-24 19:50:00|Map(name -> Karl-Franzens-Universität Hauptgebäude, addr:street -> Universitätsplatz, addr:country -> AT, addr:postcode -> 8010, amenity -> university, addr:housenumber -> 3, building -> public, addr:city -> Graz, operator -> Karl-Franzens-Universität, type -> multipolygon)|[[4399927,outer,Way], [8143951,inner,Way]]                                                                                             |20190430|
+---+-------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+--------+
Georg Heiler
PhD candidate & data scientist

My research interests include large geo-spatial time and network data analytics.
