# Allow fsspec URLs in open_(mf)dataset #4823

**Merged** · 24 commits · Feb 16, 2021 · diff shown below is from the first 7 commits
**xarray/backends/api.py** (18 changes: 12 additions & 6 deletions)

```diff
@@ -870,13 +870,19 @@ def open_mfdataset(
     """
     if isinstance(paths, str):
         if is_remote_uri(paths):
-            raise ValueError(
-                "cannot do wild-card matching for paths that are remote URLs: "
-                "{!r}. Instead, supply paths as an explicit list of strings.".format(
-                    paths
-                )
-            )
-        paths = sorted(glob(_normalize_path(paths)))
+            from fsspec.core import get_fs_token_paths
+
+            # get_fs_token_paths also allows arguments embedded in URLs
+            fs, _, _ = get_fs_token_paths(
+                paths,
+                mode="rb",
+                storage_options=kwargs.get("backend_kwargs", {}).get("storage_options"),
+                expand=False,
+            )
+            paths = fs.glob(fs._strip_protocol(paths))
+            paths = [fs.get_mapper(path) for path in paths]
+        else:
+            paths = sorted(glob(_normalize_path(paths)))
     else:
         paths = [str(p) if isinstance(p, Path) else p for p in paths]
```

**Member** (commenting on `paths = [fs.get_mapper(path) for path in paths]`):
This is a bit tricky. It assumes the backend wants a mapper object (as the zarr backend does). But what if the glob returns a list of netCDF files? Wouldn't we want a list of file objects?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, this is my earlier question of whether we should special-case zarr. It could produce file objects instead; for now it would just error. We don't have tests for this yet, but now might be the time to start.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now tracking with the comments above, I think we have two options:

1. inject some backend-specific logic into the API here to decide what sort of object to return (if `engine == "zarr"`, return a mapper; else return a file object)
2. only support globbing remote zarr stores

(1) seems to be the more reasonable thing to do here, but it is slightly less principled, as we have been working to cleanly separate the API from the backends.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose a third alternative might be to pass the paths through and create mappers in the zarr backend (this will re-instantiate the filesystem, but that's fine), then add the opening of remote files to each of the other backends that can handle it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have essentially done (1), but excluded HTTP from the non-zarr path, because HTTP has a special role for some backends (DAP, ...). In any case, I don't suppose anyone is globbing over HTTP, since it's generally unreliable.

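The thread above settled on option (1). As a rough sketch only (not this PR's final code), the engine-aware expansion could look like the following; `_expand_remote_paths` and its parameters are hypothetical names for illustration:

```python
# Sketch of option (1): glob a remote URL with fsspec, then hand each backend
# the object type it expects. Hypothetical helper, not xarray API.
from fsspec.core import get_fs_token_paths


def _expand_remote_paths(paths, engine, storage_options=None):
    # get_fs_token_paths also understands chained URLs such as
    # "simplecache::s3://bucket/*.zarr" and options embedded in the URL
    fs, _, _ = get_fs_token_paths(
        paths, mode="rb", storage_options=storage_options, expand=False
    )
    expanded = sorted(fs.glob(fs._strip_protocol(paths)))
    if engine == "zarr":
        # the zarr backend consumes a key/value mapper over the store
        return [fs.get_mapper(p) for p in expanded]
    # file-based backends (netCDF4, h5netcdf, ...) consume open file objects
    return [fs.open(p, mode="rb") for p in expanded]
```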
**xarray/backends/zarr.py** (18 changes: 16 additions & 2 deletions)

```diff
@@ -3,6 +3,8 @@
 
 import numpy as np
 
+from xarray.tests import LooseVersion
+
 from .. import coding, conventions
 from ..core import indexing
 from ..core.pycompat import integer_types
@@ -286,6 +288,7 @@ def open_group(
         consolidated=False,
         consolidate_on_close=False,
         chunk_store=None,
+        storage_options=None,
         append_dim=None,
         write_region=None,
     ):
@@ -295,7 +298,15 @@
         if isinstance(store, pathlib.Path):
             store = os.fspath(store)
 
-        open_kwargs = dict(mode=mode, synchronizer=synchronizer, path=group)
+        open_kwargs = dict(
+            mode=mode,
+            synchronizer=synchronizer,
+            path=group,
+        )
+        if LooseVersion(zarr.__version__) >= "2.5.0":
+            open_kwargs["storage_options"] = storage_options
+        elif storage_options:
+            raise ValueError("Storage options only compatible with zarr>=2.5.0")
         if chunk_store:
             open_kwargs["chunk_store"] = chunk_store
@@ -530,6 +541,7 @@ def open_zarr(
     consolidated=False,
     overwrite_encoded_chunks=False,
     chunk_store=None,
+    storage_options=None,
     decode_timedelta=None,
     use_cftime=None,
     **kwargs,
@@ -642,6 +654,7 @@
         "consolidated": consolidated,
         "overwrite_encoded_chunks": overwrite_encoded_chunks,
         "chunk_store": chunk_store,
+        "storage_options": storage_options,
     }
 
     ds = open_dataset(
@@ -678,8 +691,8 @@ def open_backend_dataset_zarr(
     consolidated=False,
     consolidate_on_close=False,
     chunk_store=None,
+    storage_options=None,
 ):
-
     store = ZarrStore.open_group(
         filename_or_obj,
         group=group,
@@ -688,6 +701,7 @@
         consolidated=consolidated,
         consolidate_on_close=consolidate_on_close,
         chunk_store=chunk_store,
+        storage_options=storage_options,
     )
 
     with close_on_error(store):
```
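With the changes above, `storage_options` flows from `open_zarr` through `ZarrStore.open_group` into zarr itself, which requires zarr>=2.5.0. A hedged usage sketch; the bucket URL and the credentials option are made up:

```python
# Usage sketch for the new storage_options parameter (needs zarr >= 2.5.0);
# the S3 URL and the anonymous-access option below are hypothetical.
import xarray as xr

ds = xr.open_zarr(
    "s3://some-bucket/store.zarr",
    storage_options={"anon": True},  # forwarded to fsspec via zarr's FSStore
)
```

On older zarr, the version gate in `open_group` turns the same call into a `ValueError`.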
**xarray/core/utils.py** (7 changes: 6 additions & 1 deletion)

```diff
@@ -645,7 +645,12 @@ def close_on_error(f):
 
 
 def is_remote_uri(path: str) -> bool:
-    return bool(re.search(r"^https?\://", path))
+    """Finds URLs of the form protocol:// or protocol::
+
+    This also matches for http[s]://, which were the only remote URLs
+    supported in <=v0.16.2.
+    """
+    return bool(re.search(r"^[a-z][a-z0-9]*(\://|\:\:)", path))
 
 
 def read_magic_number(filename_or_obj, count=8):
```
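A few concrete cases the broadened pattern accepts or rejects (the example paths are made up):

```python
# Illustrating the new is_remote_uri matching; example paths are made up.
from xarray.core.utils import is_remote_uri

assert is_remote_uri("https://example.com/data.nc")      # matched in <=v0.16.2 too
assert is_remote_uri("s3://bucket/store.zarr")           # any protocol://
assert is_remote_uri("simplecache::s3://bucket/a.zarr")  # chained protocol::
assert not is_remote_uri("local/relative/data.nc")       # no protocol prefix
```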
**xarray/tests/__init__.py** (1 change: 1 addition & 0 deletions)

```diff
@@ -73,6 +73,7 @@ def LooseVersion(vstring):
 has_nc_time_axis, requires_nc_time_axis = _importorskip("nc_time_axis")
 has_rasterio, requires_rasterio = _importorskip("rasterio")
 has_zarr, requires_zarr = _importorskip("zarr")
+has_fsspec, requires_fsspec = _importorskip("fsspec")
 has_iris, requires_iris = _importorskip("iris")
 has_cfgrib, requires_cfgrib = _importorskip("cfgrib")
 has_numbagg, requires_numbagg = _importorskip("numbagg")
```
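For context, `_importorskip` returns a boolean flag plus a pytest skip marker; roughly like this (a sketch, not xarray's exact implementation, which also handles minimum versions):

```python
# Rough sketch of the _importorskip helper used above.
import importlib

import pytest


def _importorskip(modname):
    try:
        importlib.import_module(modname)
        has = True
    except ImportError:
        has = False
    func = pytest.mark.skipif(not has, reason=f"requires {modname}")
    return has, func
```

So tests decorated with `@requires_fsspec` are skipped automatically when fsspec is not installed.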
**xarray/tests/test_backends.py** (50 changes: 49 additions & 1 deletion)

```diff
@@ -54,6 +54,7 @@
     requires_cfgrib,
     requires_cftime,
     requires_dask,
+    requires_fsspec,
     requires_h5netcdf,
     requires_netCDF4,
     requires_pseudonetcdf,
@@ -3041,7 +3042,12 @@ def test_open_mfdataset(self):
         with raises_regex(IOError, "no files to open"):
             open_mfdataset("foo-bar-baz-*.nc")
 
-        with raises_regex(ValueError, "wild-card"):
+    @requires_fsspec
+    def test_open_mfdataset_no_files(self):
+        pytest.importorskip("aiobotocore")
+
+        with raises_regex(OSError, "no files"):
+            # glob is attempted as of #4823, but finds no files
             open_mfdataset("http://some/remote/uri")
 
     def test_open_mfdataset_2d(self):
```

**Collaborator** (commenting on `pytest.importorskip("aiobotocore")`):

This means the test is always skipped. Should aiobotocore be added to some of the CI environments?

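To summarize the behavior change that test pins down (a sketch; the `memory://` URL is made up): before this PR a remote wildcard raised `ValueError` immediately, while now the URL is globbed via fsspec and an empty result raises `OSError`.

```python
# Sketch of the behavior change tested above; the URL below is made up.
import xarray as xr

try:
    # pre-#4823: ValueError ("cannot do wild-card matching ... remote URLs")
    # post-#4823: the URL is globbed via fsspec; nothing matches, so OSError
    xr.open_mfdataset("memory://nonexistent/*.zarr", engine="zarr")
except OSError as err:
    print(err)  # no files to open
```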
```diff
@@ -4799,6 +4805,48 @@ def test_extract_zarr_variable_encoding():
     )
 
 
+@requires_zarr
+@requires_fsspec
+def test_open_fsspec():
+    import fsspec  # type: ignore
+    import zarr
+
+    if not hasattr(zarr.storage, "FSStore") or not hasattr(
+        zarr.storage.FSStore, "getitems"
+    ):
+        pytest.skip("zarr too old")
+
+    ds = open_dataset(os.path.join(os.path.dirname(__file__), "data", "example_1.nc"))
+
+    m = fsspec.filesystem("memory")
+    mm = m.get_mapper("out1.zarr")
+    ds.to_zarr(mm)  # old interface
+    ds0 = ds.copy()
+    ds0["time"] = ds.time + pd.to_timedelta("1 day")
+    mm = m.get_mapper("out2.zarr")
+    ds0.to_zarr(mm)  # old interface
+
+    # single dataset
+    url = "memory://out2.zarr"
+    ds2 = open_dataset(url, engine="zarr")
+    assert ds0 == ds2
+
+    # single dataset with caching
+    url = "simplecache::memory://out2.zarr"
+    ds2 = open_dataset(url, engine="zarr")
+    assert ds0 == ds2
+
+    # multi dataset
+    url = "memory://out*.zarr"
+    ds2 = open_mfdataset(url, engine="zarr")
+    assert xr.concat([ds, ds0], dim="time") == ds2
+
+    # multi dataset with caching
+    url = "simplecache::memory://out*.zarr"
+    ds2 = open_mfdataset(url, engine="zarr")
+    assert xr.concat([ds, ds0], dim="time") == ds2
+
+
 @requires_h5netcdf
 def test_load_single_value_h5netcdf(tmp_path):
     """Test that numeric single-element vector attributes are handled fine.
```
Expand Down