Add ability to pass storage_options to pandas.to_parquet. Update docs/CHANGELOG #79

Merged on Aug 6, 2021, with 47 commits (changes shown from all commits).

Commits:
- e69eb9f build sindex takes args, returns self, cleanup (brl0, Feb 14, 2021)
- 6fea4fa Merge remote-tracking branch 'upstream/master' into 20210212 (brl0, Jun 19, 2021)
- 742eb00 Merge remote-tracking branch 'upstream/master' into 20210212 (brl0, Jul 29, 2021)
- 7f98aac Pass storage and engine options, add some typing (brl0, Aug 3, 2021)
- 86691da Pass kwargs to _load_parquet_pandas_metadata (brl0, Aug 3, 2021)
- 6af84b1 Remove unused import in tests (brl0, Aug 3, 2021)
- 113a0d2 Fix argument order for pack_partitions_to_parquet (brl0, Aug 3, 2021)
- abf2d2f Pass storage_options to read_parquet_dask (Aug 5, 2021)
- 68d73df Update docstrings for pack_partitions_to_parquet (Aug 5, 2021)
- 839f837 Update docstrings for validate_coerce_filesystem (Aug 5, 2021)
- 8a16c70 Pass storage_options to read_parquet_dask (Aug 5, 2021)
- 5a59c38 Update docstrings for read_parquet_dask (Aug 5, 2021)
- a73d10a Update CHANGELOG (Aug 5, 2021)
- 4f54195 Add overwrite option, minor clean up (brl0, Aug 5, 2021)
- 3f7562b Update CHANGELOG (Aug 5, 2021)
- d3cfe22 Merge remote-tracking branch 'spatialpandas-brl0/20210212' into fix/d… (Aug 5, 2021)
- 3c3485c Implement version dependent pandas to_parquet function (Aug 6, 2021)
- 60fa367 Update CHANGELOG (Aug 6, 2021)
- 97db5be Pass storage_options to read_parquet_dask (Aug 6, 2021)
- 8283801 Add pd_to_parquet to __init__ (Aug 6, 2021)
- 0440451 Upate import statement (Aug 6, 2021)
- 6e47d2a Move import statement (Aug 6, 2021)
- 48d35f4 Minor typing change (Aug 6, 2021)
- a37cb12 Add __all__, minor cleanup (brl0, Aug 6, 2021)
- 74164f7 Update to_parquet function call (Aug 6, 2021)
- 3cb3a32 Merge remote-tracking branch 'spatialpandas-brl0/20210212' into fix/d… (Aug 6, 2021)
- 5ac852f Update CHANGELOG (Aug 6, 2021)
- 7559797 Add engine_kwargs to pd.to_parquet call (iameskild, Aug 6, 2021)
- 6f69e2e Add engine_kwargs to dask.to_parquet (iameskild, Aug 6, 2021)
- 8584515 Add storage_options to read_parquet (iameskild, Aug 6, 2021)
- 2687420 Add engine_kwargs to read_parquet (iameskild, Aug 6, 2021)
- 689899f Modify engine_kwargs to pack_partitions_to_parquet (Aug 6, 2021)
- 427ee5d Merge branch 'fix/dask_parquet' of github.com:iameskild/spatialpandas… (Aug 6, 2021)
- de63b44 Update CHANGELOG.md (iameskild, Aug 6, 2021)
- 4386c86 Update CHANGELOG.md (iameskild, Aug 6, 2021)
- a3062f0 Fix missing final newline (iameskild, Aug 6, 2021)
- 35c45eb Modify how fs is handled in to_parquet (Aug 6, 2021)
- dc42594 Merge branch 'fix/dask_parquet' of github.com:iameskild/spatialpandas… (Aug 6, 2021)
- 7a9ec81 Remove extra whitespace (Aug 6, 2021)
- de07d47 Fix return statement indent (Aug 6, 2021)
- 14e0f79 Remove extra whitespace (iameskild, Aug 6, 2021)
- 25642dc Merge holoviz/spatialpandas master into fix/dask_parquet (Aug 6, 2021)
- bd756af Merge holoviz/spatialpandas master into fix/dask_parquet (Aug 6, 2021)
- ead064a Merge holoviz/spatialpandas master into fix/dask_parquet (Aug 6, 2021)
- 6bfe470 Merge branch 'fix/dask_parquet' of github.com:iameskild/spatialpandas… (Aug 6, 2021)
- 3d25819 Update engine_kwargs formatting (Aug 6, 2021)
- 4589ac8 Move PANDAS_GE_12 into io/parquet.py (Aug 6, 2021)
CHANGELOG.md (10 changes: 10 additions & 0 deletions)

```diff
@@ -1,3 +1,13 @@
+## Version 0.4.3
+
+Date: 2021-08-05
+
+This release primarily expands the optional arguments that can be passed to `to_parquet_dask`/`read_parquet_dask`, ensuring that `storage_options` is successfully passed where needed. It also adds the ability to pass `storage_options` to the `pandas.to_parquet` function (only for pandas >= 1.2) and renames any reference to `fname` to `path` to align with the pandas convention.
+
+Bug fixes:
+- Update `validate_coerce_filesystem` to pass `storage_options` through. ([#78](https://github.com/holoviz/spatialpandas/pull/78))
+
+
 ## Version 0.4.2
 
 This release primarily achieves compatibility with recent releases of Pandas. Many thanks to @Hoxbro for contributing the fixes and @philippjfr for ongoing maintenance of the project.
```
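For reference, a minimal usage sketch of the options this release threads through; the S3 bucket, credential values, and engine keyword below are hypothetical placeholders, not taken from the diff:

```python
from spatialpandas.io import read_parquet_dask

# storage_options is forwarded to the fsspec backend (s3fs for an s3:// path),
# engine_kwargs to pyarrow.parquet when the dataset is opened.
ddf = read_parquet_dask(
    "s3://my-bucket/dataset.parquet",   # hypothetical remote dataset
    storage_options={"anon": True},
    engine_kwargs={"memory_map": False},
)
```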
spatialpandas/dask.py (28 changes: 24 additions & 4 deletions)

```diff
@@ -237,12 +237,16 @@ def pack_partitions_to_parquet(
 
                 These directories are deleted as soon as possible during the execution
                 of the function.
+            storage_options: Key/value pairs to be passed on to the file-system backend, if any.
+            engine_kwargs: pyarrow.parquet engine-related keyword arguments.
         Returns:
             DaskGeoDataFrame backed by newly written parquet dataset
         """
         from .io import read_parquet, read_parquet_dask
         from .io.utils import validate_coerce_filesystem
 
+        engine_kwargs = engine_kwargs or {}
+
         # Get fsspec filesystem object
         filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
 
@@ -346,9 +350,10 @@ def write_partition(df_part, part_path):
                     f,
                     compression=compression,
                     index=True,
-                    **(engine_kwargs or {}),
+                    **engine_kwargs,
                 )
 
+
         def process_partition(df, i):
             subpart_paths = {}
             for out_partition, df_part in df.groupby('_partition'):
@@ -394,7 +399,12 @@ def read_parquet_retry(parts_tmp_path, subpart_paths, part_output_path):
                 # Handle rare case where the task was resubmitted and the work has
                 # already been done. This shouldn't happen with pure=False, but it
                 # seems like it does very rarely.
-                return read_parquet(part_output_path, filesystem=filesystem)
+                return read_parquet(
+                    part_output_path,
+                    filesystem=filesystem,
+                    storage_options=storage_options,
+                    **engine_kwargs,
+                )
 
             ls_res = sorted(filesystem.ls(parts_tmp_path, **ls_kwargs))
             subpart_paths_stripped = sorted([filesystem._strip_protocol(_) for _ in subpart_paths])
@@ -414,7 +424,12 @@ def read_parquet_retry(parts_tmp_path, subpart_paths, part_output_path):
                         extras=list(extras)
                     )
                 )
-            return read_parquet(parts_tmp_path, filesystem=filesystem)
+            return read_parquet(
+                parts_tmp_path,
+                filesystem=filesystem,
+                storage_options=storage_options,
+                **engine_kwargs,
+            )
 
         def concat_parts(parts_tmp_path, subpart_paths, part_output_path):
             filesystem.invalidate_cache()
@@ -512,7 +527,12 @@ def write_commonmetadata_file():
                 pq.write_metadata(new_schema, f)
         write_commonmetadata_file()
 
-        return read_parquet_dask(path, filesystem=filesystem)
+        return read_parquet_dask(
+            path,
+            filesystem=filesystem,
+            storage_options=storage_options,
+            engine_kwargs=engine_kwargs,
+        )
 
     def _compute_packing_npartitions(self, npartitions):
         if npartitions is None:
```
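A sketch of how the new arguments reach the nested helpers above; the target paths, `storage_options` values, and `row_group_size` keyword are placeholder assumptions:

```python
from spatialpandas.io import read_parquet_dask

ddf = read_parquet_dask(
    "s3://my-bucket/dataset.parquet",
    storage_options={"anon": True},
)

# storage_options flows into validate_coerce_filesystem and both read_parquet
# retry paths shown above; engine_kwargs reaches the to_parquet call inside
# write_partition.
packed = ddf.pack_partitions_to_parquet(
    "s3://my-bucket/packed_dataset",
    npartitions=16,
    storage_options={"anon": True},
    engine_kwargs={"row_group_size": 50_000},
)
```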
spatialpandas/io/parquet.py (50 changes: 38 additions & 12 deletions)

```diff
@@ -1,6 +1,7 @@
 import copy
 import json
 import pathlib
+from distutils.version import LooseVersion
 from functools import reduce
 from glob import has_magic
 from numbers import Number
@@ -29,6 +30,9 @@
     validate_coerce_filesystem,
 )
 
+# improve pandas compatibility, based on geopandas _compat.py
+PANDAS_GE_12 = str(pd.__version__) >= LooseVersion("1.2.0")
+
 _geometry_dtypes = [
     PointDtype, MultiPointDtype, RingDtype, LineDtype,
     MultiLineDtype, PolygonDtype, MultiPolygonDtype
@@ -50,6 +54,7 @@ def _load_parquet_pandas_metadata(
     storage_options=None,
     engine_kwargs=None,
 ):
+    engine_kwargs = engine_kwargs or {}
     filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
     if not filesystem.exists(path):
         raise ValueError("Path not found: " + path)
@@ -59,7 +64,7 @@ def _load_parquet_pandas_metadata(
             path,
             filesystem=filesystem,
             validate_schema=False,
-            **(engine_kwargs or {}),
+            **engine_kwargs,
         )
         common_metadata = pqds.common_metadata
         if common_metadata is None:
@@ -98,20 +103,35 @@ def _get_geometry_columns(pandas_metadata):
 
 def to_parquet(
     df: GeoDataFrame,
-    fname: PathType,
+    path: PathType,
     compression: Optional[str] = "snappy",
+    filesystem: Optional[fsspec.AbstractFileSystem] = None,
     index: Optional[bool] = None,
+    storage_options: Optional[Dict[str, Any]] = None,
     **kwargs: Any,
 ) -> None:
+    if filesystem is not None:
+        filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
+
     # Standard pandas to_parquet with pyarrow engine
-    pd_to_parquet(
-        df,
-        fname,
-        engine="pyarrow",
-        compression=compression,
-        index=index,
+    to_parquet_args = {
+        "df": df,
+        "path": path,
+        "engine": "pyarrow",
+        "compression": compression,
+        "filesystem": filesystem,
+        "index": index,
         **kwargs,
-    )
+    }
+
+    if PANDAS_GE_12:
+        to_parquet_args.update({"storage_options": storage_options})
+    else:
+        if filesystem is None:
+            filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
+        to_parquet_args.update({"filesystem": filesystem})
+
+    pd_to_parquet(**to_parquet_args)
 
 
 def read_parquet(
@@ -122,6 +142,7 @@ def read_parquet(
     engine_kwargs: Optional[Dict[str, Any]] = None,
     **kwargs: Any,
 ) -> GeoDataFrame:
+    engine_kwargs = engine_kwargs or {}
     filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
 
     # Load pandas parquet metadata
@@ -154,7 +175,7 @@ def read_parquet(
         path,
         filesystem=filesystem,
         validate_schema=False,
-        **(engine_kwargs or {}),
+        **engine_kwargs,
         **kwargs,
     ).read(columns=columns).to_pandas()
 
@@ -176,6 +197,8 @@ def to_parquet_dask(
     engine_kwargs: Optional[Dict[str, Any]] = None,
     **kwargs: Any,
 ) -> None:
+    engine_kwargs = engine_kwargs or {}
+
     if not isinstance(ddf, DaskGeoDataFrame):
         raise TypeError(f"Expected DaskGeoDataFrame not {type(ddf)}")
     filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
@@ -207,6 +230,7 @@ def to_parquet_dask(
             columns=[series_name],
             filesystem=filesystem,
             load_divisions=False,
+            storage_options=storage_options,
         )[series_name]
         partition_bounds[series_name] = series.partition_bounds.to_dict()
 
@@ -217,7 +241,7 @@ def to_parquet_dask(
             path,
             filesystem=filesystem,
             validate_schema=False,
-            **(engine_kwargs or {}),
+            **engine_kwargs,
         )
         all_metadata = copy.copy(pqds.common_metadata.metadata)
         all_metadata[b'spatialpandas'] = b_spatial_metadata
@@ -268,6 +292,8 @@ def read_parquet_dask(
         data written by dask/fastparquet, not otherwise.
     build_sindex : boolean
         Whether to build partition level spatial indexes to speed up indexing.
+    storage_options: Key/value pairs to be passed on to the file-system backend, if any.
+    engine_kwargs: pyarrow.parquet engine-related keyword arguments.
     Returns:
         DaskGeoDataFrame
     """
@@ -321,12 +347,12 @@ def _perform_read_parquet_dask(
     storage_options=None,
    engine_kwargs=None,
 ):
+    engine_kwargs = engine_kwargs or {}
     filesystem = validate_coerce_filesystem(
         paths[0],
         filesystem,
         storage_options,
     )
-    engine_kwargs = engine_kwargs or {}
     datasets = [
         pa.parquet.ParquetDataset(
             path,
```
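The net effect of the `PANDAS_GE_12` gate in `to_parquet` above, as a hedged usage sketch; the two-point frame and S3 path are placeholder assumptions:

```python
from spatialpandas import GeoDataFrame
from spatialpandas.geometry import PointArray
from spatialpandas.io import to_parquet

# A minimal placeholder GeoDataFrame with two points.
gdf = GeoDataFrame({"geometry": PointArray([(0.0, 1.0), (2.0, 3.0)])})

# On pandas >= 1.2, storage_options is handed straight to pandas.to_parquet;
# on older pandas it is coerced into an fsspec filesystem and passed via the
# `filesystem` argument instead. Note the second argument is now `path`,
# not `fname`.
to_parquet(
    gdf,
    "s3://my-bucket/out.parquet",
    compression="snappy",
    storage_options={"anon": False},
)
```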
spatialpandas/io/utils.py (1 change: 1 addition & 0 deletions)

```diff
@@ -20,6 +20,7 @@ def validate_coerce_filesystem(
         path: Path as a string
         filesystem: Optional fsspec filesystem object to use to open the file. If not
             provided, filesystem type is inferred from path
+        storage_options: Key/value pairs to be passed on to the file-system backend, if any.
 
     Returns:
         fsspec file system
```
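And a small sketch of the documented parameter in isolation; the GCS path and token value are placeholder assumptions:

```python
from spatialpandas.io.utils import validate_coerce_filesystem

# With filesystem=None, the fsspec filesystem class is inferred from the
# path protocol, and storage_options is forwarded to the backend.
fs = validate_coerce_filesystem(
    "gcs://my-bucket/data.parquet",
    filesystem=None,
    storage_options={"token": "anon"},
)
print(type(fs))  # e.g. gcsfs.GCSFileSystem, if gcsfs is installed
```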