OSM to Spark
OpenStreetMap (OSM) is a common provider of map data. Its raw data structure is composed of nodes, ways and relations. A classical PostGIS import of larger extracts can take fairly long, so you might want to speed it up using Spark. Another motivation could be that you want to analyze the whole OSM community.
Initially, you need to ingest the PBF files to your cluster (HDFS, S3) in a big-data-friendly file format. The regular and compressed binary OSM PBF files cannot easily be processed on multiple nodes in parallel.
To perform this conversion you have two options:
- https://github.com/mojodna/osm2orc: ORC, single file, more active, Osmosis integration
- https://github.com/adrianulbona/osm-parquetizer: Parquet, one file per table, more GitHub stars
I chose the second option, osm-parquetizer, as it has more GitHub stars (44 vs. 27) and I prefer separate tables, which make the data easier to handle later on.
ingestion
As I am currently based in Austria, I will download the OSM map of Austria from Geofabrik. Note, though, that this conversion is not necessary if you want the full planet, as osm-data.skobbler.net kindly provides it daily.
Note: the idea outlined here is to perform a periodic full ingest of the OSM data. The public Geofabrik extracts no longer include the user-identifying metadata (uid, user_sid, changeset). https://download.openstreetmap.fr/extracts/europe/ would still make them available.
As I do not need these user identifiers, I will drop the columns.
First, you need to build the parquetizer:
git clone git@github.com:adrianulbona/osm-parquetizer.git
cd osm-parquetizer
git checkout e7f126abbf7cc4170e23048b62201214ceb07bfd # maybe use a more recent commit
mvn clean package
The next step is to download the map data:
wget https://download.geofabrik.de/europe/austria-latest.osm.pbf
The conversion from PBF to Parquet is performed with:
java -jar target/osm-parquetizer-1.0.1-SNAPSHOT.jar austria-latest.osm.pbf
This results in three Parquet files, one each for node, way and relation.
When starting your spark-shell, it is important to pass spark.sql.parquet.binaryAsString=true, as you will otherwise get nasty Array[Byte] values instead of Strings:
spark-shell --driver-memory 10g --conf spark.sql.parquet.binaryAsString=true
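If the ingest later runs as a standalone application rather than from spark-shell, the same flag can be set in code. The following is a minimal sketch; the application name and the local master are assumptions for illustration only and are not part of the original setup.

```scala
import org.apache.spark.sql.SparkSession

// Option 1: set the flag while building the session.
// In spark-shell a session already exists, so getOrCreate() simply returns it.
val spark = SparkSession.builder()
  .appName("osm-ingest")   // hypothetical application name
  .master("local[*]")      // assumption: local test run; drop this on a cluster
  .config("spark.sql.parquet.binaryAsString", "true")
  .getOrCreate()

// Option 2: set it on an already running session, before reading the Parquet files.
spark.conf.set("spark.sql.parquet.binaryAsString", "true")

// Check which value is active.
println(spark.conf.get("spark.sql.parquet.binaryAsString"))
```

Either way, the setting has to be in place before the Parquet files are read, as it influences how their binary columns are interpreted.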
Read the data and add the current partitioning date. As the tags are returned only as an array of key/value structs instead of a nice Map, I manually parse them into a MapType:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._ // udf, lit, col, from_unixtime

// Converts the array of (key, value) structs into a Map; a null input yields None (SQL NULL).
def toMap(tuplesArray: Seq[Row]): Option[Map[String, String]] = {
  if (tuplesArray == null) {
    None
  } else {
    val tuples = tuplesArray.map(e => {
      (
        e.getAs[String]("key"),
        e.getAs[String]("value")
      )
    })
    Some(tuples.toMap)
  }
}

// Adds the partition date, drops the user columns, converts the tags and formats the timestamp.
def handleCommon(currentDate: String = "20190430")(df: DataFrame): DataFrame = {
  val toMapUDF = udf(toMap _)
  df.withColumn("dt", lit(currentDate))
    .drop("uid", "user_sid", "changeset")
    .withColumn("tags", toMapUDF(col("tags")))
    .withColumn("timestamp", from_unixtime(col("timestamp") / 1000)) // epoch milliseconds to seconds
}

val node = spark.read.parquet("<</path/to/node.parquet>>").transform(handleCommon())
val way = spark.read.parquet("<</path/to/way.parquet>>").transform(handleCommon())
val relation = spark.read.parquet("<</path/to/relation.parquet>>").transform(handleCommon())
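Since the idea is a periodic full ingest, the hard-coded default date will need to change on every run. A small helper could derive the partition value from the current day instead. This is only a sketch: partitionDate is a hypothetical name, and it assumes the dt partition keeps the yyyyMMdd shape used above.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Formats a date as yyyyMMdd, the same shape as the "20190430" default above.
// partitionDate is a hypothetical helper, not part of the original code.
def partitionDate(day: LocalDate = LocalDate.now()): String =
  day.format(DateTimeFormatter.BASIC_ISO_DATE)

// Usage together with the reader code above (needs the Spark session and the files):
// val node = spark.read.parquet("<</path/to/node.parquet>>").transform(handleCommon(partitionDate()))
```

Passing the date explicitly keeps handleCommon deterministic, which makes re-running the ingest for a past day straightforward.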
For easier handling in Hive, I will convert them to ORC when storing the files into the metastore. Additionally, some columns are renamed, as I prefer the more explicit x_long_wgs84 instead of longitude.
spark.sql("SET spark.sql.shuffle.partitions=20") // for AT the data is rather small, so keep the number of files below the default of 200
node
  .withColumnRenamed("latitude", "y_lat_wgs84")
  .withColumnRenamed("longitude", "x_long_wgs84")
  .orderBy("id")
  .write.mode(SaveMode.Overwrite).partitionBy("dt").option("compression", "zlib").format("orc").saveAsTable("my_db.osm_node")
way
  .orderBy("id")
  .write.mode(SaveMode.Overwrite).partitionBy("dt").option("compression", "zlib").format("orc").saveAsTable("my_db.osm_way")
relation
  .orderBy("id")
  .write.mode(SaveMode.Overwrite).partitionBy("dt").option("compression", "zlib").format("orc").saveAsTable("my_db.osm_relation")
If you are on a Spark cluster with version 2.3 or higher, you should consider applying bucketing, as there will be many joins to retrieve geometries from the geo-spatial OSM graph. Bucketing avoids unnecessary shuffles.
Unfortunately, at least Spark 2.3 is required for a proper Hive integration (less divergence in bucketing semantics between Hive and Spark).
import org.apache.spark.sql._
node
  .withColumnRenamed("latitude", "y_lat_wgs84")
  .withColumnRenamed("longitude", "x_long_wgs84")
  .write.mode(SaveMode.Overwrite)
  .partitionBy("dt")
  .sortBy("id")
  .bucketBy(20, "id")
  .option("compression", "zlib")
  .format("orc")
  .saveAsTable("my_db.osm_node")
// way and relation have no latitude/longitude columns, so no renaming is needed for them
way
  .write.mode(SaveMode.Overwrite)
  .partitionBy("dt")
  .sortBy("id")
  .bucketBy(20, "id")
  .option("compression", "zlib")
  .format("orc")
  .saveAsTable("my_db.osm_way")
relation
  .write.mode(SaveMode.Overwrite)
  .partitionBy("dt")
  .sortBy("id")
  .bucketBy(20, "id")
  .option("compression", "zlib")
  .format("orc")
  .saveAsTable("my_db.osm_relation")
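To illustrate the kind of join mentioned above, here is a hedged sketch of how the points of a way could be collected by joining its node references against the node table. It uses tiny in-memory stand-ins so that it runs without the real tables; the case classes and the coordinates are made up for illustration, while the ids are taken from the sample way further down. With the real data you would start from spark.table("my_db.osm_node") and spark.table("my_db.osm_way") instead.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, explode, sort_array, struct}

// In spark-shell the session already exists and getOrCreate() returns it.
val spark = SparkSession.builder().master("local[*]").appName("osm-join-sketch").getOrCreate()

// Hypothetical stand-ins mirroring the schemas of the stored tables.
case class WayNode(index: Int, nodeId: Long)
case class Way(id: Long, nodes: Seq[WayNode])

val node = spark.createDataFrame(Seq(
  (580253L, 48.21, 16.41), // made-up coordinates
  (580255L, 48.22, 16.42)
)).toDF("id", "y_lat_wgs84", "x_long_wgs84")

val way = spark.createDataFrame(Seq(
  Way(149694L, Seq(WayNode(0, 580253L), WayNode(1, 580255L)))
))

// One row per (way, node reference).
val wayPoints = way
  .select(col("id").as("way_id"), explode(col("nodes")).as("n"))
  .select(col("way_id"), col("n.index").as("idx"), col("n.nodeId").as("node_id"))

// Attach the coordinates and restore the point order via the index,
// because collect_list gives no ordering guarantee after a join.
val geometry = wayPoints
  .join(node, wayPoints("node_id") === node("id"))
  .groupBy("way_id")
  .agg(sort_array(collect_list(struct(col("idx"), col("x_long_wgs84"), col("y_lat_wgs84")))).as("points"))

geometry.explain() // inspect the physical plan for Exchange (shuffle) steps
geometry.show(false)
```

Note that the join key here is the exploded nodeId against the id of the node table, so only the bucketing of the node table can help; the exploded way side still has to be shuffled. Whether Spark actually skips the shuffle on the node side depends on the version and settings, so it is worth checking the output of explain() on the real tables.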
results
Congratulations, you should now have three separate tables, one each for node, way and relation. Their schema and content should look similar to mine below:
Node
root
|-- id: long (nullable = true)
|-- version: integer (nullable = true)
|-- timestamp: string (nullable = true)
|-- tags: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- dt: string (nullable = false)
+------+-------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+----------+--------+
|id |version|timestamp |tags |latitude |longitude |dt |
+------+-------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+----------+--------+
|69 |2 |2019-03-03 16:02:52|Map(name:ko -> 푸킹, name -> Fucking, name:lt -> Fukingas, name:el -> Φούκινγκ, population -> 111, population:date -> 2018-01-01, name:uk -> Фукінг, name:ur -> فکنگ، آسٹریا, name:he -> פוקינג, wikidata -> Q172668, name:zh -> 富金, wikipedia -> de:Fucking, name:fa -> فاکینگ, name:ja -> フッキング, name:yue -> 福擎, place -> hamlet, name:ar -> فاكينغ, name:ru -> Фуккинг, name:sr -> Фукинг, name:lv -> Fukinga, name:th -> ฟุกกิง, name:hy -> Ֆուքինգ)|48.0674106 |12.8623448|20190430|
|155843|3 |2010-02-21 12:45:22|Map(TMC:cid_58:tabcd_1:Class -> Point, TMC:cid_58:tabcd_1:LCLversion -> 9.00, TMC:cid_58:tabcd_1:LocationCode -> 60779, TMC:cid_58:tabcd_1:Direction -> positive, TMC:cid_58:tabcd_1:PrevLocationCode -> 60778, TMC:cid_58:tabcd_1:NextLocationCode -> 60780) |47.702766700000005|12.159043 |20190430|
+------+-------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+----------+--------+
Way
root
|-- id: long (nullable = true)
|-- version: integer (nullable = true)
|-- timestamp: string (nullable = true)
|-- tags: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- nodes: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- index: integer (nullable = true)
| | |-- nodeId: long (nullable = true)
|-- dt: string (nullable = false)
+------+-------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|id |version|timestamp |tags |nodes |dt |
+------+-------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|149673|21 |2018-12-06 16:06:16|Map(name -> Waidhausenstraße, surface -> asphalt, maxspeed -> 30, highway -> tertiary, lanes -> 2, source:maxspeed -> AT:zone, oneway -> no)|[[0,377950], [1,5875952787], [2,378435], [3,378436], [4,6121720050], [5,291535793], [6,4364942678], [7,291536343], [8,1436632383], [9,581546], [10,5942962961], [11,3581236718], [12,672214], [13,3531012147], [14,3581236693], [15,1832915324], [16,580257], [17,672211], [18,291538694], [19,1832915339], [20,3096027066], [21,378439]]|20190430|
|149694|5 |2018-02-14 20:24:23|Map(name -> Klinkowströmgasse, surface -> asphalt, maxspeed -> 30, highway -> residential, source:maxspeed -> AT:zone, oneway -> yes) |[[0,580253], [1,580255]] |20190430|
+------+-------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
Relation
root
|-- id: long (nullable = true)
|-- version: integer (nullable = true)
|-- timestamp: string (nullable = true)
|-- tags: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- members: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- role: string (nullable = true)
| | |-- type: string (nullable = true)
|-- dt: string (nullable = false)
+---+-------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+--------+
|id |version|timestamp |tags |members |dt |
+---+-------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+--------+
|75 |18 |2018-04-17 16:59:01|Map(name -> Untere Alte Donau, natural -> water, type -> multipolygon, water -> oxbow) |[[174269668,inner,Way], [27057698,inner,Way], [543129865,outer,Way], [8129965,outer,Way], [496339945,outer,Way], [496339943,outer,Way]]|20190430|
|476|6 |2014-09-24 19:50:00|Map(name -> Karl-Franzens-Universität Hauptgebäude, addr:street -> Universitätsplatz, addr:country -> AT, addr:postcode -> 8010, amenity -> university, addr:housenumber -> 3, building -> public, addr:city -> Graz, operator -> Karl-Franzens-Universität, type -> multipolygon)|[[4399927,outer,Way], [8143951,inner,Way]] |20190430|
+---+-------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+--------+
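With the tags stored as a map, individual keys can be looked up directly. The sketch below filters nodes by a tag value, using a small in-memory stand-in built from the two sample node rows above (with shortened tag maps) so that it runs without the stored table. With the real data, spark.table("my_db.osm_node") would take the place of the stand-in.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// In spark-shell the session already exists and getOrCreate() returns it.
val spark = SparkSession.builder().master("local[*]").appName("osm-tags-sketch").getOrCreate()

// Stand-in for my_db.osm_node with a subset of the tags shown above.
val node = spark.createDataFrame(Seq(
  (69L, Map("place" -> "hamlet", "population" -> "111"), 48.0674106, 12.8623448),
  (155843L, Map("TMC:cid_58:tabcd_1:Class" -> "Point"), 47.702766700000005, 12.159043)
)).toDF("id", "tags", "y_lat_wgs84", "x_long_wgs84")

// Look up single keys in the map column; all tag values are strings, so cast where needed.
val hamlets = node
  .filter(col("tags")("place") === "hamlet")
  .select(col("id"), col("tags")("population").cast("int").as("population"))

hamlets.show()
```

Under the default settings of the Spark 2.x versions discussed here, looking up a key that is missing from the map yields null, so rows without a place tag simply do not pass the filter.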