refactor: change geometry filtering step #117

Merged · 25 commits · May 21, 2024

Commits

5193f90  refactor: change geometry filtering step (RaczeQ, May 15, 2024)
bb20d7b  refactor: change spatial filtering from geos to native duckdb (RaczeQ, May 15, 2024)
cce3134  chore: add additional filter step and remove distinct clause (RaczeQ, May 15, 2024)
7389c3b  refactor: change intersecting logic (RaczeQ, May 15, 2024)
25b1ca1  refactor: change contains to shapely (RaczeQ, May 15, 2024)
6123e27  refactor: change operations to R-tree from shapely (RaczeQ, May 15, 2024)
78caa4e  refactor: add multiprocessing to STRtree method (RaczeQ, May 16, 2024)
4e7faf8  fix: change pbf extract geometry propagation (RaczeQ, May 16, 2024)
9e758a5  chore: change intersection to dask (RaczeQ, May 17, 2024)
3d5b028  fix: change partitions number (RaczeQ, May 17, 2024)
13b59a0  chore: test dask from git (RaczeQ, May 17, 2024)
548d95d  Revert "chore: change intersection to dask" (RaczeQ, May 17, 2024)
ca4ff6f  chore: change lock (RaczeQ, May 17, 2024)
a1f6e22  feat: speed up STRtree generation (RaczeQ, May 17, 2024)
6902dd8  chore: apply refurb suggestion (RaczeQ, May 17, 2024)
b18b6fc  chore: apply refurb suggestion (RaczeQ, May 17, 2024)
fbcd0a4  chore: add intersection test and pragma clauses (RaczeQ, May 17, 2024)
75584ac  chore: add changelog entries (RaczeQ, May 17, 2024)
943c273  Merge branch 'main' into 112-split-multipart-geometry-filter-in-inter… (RaczeQ, May 17, 2024)
7d29648  chore: add automatic process termination in case of exception (RaczeQ, May 19, 2024)
3636fea  feat: refactor parquet multiprocessing wrapper (RaczeQ, May 21, 2024)
7c31fd3  chore: change test exception value (RaczeQ, May 21, 2024)
589aa43  fix: wrap queue empty error (RaczeQ, May 21, 2024)
2adef1d  fix: change test and debug flag (RaczeQ, May 21, 2024)
1e9c770  chore: add pragmas and change changelog (RaczeQ, May 21, 2024)
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed

- Added new internal parquet dataset processing logic using multiprocessing
- Refactored nodes intersection step from `ST_Intersects` in DuckDB to Shapely's `STRtree`
- `PbfFileReader`'s internal `geometry_filter` is additionally clipped by PBF extract geometry to speed up intersections

### Added

- `geoarrow-rust-core` library to the main dependencies
- Test for hashing geometry filter with mixed order
- Test for parquet multiprocessing logic
- Test for new intersection step

## [0.8.1] - 2024-05-11

### Added
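The last `Changed` entry is easy to picture with plain Shapely. A minimal sketch, with made-up boxes standing in for the user's geometry filter and the PBF extract coverage:

```python
from shapely import box

user_filter = box(0, 0, 10, 10)       # hypothetical geometry filter
extract_geometry = box(5, 5, 20, 20)  # hypothetical PBF extract coverage

# Clipping the filter to the extract coverage shrinks the geometry that
# every node has to be tested against in the intersection step.
clipped_filter = user_filter.intersection(extract_geometry)
print(clipped_filter.bounds)  # (5.0, 5.0, 10.0, 10.0)
```
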
2 changes: 2 additions & 0 deletions README.md
@@ -82,6 +82,8 @@ Required:

- `shapely (>=2.0)`: For parsing WKT and GeoJSON strings and fixing geometries

- `geoarrow-rust-core (>=0.2)`: For transforming Arrow data to Shapely objects

- `polars (>=0.19.4)`: For faster OSM ways grouping operation

- `typeguard (>=3.0)`: For internal validation of types
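A quick sketch of what `geoarrow-rust-core` does here, mirroring the calls made in `quackosm/_intersection.py` below (the coordinates are invented):

```python
import pyarrow as pa
from geoarrow.rust.core import PointArray

# a toy Arrow table standing in for a parquet row group of OSM nodes
table = pa.table({"lon": [19.94, 21.01], "lat": [50.05, 52.23]})

points_array = PointArray.from_xy(
    x=table["lon"].combine_chunks(), y=table["lat"].combine_chunks()
)
shapely_points = points_array.to_shapely()  # array of Shapely Point objects
```
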
351 changes: 183 additions & 168 deletions pdm.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -18,6 +18,7 @@ dependencies = [
"requests",
"polars>=0.19.4",
"rich>=10.11.0",
"geoarrow-rust-core>=0.2.0",
]
requires-python = ">=3.9"
readme = "README.md"
4 changes: 2 additions & 2 deletions quackosm/__main__.py
@@ -12,8 +12,8 @@ def main() -> None:
)
raise ImportError(error_msg) from exc

cli.app(prog_name=__app_name__)
cli.app(prog_name=__app_name__) # pragma: no cover


if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
main()
52 changes: 52 additions & 0 deletions quackosm/_intersection.py
@@ -0,0 +1,52 @@
from functools import partial
from pathlib import Path
from typing import Optional

import pyarrow as pa
from geoarrow.rust.core import PointArray
from shapely import STRtree
from shapely.geometry.base import BaseGeometry

from quackosm._parquet_multiprocessing import map_parquet_dataset
from quackosm._rich_progress import TaskProgressBar # type: ignore[attr-defined]


def _intersect_nodes(
table: pa.Table,
geometry_filter: BaseGeometry,
) -> pa.Table: # pragma: no cover
points_array = PointArray.from_xy(
x=table["lon"].combine_chunks(), y=table["lat"].combine_chunks()
)

tree = STRtree(points_array.to_shapely())

intersecting_ids_array = table["id"].take(tree.query(geometry_filter, predicate="intersects"))

return pa.table({"id": intersecting_ids_array})


def intersect_nodes_with_geometry(
tmp_dir_path: Path,
geometry_filter: BaseGeometry,
progress_bar: Optional[TaskProgressBar] = None,
) -> None:
"""
    Intersects node points with the geometry filter using a spatial index and multiprocessing.

Args:
tmp_dir_path (Path): Path of the working directory.
geometry_filter (BaseGeometry): Geometry used for filtering.
progress_bar (Optional[TaskProgressBar]): Progress bar to show task status.
            Defaults to `None`.
"""
dataset_path = tmp_dir_path / "nodes_valid_with_tags"
destination_path = tmp_dir_path / "nodes_intersecting_ids"

map_parquet_dataset(
dataset_path=dataset_path,
destination_path=destination_path,
progress_bar=progress_bar,
function=partial(_intersect_nodes, geometry_filter=geometry_filter),
columns=["id", "lat", "lon"],
)
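
The same query path as `_intersect_nodes`, run here on a small in-memory table instead of a parquet row group (data invented):

```python
import pyarrow as pa
from shapely import Point, STRtree, box

table = pa.table({"id": [1, 2, 3], "lon": [0.5, 5.0, 9.5], "lat": [0.5, 5.0, 9.5]})
points = [
    Point(x, y)
    for x, y in zip(table["lon"].to_pylist(), table["lat"].to_pylist())
]

tree = STRtree(points)
geometry_filter = box(0, 0, 2, 2)

# query() returns positional indices of points matching the predicate
intersecting_ids = table["id"].take(tree.query(geometry_filter, predicate="intersects"))
print(intersecting_ids.to_pylist())  # [1]
```
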
158 changes: 158 additions & 0 deletions quackosm/_parquet_multiprocessing.py
@@ -0,0 +1,158 @@
import multiprocessing
import traceback
from pathlib import Path
from queue import Empty, Queue
from time import sleep
from typing import Callable, Optional

import pyarrow as pa
import pyarrow.parquet as pq

from quackosm._rich_progress import TaskProgressBar # type: ignore[attr-defined]


def _job(
queue: Queue[tuple[str, int]],
save_path: Path,
function: Callable[[pa.Table], pa.Table],
columns: Optional[list[str]] = None,
) -> None: # pragma: no cover
current_pid = multiprocessing.current_process().pid

filepath = save_path / f"{current_pid}.parquet"
writer = None
while not queue.empty():
try:
file_name, row_group_index = None, None
file_name, row_group_index = queue.get_nowait()

pq_file = pq.ParquetFile(file_name)
row_group_table = pq_file.read_row_group(row_group_index, columns=columns)
if len(row_group_table) == 0:
continue

result_table = function(row_group_table)

if not writer:
writer = pq.ParquetWriter(filepath, result_table.schema)

writer.write_table(result_table)
except Empty:
pass
except Exception as ex:
if file_name is not None and row_group_index is not None:
queue.put((file_name, row_group_index))

msg = (
f"Error in worker (PID: {current_pid},"
f" Parquet: {file_name}, Row group: {row_group_index})"
)
raise RuntimeError(msg) from ex

if writer:
writer.close()


class WorkerProcess(multiprocessing.Process):
def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def]
multiprocessing.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = multiprocessing.Pipe()
self._exception: Optional[tuple[Exception, str]] = None

def run(self) -> None: # pragma: no cover
try:
multiprocessing.Process.run(self)
self._cconn.send(None)
except Exception as e:
tb: str = traceback.format_exc()
self._cconn.send((e, tb))

@property
def exception(self) -> Optional[tuple[Exception, str]]:
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception


def map_parquet_dataset(
dataset_path: Path,
destination_path: Path,
function: Callable[[pa.Table], pa.Table],
columns: Optional[list[str]] = None,
progress_bar: Optional[TaskProgressBar] = None,
) -> None:
"""
    Apply a function over a parquet dataset in a multiprocessing environment.

    Will save results in multiple files in the destination path.

Args:
dataset_path (Path): Path of the parquet dataset.
destination_path (Path): Path of the destination.
function (Callable[[pa.Table], pa.Table]): Function to apply over a row group table.
Will save resulting table in a new parquet file.
columns (Optional[list[str]]): List of columns to read. Defaults to `None`.
progress_bar (Optional[TaskProgressBar]): Progress bar to show task status.
Defaults to `None`.
"""
queue: Queue[tuple[str, int]] = multiprocessing.Manager().Queue()

dataset = pq.ParquetDataset(dataset_path)

for pq_file in dataset.files:
for row_group in range(pq.ParquetFile(pq_file).num_row_groups):
queue.put((pq_file, row_group))

total = queue.qsize()

destination_path.mkdir(parents=True, exist_ok=True)

try:
processes = [
WorkerProcess(
target=_job,
args=(queue, destination_path, function, columns),
) # type: ignore[no-untyped-call]
for _ in range(multiprocessing.cpu_count())
]

# Run processes
for p in processes:
p.start()

if progress_bar: # pragma: no cover
progress_bar.create_manual_bar(total=total)
while any(process.is_alive() for process in processes):
if any(p.exception for p in processes): # pragma: no cover
break

if progress_bar: # pragma: no cover
progress_bar.update_manual_bar(current_progress=total - queue.qsize())
sleep(1)

if progress_bar: # pragma: no cover
progress_bar.update_manual_bar(current_progress=total)
finally: # pragma: no cover
# In case of exception
exceptions = []
for p in processes:
if p.is_alive():
p.terminate()

if p.exception:
exceptions.append(p.exception)

        if exceptions:
            # use ExceptionGroup in Python 3.11
_raise_multiple(exceptions)


def _raise_multiple(exceptions: list[tuple[Exception, str]]) -> None:
if not exceptions:
return
try:
error, traceback = exceptions.pop()
msg = f"{error}\n\nOriginal {traceback}"
raise type(error)(msg)
finally:
_raise_multiple(exceptions)
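
A hypothetical end-to-end use of the new wrapper; `keep_tagged` and both paths are invented for illustration:

```python
from pathlib import Path

import pyarrow as pa
import pyarrow.compute as pc

from quackosm._parquet_multiprocessing import map_parquet_dataset


def keep_tagged(table: pa.Table) -> pa.Table:
    # keep only the rows with a non-null "tags" column
    return table.filter(pc.is_valid(table["tags"]))


if __name__ == "__main__":  # workers are separate processes, so guard the entry point
    map_parquet_dataset(
        dataset_path=Path("files/nodes_valid_with_tags"),
        destination_path=Path("files/nodes_tagged"),
        function=keep_tagged,
        columns=["id", "tags"],
    )
```

Each worker writes its own `<pid>.parquet` file under the destination path, so the result is a dataset rather than a single file.
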
63 changes: 35 additions & 28 deletions quackosm/_rich_progress.py
@@ -147,38 +147,39 @@ def __init__(
self.progress_cls = progress_cls
self.live_obj = live_obj

def _create_progress(self):
columns = [
SpinnerColumn(),
TextColumn(self.step_number),
TextColumn(
"[progress.description]{task.description}"
" [progress.percentage]{task.percentage:>3.0f}%"
),
BarColumn(),
MofNCompleteColumn(),
TextColumn("•"),
TimeElapsedColumn(),
TextColumn("<"),
TimeRemainingColumn(),
TextColumn("•"),
SpeedColumn(),
]

if self.skip_step_number:
columns.pop(1)

self.progress = self.progress_cls(
*columns,
live_obj=self.live_obj,
transient=self.transient_mode,
speed_estimate_period=1800,
)

def __enter__(self):
if self.silent_mode:
self.progress = None
else:

columns = [
SpinnerColumn(),
TextColumn(self.step_number),
TextColumn(
"[progress.description]{task.description}"
" [progress.percentage]{task.percentage:>3.0f}%"
),
BarColumn(),
MofNCompleteColumn(),
TextColumn("•"),
TimeElapsedColumn(),
TextColumn("<"),
TimeRemainingColumn(),
TextColumn("•"),
SpeedColumn(),
]

if self.skip_step_number:
columns.pop(1)

self.progress = self.progress_cls(
*columns,
live_obj=self.live_obj,
transient=self.transient_mode,
speed_estimate_period=1800,
)

self._create_progress()
self.progress.__enter__()

return self
@@ -189,6 +190,12 @@ def __exit__(self, exc_type, exc_value, exc_tb):

self.progress = None

def create_manual_bar(self, total: int):
self.progress.add_task(description=self.step_name, total=total)

def update_manual_bar(self, current_progress: int):
self.progress.update(task_id=self.progress.task_ids[0], completed=current_progress)

def track(self, iterable: Iterable):
if self.progress is not None:
for i in self.progress.track(list(iterable), description=self.step_name):
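The two new methods let a bar be advanced by polling instead of wrapping an iterable, which is what `map_parquet_dataset` needs. A sketch of the pattern, with the bar's construction assumed to happen elsewhere (it is outside this diff):

```python
from typing import Optional

from quackosm._rich_progress import TaskProgressBar  # type: ignore[attr-defined]


def poll_with_progress(progress_bar: Optional[TaskProgressBar], total: int) -> None:
    # mirrors the polling loop in map_parquet_dataset
    if progress_bar:
        progress_bar.create_manual_bar(total=total)
    for done in range(1, total + 1):
        ...  # one unit of work completed elsewhere
        if progress_bar:
            progress_bar.update_manual_bar(current_progress=done)
```
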
4 changes: 2 additions & 2 deletions quackosm/osm_extracts/_poly_parser.py
@@ -8,7 +8,7 @@
__all__ = ["parse_polygon_file"]


def parse_polygon_file(polygon_url: str) -> Optional[MultiPolygon]:
def parse_polygon_file(polygon_url: str) -> Optional[MultiPolygon]: # pragma: no cover
"""
Parse poly file from URL to geometry.

@@ -30,7 +30,7 @@ def parse_polygon_file(polygon_url: str) -> Optional[MultiPolygon]:
return poly


def parse_poly(lines: list[str]) -> MultiPolygon:
def parse_poly(lines: list[str]) -> MultiPolygon: # pragma: no cover
"""
Parse an Osmosis polygon filter file.

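For reference, a minimal sketch of the Osmosis polygon filter format that `parse_poly` consumes; the first line names the filter, each section lists lon/lat pairs and ends with `END`, and a final `END` closes the file (coordinates invented):

```python
from quackosm.osm_extracts._poly_parser import parse_poly

poly_lines = [
    "example_area",
    "section_1",
    "   19.5   49.9",
    "   20.5   49.9",
    "   20.5   50.5",
    "   19.5   50.5",
    "END",
    "END",
]
multipolygon = parse_poly(poly_lines)  # MultiPolygon covering the listed ring
```
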
4 changes: 2 additions & 2 deletions quackosm/osm_extracts/bbbike.py
@@ -43,7 +43,7 @@ def _load_bbbike_index() -> gpd.GeoDataFrame:

if save_path.exists():
gdf = gpd.read_file(save_path)
else:
else: # pragma: no cover
extracts = _iterate_bbbike_index()
gdf = gpd.GeoDataFrame(
data=[asdict(extract) for extract in extracts], geometry="geometry"
@@ -57,7 +57,7 @@ def _load_bbbike_index() -> gpd.GeoDataFrame:
return gdf


def _iterate_bbbike_index() -> list[OpenStreetMapExtract]:
def _iterate_bbbike_index() -> list[OpenStreetMapExtract]: # pragma: no cover
"""
Iterate OpenStreetMap.fr extracts service page.

2 changes: 1 addition & 1 deletion quackosm/osm_extracts/geofabrik.py
@@ -38,7 +38,7 @@ def _load_geofabrik_index() -> gpd.GeoDataFrame:

if save_path.exists():
gdf = gpd.read_file(save_path)
else:
else: # pragma: no cover
result = requests.get(
GEOFABRIK_INDEX_URL,
headers={
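Both index loaders share the same cache-or-download shape, which is why only the network branch is excluded from coverage. A generic sketch (all names invented):

```python
from pathlib import Path
from typing import Callable

import geopandas as gpd


def load_index(
    save_path: Path, download_index: Callable[[], gpd.GeoDataFrame]
) -> gpd.GeoDataFrame:
    if save_path.exists():  # cached locally: exercised by tests
        return gpd.read_file(save_path)
    gdf = download_index()  # network call, excluded from coverage
    gdf.to_file(save_path)  # cache for the next run
    return gdf
```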