Analyze OSM data in Spark
This article assumes that you already have OpenStreetMap (OSM) data available in a format similar to the one described here.
analyzing the OSM community
Load the dataframes:
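// Needed when running outside spark-shell: column functions and the $"col" / 'col syntax.
import org.apache.spark.sql.functions._
import spark.implicits._

val node = spark.sql("SELECT * FROM my_db.osm_node").drop("dt")
val way = spark.sql("SELECT * FROM my_db.osm_way").drop("dt")
val relation = spark.sql("SELECT * FROM my_db.osm_relation").drop("dt")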
Because this sample data contains only a single ingested partition, the partitioning column dt is constant and can safely be dropped. In a production setup, remember to select a fitting partition value for all three tables.
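Selecting a partition explicitly could look roughly like the sketch below. The helper name loadPartition and the partition value in the usage comment are hypothetical; the sketch also assumes dt is stored as a string.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper: load one OSM table restricted to a single partition.
// Assumes the partitioning column is named dt (as in the article) and holds strings.
def loadPartition(spark: SparkSession, table: String, dt: String): DataFrame =
  spark.table(table)
    .filter(col("dt") === dt) // lets Spark prune to the matching partition
    .drop("dt")               // constant after filtering, so no longer needed

// Usage (the partition value is made up for illustration):
// val node = loadPartition(spark, "my_db.osm_node", "2019-01-01")
```

Applying the same value to all three tables keeps nodes, ways, and relations from the same snapshot.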
How did the number of nodes evolve over the years? (inspired by):
node.withColumn("year", year(col("timestamp")))
  .groupBy("year")
  .agg(count("*").alias("nodes_per_year"))
  .orderBy(desc("year"))
  .show()
The fields uid, user_sid, and changeset are dropped from this data. To track the OSM community directly, read https://osm-internal.download.geofabrik.de/index.html and get the raw data files from a source where these are not dropped.
generate geometry from the graph
A simple case - extract points (x,y) for each hospital:
node.filter(col("tags").getItem("amenity") === "hospital")
  .select('id, 'x_long_wgs84, 'y_lat_wgs84)
  .show(5, false)
results in:
+----------+------------------+------------------+
| id| x_long_wgs84| y_lat_wgs84|
+----------+------------------+------------------+
| 482297303| 16.509893| 47.507452|
| 702520462| 12.9887397| 46.6995002|
|....
| 334634341|16.257896000000002| 48.0913958|
|5626135755| 15.4172255| 47.0597672|
+----------+------------------+------------------+
Now, a little bit more complex: find the geometries of highways. Usually posexplode (way.selectExpr("*", "posexplode(nodes) as (indexedNodeIndex,indexedNode)").drop("nodes")) is required to keep the original order of the elements inside the array. But in this case the index is already provided, so a simple explode is good enough.
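// Keep only ways tagged as highways and emit one row per (way, node reference).
val highways = way.filter('tags.getItem("highway").isNotNull)
  .select($"id".as("wayId"), explode($"nodes").as("indexedNode"))
// Coordinates of every node, to be looked up for each node reference.
val intermediateNodes = node.select($"id".as("nodeId"), $"x_long_wgs84", $"y_lat_wgs84")
// Attach coordinates, then rebuild each way as an array ordered by the node index.
val wayGeometries = highways.join(intermediateNodes, $"indexedNode.nodeId" === $"nodeId")
  .groupBy("wayId")
  .agg(sort_array(collect_list(struct($"indexedNode.index", $"y_lat_wgs84", $"x_long_wgs84"))).as("geometry"))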
The result looks like:
root
 |-- wayId: long (nullable = true)
 |-- geometry: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- index: integer (nullable = true)
 |    |    |-- y_lat_wgs84: double (nullable = true)
 |    |    |-- x_long_wgs84: double (nullable = true)
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|wayId |geometry |
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|374036 |[[0,48.1923654,16.273967900000002], [1,48.192309800000004,16.2739489], [2,48.192234000000006,16.2739279], [3,48.192175500000005,16.2739176]] |
|2975945|[[0,47.559291900000005,13.8593094], [1,47.559302100000004,13.859371600000001], [2,47.55932120000001,13.859477], [3,47.559189200000006,13.859636], [4,47.558987,13.859803300000001], [5,47.5588545,13.859868800000001], [6,47.5585602,13.8601109], [7,47.5584739,13.860249600000001], [8,47.558379800000004,13.860469100000001], [9,47.558193900000006,13.860748500000001]]|
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
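The index is the first field of the struct for a reason: collect_list gives no ordering guarantee after the join, and sort_array compares structs field by field, so sorting on index restores the node order of each way. A plain-Scala analogy, with made-up coordinates and tuples standing in for the Spark structs:

```scala
// (index, lat, lon) tuples stand in for the collected structs; the values are made up.
val collected: Seq[(Int, Double, Double)] = Seq(
  (2, 48.19223, 16.27392),
  (0, 48.19236, 16.27396),
  (1, 48.19230, 16.27394)
)

// Sorting by the leading index plays the role of sort_array on the struct array.
val ordered: Seq[(Int, Double, Double)] = collected.sortBy(_._1)

// The way geometry is now in its original node order.
val indices: Seq[Int] = ordered.map(_._1)
```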
Adding some JTS magic or a couple of string operations now easily yields either a geometry object for further processing or a WKT linestring.
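As a minimal sketch of the string-operation route, the function below turns one geometry array into a WKT linestring. The names Vertex and toWktLineString are hypothetical; the case class mirrors the struct (index, y_lat_wgs84, x_long_wgs84) shown above. Note that WKT expects x before y, i.e. longitude before latitude.

```scala
// Hypothetical model of one element of the geometry array.
final case class Vertex(index: Int, lat: Double, lon: Double)

// Plain decimal formatting avoids scientific notation for values close to zero.
def plain(d: Double): String = java.math.BigDecimal.valueOf(d).toPlainString()

// Build a WKT LINESTRING, or None when fewer than two vertices are available.
def toWktLineString(geometry: Seq[Vertex]): Option[String] =
  if (geometry.size < 2) None
  else {
    val coords = geometry
      .sortBy(_.index)                                // restore node order defensively
      .map(v => s"${plain(v.lon)} ${plain(v.lat)}")   // WKT order: x (lon), then y (lat)
    Some(coords.mkString("LINESTRING (", ", ", ")"))
  }
```

If a geometry object is needed instead, the resulting string could be parsed with the WKTReader that JTS provides.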
Converting relations is left as an exercise for the reader. It is a bit more complex because ways must be reassembled recursively; a good introduction can be found here.
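To give an idea of the recursion involved: an OSM relation can contain other relations as members, so collecting its ways means following nested relations. The following is only an in-memory sketch, not a Spark implementation; the types Member, WayRef, and RelationRef and the function collectWays are hypothetical, since the schema of my_db.osm_relation is not shown here.

```scala
// Hypothetical member model: a relation refers to ways or to other relations.
sealed trait Member
final case class WayRef(id: Long) extends Member
final case class RelationRef(id: Long) extends Member

// Collect the ids of all ways reachable from a relation, following nested relations.
// The visited set guards against relations that reference each other in a cycle.
def collectWays(
    relationId: Long,
    relations: Map[Long, Seq[Member]],
    visited: Set[Long] = Set.empty
): Seq[Long] =
  if (visited.contains(relationId)) Seq.empty
  else
    relations
      .getOrElse(relationId, Seq.empty)
      .flatMap {
        case WayRef(id)      => Seq(id)
        case RelationRef(id) => collectWays(id, relations, visited + relationId)
      }
      .distinct // a way reachable via several paths is reported once
```

The collected way ids could then be joined against the way geometries built earlier.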