From 05e22726df37f0cbaf6b140755c76f34cebc32a4 Mon Sep 17 00:00:00 2001 From: Kamil Raczycki Date: Sun, 31 Dec 2023 18:09:36 +0100 Subject: [PATCH] chore: change relations and ways grouping --- quackosm/_rich_progress.py | 2 +- quackosm/pbf_file_reader.py | 412 ++++++++++++++++++++++++------------ 2 files changed, 277 insertions(+), 137 deletions(-) diff --git a/quackosm/_rich_progress.py b/quackosm/_rich_progress.py index e3b88b9..66df159 100644 --- a/quackosm/_rich_progress.py +++ b/quackosm/_rich_progress.py @@ -5,7 +5,7 @@ __all__ = ["TaskProgressSpinner", "TaskProgressBar"] -TOTAL_STEPS = 34 +TOTAL_STEPS = 36 class TaskProgressSpinner: def __init__(self, step_name: str, step_number: str): diff --git a/quackosm/pbf_file_reader.py b/quackosm/pbf_file_reader.py index bb14508..65b181f 100644 --- a/quackosm/pbf_file_reader.py +++ b/quackosm/pbf_file_reader.py @@ -6,6 +6,7 @@ import hashlib import json +import shutil import tempfile import warnings from collections.abc import Iterable @@ -224,8 +225,6 @@ def convert_pbf_to_gpq( explode_tags = self.tags_filter is not None with tempfile.TemporaryDirectory(dir=self.working_directory.resolve()) as tmp_dir_name: - # tmp_dir_name = tmp_dir_name_2 - # tmp_dir_name = "files/xd" try: self._set_up_duckdb_connection(tmp_dir_name) result_file_path = result_file_path or self._generate_geoparquet_result_file_path( @@ -253,14 +252,18 @@ def _set_up_duckdb_connection(self, tmp_dir_name: str) -> None: self.connection.install_extension(extension_name) self.connection.load_extension(extension_name) - self.connection.sql(""" + self.connection.sql( + """ CREATE OR REPLACE MACRO linestring_to_linestring_wkt(ls) AS 'LINESTRING (' || array_to_string([pt.x || ' ' || pt.y for pt in ls], ', ') || ')'; - """) - self.connection.sql(""" + """ + ) + self.connection.sql( + """ CREATE OR REPLACE MACRO linestring_to_polygon_wkt(ls) AS 'POLYGON ((' || array_to_string([pt.x || ' ' || pt.y for pt in ls], ', ') || '))'; - """) + """ + ) def _parse_pbf_file( self, @@ -305,27 +308,39 @@ def _parse_pbf_file( ], ) + filtered_ways_with_linestrings = self._get_filtered_ways_with_linestrings( + osm_parquet_files=converted_osm_parquet_files, + required_nodes_with_structs=required_nodes_with_structs, + tmp_dir_name=tmp_dir_name, + ) required_ways_with_linestrings = self._get_required_ways_with_linestrings( - converted_osm_parquet_files, required_nodes_with_structs, tmp_dir_name + osm_parquet_files=converted_osm_parquet_files, + required_nodes_with_structs=required_nodes_with_structs, + tmp_dir_name=tmp_dir_name, ) self._delete_directories( tmp_dir_name, [ + "ways_required_grouped", "ways_required_grouped", "ways_required_ids", "ways_with_unnested_nodes_refs", "required_nodes_with_points", + "required_ways_grouped", + "filtered_ways_grouped", ], ) filtered_ways_with_proper_geometry = self._get_filtered_ways_with_proper_geometry( - converted_osm_parquet_files, required_ways_with_linestrings, tmp_dir_name + converted_osm_parquet_files, filtered_ways_with_linestrings, tmp_dir_name ) self._delete_directories( tmp_dir_name, [ "ways_prepared_ids", + "ways_filtered_ids", "ways_all_with_tags", + "filtered_ways_with_linestrings", ], ) @@ -470,11 +485,13 @@ def _prefilter_elements_ids( # - select all with more then one ref # - join all NV to refs # - select all where all refs has been joined (total_refs == found_refs) - self.connection.sql(f""" + self.connection.sql( + f""" SELECT * FROM ({elements.sql_query()}) w WHERE kind = 'way' AND len(refs) >= 2 - """).to_view("ways", replace=True) + """ + ).to_view("ways", replace=True) ways_all_with_tags = self._sql_to_parquet_file( sql_query=f""" WITH filtered_tags AS ( @@ -545,13 +562,10 @@ def _prefilter_elements_ids( file_path=Path(tmp_dir_name) / "ways_filtered_non_distinct_ids", ) - ways_prepared_ids_path = Path(tmp_dir_name) / "ways_prepared_ids" - ways_prepared_ids_path.mkdir(parents=True, exist_ok=True) - with TaskProgressSpinner("Calculating distinct filtered ways ids", "10"): ways_filtered_ids = self._calculate_unique_ids_to_parquet( Path(tmp_dir_name) / "ways_filtered_non_distinct_ids", - ways_prepared_ids_path / "filtered", + Path(tmp_dir_name) / "ways_filtered_ids", ) with TaskProgressSpinner("Reading relations", "11"): @@ -561,13 +575,15 @@ def _prefilter_elements_ids( # - select all with type in ['boundary', 'multipolygon'] # - join all WV to refs # - select all where all refs has been joined (total_refs == found_refs) - self.connection.sql(f""" + self.connection.sql( + f""" SELECT * FROM ({elements.sql_query()}) WHERE kind = 'relation' AND len(refs) > 0 AND list_contains(map_keys(tags), 'type') AND list_has_any(map_extract(tags, 'type'), ['boundary', 'multipolygon']) - """).to_view("relations", replace=True) + """ + ).to_view("relations", replace=True) relations_all_with_tags = self._sql_to_parquet_file( sql_query=f""" WITH filtered_tags AS ( @@ -661,6 +677,9 @@ def _prefilter_elements_ids( relations_ids_path / "filtered", Path(tmp_dir_name) / "relations_filtered_ids" ) + ways_prepared_ids_path = Path(tmp_dir_name) / "ways_prepared_ids" + ways_prepared_ids_path.mkdir(parents=True, exist_ok=True) + with TaskProgressSpinner("Loading required ways - by relations", "17"): # WAYS - REQUIRED (WR) # - required - all IDs from WF @@ -733,7 +752,7 @@ def _delete_directories( directory_path = Path(tmp_dir_name) / directory if not directory_path.exists(): continue - # shutil.rmtree(directory_path) + shutil.rmtree(directory_path) def _generate_osm_tags_sql_filter(self) -> str: """Prepare features filter clauses based on tags filter.""" @@ -816,14 +835,18 @@ def _sql_to_parquet_file(self, sql_query: str, file_path: Path) -> "duckdb.DuckD def _save_parquet_file( self, relation: "duckdb.DuckDBPyRelation", file_path: Path ) -> "duckdb.DuckDBPyRelation": - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT * FROM ({relation.sql_query()}) ) TO '{file_path}' (FORMAT 'parquet', PER_THREAD_OUTPUT true, ROW_GROUP_SIZE 25000) - """) - return self.connection.sql(f""" + """ + ) + return self.connection.sql( + f""" SELECT * FROM read_parquet('{file_path}/**') - """) + """ + ) def _calculate_unique_ids_to_parquet( self, file_path: Path, result_path: Optional[Path] = None @@ -831,29 +854,35 @@ def _calculate_unique_ids_to_parquet( if result_path is None: result_path = file_path / "distinct" - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT id FROM read_parquet('{file_path}/**') GROUP BY id ) TO '{result_path}' (FORMAT 'parquet', PER_THREAD_OUTPUT true, ROW_GROUP_SIZE 25000) - """) + """ + ) - return self.connection.sql(f""" + return self.connection.sql( + f""" SELECT * FROM read_parquet('{result_path}/**') - """) + """ + ) def _get_filtered_nodes_with_geometry( self, osm_parquet_files: ConvertedOSMParquetFiles, tmp_dir_name: str, ) -> "duckdb.DuckDBPyRelation": - nodes_with_geometry = self.connection.sql(f""" + nodes_with_geometry = self.connection.sql( + f""" SELECT n.id, n.tags, ST_Point(round(n.lon, 7), round(n.lat, 7)) geometry FROM ({osm_parquet_files.nodes_valid_with_tags.sql_query()}) n SEMI JOIN ({osm_parquet_files.nodes_filtered_ids.sql_query()}) fn ON n.id = fn.id - """) + """ + ) nodes_parquet = self._save_parquet_file_with_geometry( relation=nodes_with_geometry, file_path=Path(tmp_dir_name) / "filtered_nodes_with_geometry", @@ -867,13 +896,15 @@ def _get_required_nodes_with_structs( osm_parquet_files: ConvertedOSMParquetFiles, tmp_dir_name: str, ) -> "duckdb.DuckDBPyRelation": - nodes_with_structs = self.connection.sql(f""" + nodes_with_structs = self.connection.sql( + f""" SELECT n.id, struct_pack(x := round(n.lon, 7), y := round(n.lat, 7))::POINT_2D point FROM ({osm_parquet_files.nodes_valid_with_tags.sql_query()}) n SEMI JOIN ({osm_parquet_files.nodes_required_ids.sql_query()}) rn ON n.id = rn.id - """) + """ + ) with TaskProgressSpinner("Saving required nodes with structs", "23"): nodes_parquet = self._save_parquet_file( relation=nodes_with_structs, @@ -881,74 +912,138 @@ def _get_required_nodes_with_structs( ) return nodes_parquet + def _get_filtered_ways_with_linestrings( + self, + osm_parquet_files: ConvertedOSMParquetFiles, + required_nodes_with_structs: "duckdb.DuckDBPyRelation", + tmp_dir_name: str, + ) -> "duckdb.DuckDBPyRelation": + grouped_ways_path = Path(tmp_dir_name) / "filtered_ways_grouped" + destination_dir_path = Path(tmp_dir_name) / "filtered_ways_with_linestrings" + + with TaskProgressSpinner("Grouping filtered ways", "24"): + groups = self._group_ways( + ways_ids=osm_parquet_files.ways_filtered_ids, + osm_parquet_files=osm_parquet_files, + required_nodes_with_structs=required_nodes_with_structs, + grouped_ways_path=grouped_ways_path, + destination_dir_path=destination_dir_path, + ) + + with TaskProgressBar("Saving filtered ways with linestrings", "25") as bar: + self._construct_ways_linestrings( + bar=bar, + groups=groups, + destination_dir_path=destination_dir_path, + grouped_ways_path=grouped_ways_path, + ) + + ways_parquet = self.connection.sql( + f""" + SELECT * FROM read_parquet('{destination_dir_path}/**') + """ + ) + return ways_parquet + def _get_required_ways_with_linestrings( self, osm_parquet_files: ConvertedOSMParquetFiles, required_nodes_with_structs: "duckdb.DuckDBPyRelation", tmp_dir_name: str, ) -> "duckdb.DuckDBPyRelation": - with TaskProgressSpinner("Grouping required ways", "24"): - total_required_ways = osm_parquet_files.ways_required_ids.count("id").fetchone()[0] + grouped_ways_path = Path(tmp_dir_name) / "required_ways_grouped" + destination_dir_path = Path(tmp_dir_name) / "required_ways_with_linestrings" + + with TaskProgressSpinner("Grouping required ways", "26"): + groups = self._group_ways( + ways_ids=osm_parquet_files.ways_required_ids, + osm_parquet_files=osm_parquet_files, + required_nodes_with_structs=required_nodes_with_structs, + grouped_ways_path=grouped_ways_path, + destination_dir_path=destination_dir_path, + ) - required_ways_with_linestrings_path = ( - Path(tmp_dir_name) / "required_ways_with_linestrings" + with TaskProgressBar("Saving required ways with linestrings", "27") as bar: + self._construct_ways_linestrings( + bar=bar, + groups=groups, + destination_dir_path=destination_dir_path, + grouped_ways_path=grouped_ways_path, ) - required_ways_with_linestrings_path.mkdir(parents=True, exist_ok=True) - if total_required_ways == 0: - empty_file_path = str(required_ways_with_linestrings_path / "empty.parquet") - self.connection.sql( - "CREATE OR REPLACE TABLE x(id STRING, linestring LINESTRING_2D);" - ) - self.connection.table("x").to_parquet(empty_file_path) - return self.connection.read_parquet(empty_file_path) + ways_parquet = self.connection.sql( + f""" + SELECT * FROM read_parquet('{destination_dir_path}/**') + """ + ) + return ways_parquet - groups = floor(total_required_ways / self.rows_per_bucket) - grouped_required_ways_path = Path(tmp_dir_name) / "ways_required_grouped" + def _group_ways( + self, + ways_ids: "duckdb.DuckDBPyRelation", + osm_parquet_files: ConvertedOSMParquetFiles, + required_nodes_with_structs: "duckdb.DuckDBPyRelation", + destination_dir_path: Path, + grouped_ways_path: Path, + ) -> int: + total_required_ways = ways_ids.count("id").fetchone()[0] - self.connection.sql(f""" - COPY ( - WITH grouped_ways_ids AS ( - SELECT id, - floor( - row_number() OVER () / {self.rows_per_bucket} - )::INTEGER as "group", - FROM ({osm_parquet_files.ways_required_ids.sql_query()}) - ) - SELECT - w.id, n.point, w.ref_idx, rw."group", - FROM ({osm_parquet_files.ways_with_unnested_nodes_refs.sql_query()}) w - JOIN grouped_ways_ids rw - ON w.id = rw.id - JOIN ({required_nodes_with_structs.sql_query()}) n - ON n.id = w.ref - ) TO '{grouped_required_ways_path}' - (FORMAT 'parquet', PARTITION_BY ("group"), ROW_GROUP_SIZE 25000) - """) - - with TaskProgressBar("Saving required ways with linestrings", "25") as bar: - for group in bar.track(range(groups + 1)): - current_required_ways_group_path = ( - grouped_required_ways_path / f"group={group}" - ) - current_required_ways_group_relation = self.connection.sql(f""" - SELECT * FROM read_parquet('{current_required_ways_group_path}/**') - """) + destination_dir_path.mkdir(parents=True, exist_ok=True) - ways_with_linestrings = self.connection.sql(f""" - SELECT id, list(point ORDER BY ref_idx ASC)::LINESTRING_2D linestring - FROM ({current_required_ways_group_relation.sql_query()}) - GROUP BY id - """) - self._save_parquet_file( - relation=ways_with_linestrings, - file_path=required_ways_with_linestrings_path / f"group={group}", + if total_required_ways == 0: + empty_file_path = str(destination_dir_path / "empty.parquet") + self.connection.sql("CREATE OR REPLACE TABLE x(id STRING, linestring LINESTRING_2D);") + self.connection.table("x").to_parquet(empty_file_path) + return -1 + + groups = int(floor(total_required_ways / self.rows_per_bucket)) + + self.connection.sql( + f""" + COPY ( + WITH grouped_ways_ids AS ( + SELECT id, + floor( + row_number() OVER () / {self.rows_per_bucket} + )::INTEGER as "group", + FROM ({ways_ids.sql_query()}) ) + SELECT + w.id, n.point, w.ref_idx, rw."group", + FROM ({osm_parquet_files.ways_with_unnested_nodes_refs.sql_query()}) w + JOIN grouped_ways_ids rw + ON w.id = rw.id + JOIN ({required_nodes_with_structs.sql_query()}) n + ON n.id = w.ref + ) TO '{grouped_ways_path}' + (FORMAT 'parquet', PARTITION_BY ("group"), ROW_GROUP_SIZE 25000) + """ + ) - ways_parquet = self.connection.sql(f""" - SELECT * FROM read_parquet('{required_ways_with_linestrings_path}/**') - """) - return ways_parquet + return groups + + def _construct_ways_linestrings( + self, bar: TaskProgressBar, groups: int, destination_dir_path: Path, grouped_ways_path: Path + ) -> None: + for group in bar.track(range(groups + 1)): + current_required_ways_group_path = grouped_ways_path / f"group={group}" + current_required_ways_group_relation = self.connection.sql( + f""" + SELECT * FROM read_parquet('{current_required_ways_group_path}/**') + """ + ) + + ways_with_linestrings = self.connection.sql( + f""" + SELECT id, list(point ORDER BY ref_idx ASC)::LINESTRING_2D linestring + FROM ({current_required_ways_group_relation.sql_query()}) + GROUP BY id + """ + ) + self._save_parquet_file( + relation=ways_with_linestrings, + file_path=destination_dir_path / f"group={group}", + ) def _get_filtered_ways_with_proper_geometry( self, @@ -984,7 +1079,8 @@ def _get_filtered_ways_with_proper_geometry( f" list_has_any(map_extract(raw_tags, '{osm_tag_key}'), [{escaped_values}])" ) - ways_with_proper_geometry = self.connection.sql(f""" + ways_with_proper_geometry = self.connection.sql( + f""" WITH required_ways_with_linestrings AS ( SELECT w.id, @@ -1021,12 +1117,13 @@ def _get_filtered_ways_with_proper_geometry( required_ways_with_linestrings w ) SELECT id, tags, geometry FROM proper_geometries - """) + """ + ) ways_parquet = self._save_parquet_file_with_geometry( relation=ways_with_proper_geometry, file_path=Path(tmp_dir_name) / "filtered_ways_with_geometry", step_name="Saving filtered ways with geometries", - step_number="26", + step_number="28", ) return ways_parquet @@ -1036,7 +1133,8 @@ def _get_filtered_relations_with_geometry( required_ways_with_linestrings: "duckdb.DuckDBPyRelation", tmp_dir_name: str, ) -> "duckdb.DuckDBPyRelation": - valid_relation_parts = self.connection.sql(f""" + valid_relation_parts = self.connection.sql( + f""" WITH unnested_relations AS ( SELECT r.id, @@ -1091,38 +1189,44 @@ def _get_filtered_relations_with_geometry( ) SELECT * FROM relations_with_geometries SEMI JOIN valid_relations ON relations_with_geometries.id = valid_relations.id - """) + """ + ) valid_relation_parts_parquet = self._save_parquet_file_with_geometry( relation=valid_relation_parts, file_path=Path(tmp_dir_name) / "valid_relation_parts", step_name="Saving valid relations parts", - step_number="27", + step_number="29", ) - relation_inner_parts = self.connection.sql(f""" + relation_inner_parts = self.connection.sql( + f""" SELECT id, geometry_id, ST_MakePolygon(geometry) geometry FROM ({valid_relation_parts_parquet.sql_query()}) WHERE ref_role = 'inner' - """) + """ + ) relation_inner_parts_parquet = self._save_parquet_file_with_geometry( relation=relation_inner_parts, file_path=Path(tmp_dir_name) / "relation_inner_parts", fix_geometries=True, step_name="Saving relations inner parts", - step_number="28", + step_number="30", ) - relation_outer_parts = self.connection.sql(f""" + relation_outer_parts = self.connection.sql( + f""" SELECT id, geometry_id, ST_MakePolygon(geometry) geometry FROM ({valid_relation_parts_parquet.sql_query()}) WHERE ref_role = 'outer' - """) + """ + ) relation_outer_parts_parquet = self._save_parquet_file_with_geometry( relation=relation_outer_parts, file_path=Path(tmp_dir_name) / "relation_outer_parts", fix_geometries=True, step_name="Saving relations outer parts", - step_number="29", + step_number="31", ) - relation_outer_parts_with_holes = self.connection.sql(f""" + relation_outer_parts_with_holes = self.connection.sql( + f""" SELECT og.id, og.geometry_id, @@ -1131,14 +1235,16 @@ def _get_filtered_relations_with_geometry( JOIN ({relation_inner_parts_parquet.sql_query()}) ig ON og.id = ig.id AND ST_WITHIN(ig.geometry, og.geometry) GROUP BY og.id, og.geometry_id - """) + """ + ) relation_outer_parts_with_holes_parquet = self._save_parquet_file_with_geometry( relation=relation_outer_parts_with_holes, file_path=Path(tmp_dir_name) / "relation_outer_parts_with_holes", step_name="Saving relations outer parts with holes", - step_number="30", + step_number="32", ) - relation_outer_parts_without_holes = self.connection.sql(f""" + relation_outer_parts_without_holes = self.connection.sql( + f""" SELECT og.id, og.geometry_id, @@ -1146,14 +1252,16 @@ def _get_filtered_relations_with_geometry( FROM ({relation_outer_parts_parquet.sql_query()}) og ANTI JOIN ({relation_outer_parts_with_holes_parquet.sql_query()}) ogwh ON og.id = ogwh.id AND og.geometry_id = ogwh.geometry_id - """) + """ + ) relation_outer_parts_without_holes_parquet = self._save_parquet_file_with_geometry( relation=relation_outer_parts_without_holes, file_path=Path(tmp_dir_name) / "relation_outer_parts_without_holes", step_name="Saving relations outer parts without holes", - step_number="31", + step_number="33", ) - relations_with_geometry = self.connection.sql(f""" + relations_with_geometry = self.connection.sql( + f""" WITH unioned_outer_geometries AS ( SELECT id, geometry FROM ({relation_outer_parts_with_holes_parquet.sql_query()}) @@ -1170,12 +1278,13 @@ def _get_filtered_relations_with_geometry( FROM final_geometries r_g JOIN ({osm_parquet_files.relations_all_with_tags.sql_query()}) r ON r.id = r_g.id - """) + """ + ) relations_parquet = self._save_parquet_file_with_geometry( relation=relations_with_geometry, file_path=Path(tmp_dir_name) / "filtered_relations_with_geometry", step_name="Saving filtered relations with geometries", - step_number="32", + step_number="34", ) return relations_parquet @@ -1189,7 +1298,8 @@ def _save_parquet_file_with_geometry( ) -> "duckdb.DuckDBPyRelation": if not fix_geometries: with TaskProgressSpinner(step_name, f"{step_number}"): - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT * EXCLUDE (geometry), ST_AsWKB(geometry) geometry_wkb @@ -1199,7 +1309,8 @@ def _save_parquet_file_with_geometry( PER_THREAD_OUTPUT true, ROW_GROUP_SIZE 25000 ) - """) + """ + ) else: valid_path = file_path / "valid" invalid_path = file_path / "invalid" @@ -1211,7 +1322,8 @@ def _save_parquet_file_with_geometry( # Save valid features with TaskProgressSpinner(f"{step_name} - valid geometries", f"{step_number}.1"): - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT * EXCLUDE (geometry), ST_AsWKB(geometry) geometry_wkb @@ -1222,11 +1334,13 @@ def _save_parquet_file_with_geometry( PER_THREAD_OUTPUT true, ROW_GROUP_SIZE 25000 ) - """) + """ + ) # Save invalid features with TaskProgressSpinner(f"{step_name} - invalid geometries", f"{step_number}.2"): - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT * EXCLUDE (geometry), ST_AsWKB(geometry) geometry_wkb, @@ -1238,7 +1352,8 @@ def _save_parquet_file_with_geometry( ) TO '{invalid_path}' ( FORMAT 'parquet', PARTITION_BY ("group"), ROW_GROUP_SIZE 25000 ) - """) + """ + ) # Fix invalid features total_groups = 0 @@ -1278,10 +1393,12 @@ def _save_parquet_file_with_geometry( self._delete_directories(invalid_path.parent, ["invalid"]) - return self.connection.sql(f""" + return self.connection.sql( + f""" SELECT * EXCLUDE (geometry_wkb), ST_GeomFromWKB(geometry_wkb) geometry FROM read_parquet('{file_path}/**') - """) + """ + ) def _concatenate_results_to_geoparquet( self, @@ -1299,7 +1416,8 @@ def _concatenate_results_to_geoparquet( way_select_clauses = ["'way/' || id as feature_id", *select_clauses] relation_select_clauses = ["'relation/' || id as feature_id", *select_clauses] - unioned_features = self.connection.sql(f""" + unioned_features = self.connection.sql( + f""" SELECT {', '.join(node_select_clauses)} FROM ({parsed_data.nodes.sql_query()}) n UNION ALL @@ -1308,21 +1426,24 @@ def _concatenate_results_to_geoparquet( UNION ALL SELECT {', '.join(relation_select_clauses)} FROM ({parsed_data.relations.sql_query()}) r - """) + """ + ) grouped_features = self._parse_features_relation_to_groups(unioned_features, explode_tags) - valid_features_full_relation = self.connection.sql(f""" + valid_features_full_relation = self.connection.sql( + f""" SELECT * FROM ({grouped_features.sql_query()}) WHERE ST_IsValid(geometry) - """) + """ + ) valid_features_parquet_path = Path(tmp_dir_name) / "osm_valid_elements" valid_features_parquet_relation = self._save_parquet_file_with_geometry( valid_features_full_relation, valid_features_parquet_path, step_name="Saving valid features", - step_number="33.1", + step_number="35.1", ) valid_features_parquet_table = pq.read_table(valid_features_parquet_path) @@ -1343,11 +1464,13 @@ def _concatenate_results_to_geoparquet( parquet_tables = [valid_features_parquet_table] - invalid_features_full_relation = self.connection.sql(f""" + invalid_features_full_relation = self.connection.sql( + f""" SELECT * FROM ({grouped_features.sql_query()}) a ANTI JOIN ({valid_features_parquet_relation.sql_query()}) b ON a.feature_id = b.feature_id - """) + """ + ) total_nodes = parsed_data.nodes.count("id").fetchone()[0] total_ways = parsed_data.ways.count("id").fetchone()[0] @@ -1359,12 +1482,13 @@ def _concatenate_results_to_geoparquet( invalid_features = total_features - valid_features if invalid_features > 0: - with TaskProgressSpinner("Grouping invalid features", "33.2"): + with TaskProgressSpinner("Grouping invalid features", "35.2"): groups = floor(invalid_features / self.rows_per_bucket) grouped_invalid_features_result_parquet = ( Path(tmp_dir_name) / "osm_invalid_elements_grouped" ) - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT * EXCLUDE (geometry), ST_AsWKB(geometry) geometry_wkb, @@ -1374,9 +1498,10 @@ def _concatenate_results_to_geoparquet( FROM ({invalid_features_full_relation.sql_query()}) ) TO '{grouped_invalid_features_result_parquet}' (FORMAT 'parquet', PARTITION_BY ("group"), ROW_GROUP_SIZE 25000) - """) + """ + ) - with TaskProgressBar("Fixing invalid features", "33.3") as bar: + with TaskProgressBar("Fixing invalid features", "35.3") as bar: for group in bar.track(range(groups + 1)): current_invalid_features_group_path = ( grouped_invalid_features_result_parquet / f"group={group}" @@ -1422,7 +1547,7 @@ def _concatenate_results_to_geoparquet( if empty_columns: joined_parquet_table = joined_parquet_table.drop(empty_columns) - with TaskProgressSpinner("Saving final geoparquet file", "34"): + with TaskProgressSpinner("Saving final geoparquet file", "36"): io.write_geoparquet_table( # type: ignore joined_parquet_table, save_file_path, primary_geometry_column=GEOMETRY_COLUMN ) @@ -1443,10 +1568,15 @@ def _generate_osm_tags_sql_select( parsed_data.ways, parsed_data.relations, ): - found_tag_keys = [row[0] for row in self.connection.sql(f""" + found_tag_keys = [ + row[0] + for row in self.connection.sql( + f""" SELECT DISTINCT UNNEST(map_keys(tags)) tag_key FROM ({elements.sql_query()}) - """).fetchall()] + """ + ).fetchall() + ] osm_tag_keys.update(found_tag_keys) osm_tag_keys_select_clauses = [ f"list_extract(map_extract(tags, '{osm_tag_key}'), 1) as \"{osm_tag_key}\"" @@ -1469,7 +1599,8 @@ def _generate_osm_tags_sql_select( f"(tag_entry.key = '{filter_tag_key}' AND tag_entry.value IN" f" ({', '.join(values_list)}))" ) - osm_tag_keys_select_clauses = [f""" + osm_tag_keys_select_clauses = [ + f""" map_from_entries( [ tag_entry @@ -1477,7 +1608,8 @@ def _generate_osm_tags_sql_select( if {" OR ".join(filter_tag_clauses)} ] ) as tags - """] + """ + ] elif self.merged_tags_filter and explode_tags: for filter_tag_key, filter_tag_value in self.merged_tags_filter.items(): if isinstance(filter_tag_value, bool) and filter_tag_value: @@ -1487,24 +1619,28 @@ def _generate_osm_tags_sql_select( ) elif isinstance(filter_tag_value, str): escaped_value = self._sql_escape(filter_tag_value) - osm_tag_keys_select_clauses.append(f""" + osm_tag_keys_select_clauses.append( + f""" CASE WHEN list_extract( map_extract(tags, '{filter_tag_key}'), 1 ) = '{escaped_value}' THEN '{escaped_value}' ELSE NULL END as "{filter_tag_key}" - """) + """ + ) elif isinstance(filter_tag_value, list) and filter_tag_value: values_list = [f"'{self._sql_escape(value)}'" for value in filter_tag_value] - osm_tag_keys_select_clauses.append(f""" + osm_tag_keys_select_clauses.append( + f""" CASE WHEN list_extract( map_extract(tags, '{filter_tag_key}'), 1 ) IN ({', '.join(values_list)}) THEN list_extract(map_extract(tags, '{filter_tag_key}'), 1) ELSE NULL END as "{filter_tag_key}" - """) + """ + ) if len(osm_tag_keys_select_clauses) > 100: warnings.warn( @@ -1571,10 +1707,12 @@ def _parse_features_relation_to_groups( case_clauses.append(case_clause) joined_case_clauses = ", ".join(case_clauses) - grouped_features_relation = self.connection.sql(f""" + grouped_features_relation = self.connection.sql( + f""" SELECT feature_id, {joined_case_clauses}, geometry FROM ({features_relation.sql_query()}) - """) + """ + ) else: case_clauses = [] group_names = sorted(grouped_tags_filter.keys()) @@ -1615,9 +1753,11 @@ def _parse_features_relation_to_groups( ] ) as tags""" - grouped_features_relation = self.connection.sql(f""" + grouped_features_relation = self.connection.sql( + f""" SELECT feature_id, {non_null_groups_map}, geometry FROM ({features_relation.sql_query()}) - """) + """ + ) return grouped_features_relation