Xarray open_mfdataset with engine Zarr #4187

Merged
105 commits merged on Sep 22, 2020
Changes from 97 commits
Commits
f55ed1c
create def for multiple zarr files and added commentary/definition, w…
Apr 23, 2020
49f6512
just as with ``xr.open_mfdatasets``, identify the paths as local dire…
Apr 23, 2020
f35a3e5
added error if no path
Apr 23, 2020
9f728aa
finished copying similar code from `xr.open_mfdatasets`
Apr 23, 2020
8d0a844
remove blank lines
Apr 23, 2020
b3b0f1d
fixed typo
Apr 23, 2020
2221943
added ``xr.open_mzarr()`` to the list of available modules to call
Apr 23, 2020
ac35e7c
imported missing function
Apr 23, 2020
64654f3
imported missing glob
Apr 23, 2020
d5a5cef
imported function from backend.api
Apr 23, 2020
4c0ef19
imported function to facilitate mzarr
Apr 23, 2020
d158c21
correctly imported functions from core to mzarr
Apr 23, 2020
5171420
imported to use on open_mzarr
Apr 23, 2020
e1e51bb
removed lock and autoclose since not taken by ``open_zarr``
Apr 23, 2020
b6bf2cf
fixed typo
Apr 23, 2020
3bc4be8
class is not needed since zarr stores don`t remain open
Apr 23, 2020
a79b125
removed old behavior
Apr 23, 2020
2d3bbb5
set default
Apr 23, 2020
f7cf580
listed open_mzarr
Apr 25, 2020
53c8623
removed unused imported function
Apr 25, 2020
34d755e
imported Path - hadn`t before
Apr 25, 2020
b39b37e
remove unncessesary comments
Apr 25, 2020
276006a
modified comments
Apr 25, 2020
6f04be6
isorted zarr
Apr 25, 2020
aa97e1a
isorted
Apr 25, 2020
06de16a
erased open_mzarr. Added capability to open_dataset to open zarr files
Apr 28, 2020
f94fc9f
removed imported but unused
Apr 28, 2020
16e08e3
comment to `zarr` engine
Apr 28, 2020
22828fc
added chunking code from `open_zarr`
Apr 28, 2020
021f2cc
remove import `open_mzarr``
Apr 28, 2020
985f28c
removed `open_mzarr`` from top-level-function
Apr 28, 2020
e8ed887
missing return in nested function
Apr 29, 2020
d693514
moved outside of nested function, had touble with reading before assi…
Apr 29, 2020
df34f18
added missing argument associated with zarr stores, onto the definiti…
Apr 29, 2020
98351c7
isort zarr.py
Apr 29, 2020
160bd67
removed blank lines, fixed typo on `chunks`
Apr 29, 2020
7e57e9b
removed imported but unused
Apr 29, 2020
ac0f093
restored conditional for `auto`
Apr 29, 2020
6a1516c
removed imported but unused `dask.array`
Apr 29, 2020
8999faf
added capabilities for file_or_obj to be a mutablemapper such as `fss…
Apr 29, 2020
5df0985
moved to a different conditional since file_or_obj is a mutablemappin…
Apr 29, 2020
2d94ea2
isort api.py
Apr 29, 2020
377ef53
restored the option for when file_or_obk is a str, such as an url.
Apr 29, 2020
f48c84b
fixed relabel
Apr 29, 2020
8376cca
update open_dataset for zarr files
Apr 29, 2020
aed1cc5
remove open_zarr from tests, now open_dataset(engine=`zarr`)
Apr 29, 2020
b488363
remove extra file, and raise deprecating warning on open_zarr
Apr 29, 2020
bae7f10
added internal call to open_dataset from depricated open_zarr
Apr 29, 2020
37ff214
defined engine=`zarr`
Apr 29, 2020
b8b98f5
correct argument for open_dataset
Apr 29, 2020
5c37329
pass arguments as backend_kwargs
Apr 29, 2020
831f15b
pass backend_kwargs as argument
Apr 29, 2020
80dd7da
typo
Apr 29, 2020
4ebf380
set `overwrite_enconded_chunks as backend_kwargs
Apr 29, 2020
4ce3007
do not pass as backend, use for chunking
Apr 29, 2020
89a780b
removed commented code
May 22, 2020
6f6eb23
moved definitions to zarr backends
May 22, 2020
62893ab
Merge pull request #1 from Mikejmnez/new_branch
May 22, 2020
13be3e0
Ensure class functions have necessary variables
weiji14 Jun 29, 2020
1977ba1
Merge remote-tracking branch 'origin/master' into open_mfzarr
weiji14 Jun 29, 2020
afbcf78
Combine MutableMapping and Zarr engine condition
weiji14 Jun 29, 2020
cba93c3
Pop out overwrite_encoded_chunks after shallow copy backend_kwargs dict
weiji14 Jun 29, 2020
bc740f6
Fix some errors noticed by PEP8
weiji14 Jun 30, 2020
746caa6
Reorganize code in backends api.py and actually test using engine zarr
weiji14 Jun 30, 2020
ef2a0f6
Add back missing decode_timedelta kwarg
weiji14 Jun 30, 2020
a7b24ff
Add back a missing engine="zarr" to test_distributed.py
weiji14 Jun 30, 2020
6823721
Ensure conditional statements make sense
weiji14 Jun 30, 2020
3fa73a7
Fix UnboundLocalError on 'chunks' referenced before assignment
weiji14 Jun 30, 2020
70fa30e
Run isort to fix import order
weiji14 Jun 30, 2020
cb6d066
Fix tests where kwargs needs to be inside of backend_kwargs dict now
weiji14 Jun 30, 2020
cd0b9ef
Change open_zarr to open_dataset with engine="zarr" in io.rst
weiji14 Jun 30, 2020
f2c368a
Fix test_distributed by wrapping consolidated in backend_kwargs dict
weiji14 Jun 30, 2020
0530bba
Ensure read-only mode when using open_dataset with engine="zarr"
weiji14 Jul 1, 2020
738303b
Turn chunks from "auto" to None if dask is not available
weiji14 Jul 1, 2020
38fc6f7
Add back a missing else statement in maybe_chunk
weiji14 Jul 1, 2020
aca2012
Allow xfail test_vectorized_indexing when has_dask
weiji14 Jul 1, 2020
0b34ab8
Typo on chunks arg in open_dataset
weiji14 Jul 1, 2020
6fbeadf
Fix ZeroDivisionError by adding back check that chunks is not False
weiji14 Jul 1, 2020
6f6aae7
Fix a typo that was causing TypeError: 'method' object is not iterable
weiji14 Jul 1, 2020
543a1c7
Move the `if not chunks` block to after auto detect
weiji14 Jul 1, 2020
aa4833f
Revert "Allow xfail test_vectorized_indexing when has_dask"
weiji14 Jul 1, 2020
6b99225
Temporarily xfail test_vectorized_indexing with or without dask
weiji14 Jul 1, 2020
5571fff
Put zarr in open_mfdataset engine list
weiji14 Jul 1, 2020
b9a239e
Test open_mfdataset_manyfiles with engine zarr
weiji14 Jul 1, 2020
e9c35f9
Remember to set a ._file_obj when using Zarr
weiji14 Jul 2, 2020
827e546
Expect np.ndarray when using open_mfdataset on Zarr with chunks None
weiji14 Jul 2, 2020
1484a2a
Merge branch 'master' into open_mfzarr
weiji14 Jul 2, 2020
ad6f31e
Add an entry to what's new for open_mfdataset with Zarr engine
weiji14 Jul 2, 2020
e2e1c81
Make zarr engine's custom chunk mechanism more in line with ds.chunk
weiji14 Jul 2, 2020
dce4e7c
Workaround problem where dask arrays aren't returned when chunks is None
weiji14 Jul 3, 2020
31ce87d
Default to chunks="auto" for Zarr tests to fix test_vectorized_indexing
weiji14 Jul 3, 2020
09bf681
Merge branch 'master' into open_mfzarr
weiji14 Jul 5, 2020
47f1e32
Merge remote-tracking branch 'origin/master' into open_mfzarr
weiji14 Jul 20, 2020
da42dab
Merge branch 'master' into open_mfzarr
weiji14 Aug 12, 2020
9b9dc3a
Merge branch 'master' into open_mfzarr
dcherian Aug 21, 2020
4a0b922
Merge branch 'master' into open_mfzarr
weiji14 Aug 26, 2020
cd783a5
Fix test by passing in chunk_store to backend_kwargs
weiji14 Aug 26, 2020
2c28a98
Merge remote-tracking branch 'origin/master' into open_mfzarr
weiji14 Sep 17, 2020
7b34d1b
Revert "Change open_zarr to open_dataset with engine="zarr" in io.rst"
weiji14 Jun 30, 2020
40c4d46
Remove open_zarr DeprecationWarning
weiji14 Sep 17, 2020
2c73e0b
Merge branch 'master' into open_mfzarr
weiji14 Sep 21, 2020
3b0e9b1
Update open_dataset docstring to specify chunk options for zarr engine
weiji14 Sep 21, 2020
d4398d4
Let only chunks = None return non-chunked arrays
weiji14 Sep 21, 2020
da7baae
Remove for-loop in test_manual_chunk since testing only one no_chunk
weiji14 Sep 21, 2020
48dae50
Update open_dataset docstring to remove mention of chunks=None with Zarr
weiji14 Sep 21, 2020
14 changes: 7 additions & 7 deletions doc/io.rst
@@ -898,11 +898,11 @@ can be omitted as it will internally be set to ``'a'``.
To store variable length strings use ``dtype=object``.

To read back a zarr dataset that has been created this way, we use the
:py:func:`open_zarr` method:
:py:func:`open_dataset` method with ``engine="zarr"``:

.. ipython:: python

ds_zarr = xr.open_zarr("path/to/directory.zarr")
ds_zarr = xr.open_dataset("path/to/directory.zarr", engine="zarr")
ds_zarr

Cloud Storage Buckets
@@ -919,7 +919,7 @@ pass to xarray::
# write to the bucket
ds.to_zarr(store=gcsmap)
# read it back
ds_gcs = xr.open_zarr(gcsmap)
ds_gcs = xr.open_dataset(gcsmap, engine="zarr")

.. _Zarr: http://zarr.readthedocs.io/
.. _Amazon S3: https://aws.amazon.com/s3/
@@ -970,12 +970,12 @@ with consolidated metadata. To write consolidated metadata, pass the
``consolidated=True`` option to the
:py:attr:`Dataset.to_zarr` method::

ds.to_zarr('foo.zarr', consolidated=True)
ds.to_zarr("foo.zarr", consolidated=True)

To read a consolidated store, pass the ``consolidated=True`` option to
:py:func:`open_zarr`::
To read a consolidated store, pass ``{"consolidated": True}`` to the
backend_kwargs option when using :py:func:`open_dataset` with ``engine="zarr"``::

ds = xr.open_zarr('foo.zarr', consolidated=True)
ds = xr.open_dataset("foo.zarr", engine="zarr", backend_kwargs={"consolidated": True})

Xarray can't perform consolidation on pre-existing zarr datasets. This should
be done directly from zarr, as described in the
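
The io.rst changes above switch the documented read path from ``open_zarr`` to the generic ``open_dataset``/``open_mfdataset`` entry points. A minimal sketch of the usage this PR enables (the store paths and the concatenation dimension are hypothetical)::

    import xarray as xr

    # Open a single zarr store read-only through the generic entry point.
    ds = xr.open_dataset("store1.zarr", engine="zarr", chunks="auto")

    # open_mfdataset now accepts engine="zarr" as well, so several stores
    # can be opened and combined in one call.
    ds_multi = xr.open_mfdataset(
        ["store1.zarr", "store2.zarr"],
        engine="zarr",
        combine="nested",
        concat_dim="time",
    )
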
3 changes: 3 additions & 0 deletions doc/whats-new.rst
@@ -54,6 +54,9 @@ New Features
of :py:class:`DataArray` and :py:class:`Dataset` objects and
document the new method in :doc:`internals`. (:pull:`4248`).
By `Justus Magin <https://github.com/keewis>`_.
- :py:func:`open_dataset` and :py:func:`open_mfdataset`
now works with ``engine="zarr"`` (:issue:`3668`, :pull:`4003`, :pull:`4187`).
By `Miguel Jimenez <https://github.com/Mikejmnez>`_ and `Wei Ji Leong <https://github.com/weiji14>`_.
- Add support for parsing datetime strings formatted following the default
string representation of cftime objects, i.e. YYYY-MM-DD hh:mm:ss, in
partial datetime string indexing, as well as :py:meth:`~xarray.cftime_range`
70 changes: 61 additions & 9 deletions xarray/backends/api.py
@@ -1,5 +1,6 @@
import os.path
import warnings
from collections.abc import MutableMapping
from glob import glob
from io import BytesIO
from numbers import Number
@@ -344,12 +345,12 @@ def open_dataset(
If True, decode the 'coordinates' attribute to identify coordinates in
the resulting dataset.
engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", \
"pseudonetcdf"}, optional
"pseudonetcdf", "zarr"}, optional
Engine to use when reading files. If not provided, the default engine
is chosen based on available dependencies, with a preference for
"netcdf4".
chunks : int or dict, optional
If chunks is provided, it used to load the new dataset into dask
If chunks is provided, it is used to load the new dataset into dask
arrays. ``chunks={}`` loads the dataset with dask using a single
chunk for all arrays.
lock : False or lock-like, optional
@@ -413,6 +414,7 @@ def open_dataset(
"pynio",
"cfgrib",
"pseudonetcdf",
"zarr",
]
if engine not in engines:
raise ValueError(
@@ -447,7 +449,7 @@ def open_dataset(
if backend_kwargs is None:
backend_kwargs = {}

def maybe_decode_store(store, lock=False):
def maybe_decode_store(store, chunks, lock=False):
ds = conventions.decode_cf(
store,
mask_and_scale=mask_and_scale,
@@ -461,7 +463,7 @@ def maybe_decode_store(store, lock=False):

_protect_dataset_variables_inplace(ds, cache)

if chunks is not None:
if chunks is not None and engine != "zarr":
from dask.base import tokenize

# if passed an actual file path, augment the token with
@@ -487,10 +489,40 @@ def maybe_decode_store(store, lock=False):
)
name_prefix = "open_dataset-%s" % token
ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token)
ds2._file_obj = ds._file_obj

elif engine == "zarr":
Member

My main concern with this code is that introducing an entirely separate code-path inside open_dataset() for chunking zarr in particular feels strange and a little unexpected. Any time we use totally separate code branches for some logic, the odds of introducing inconsistencies/bugs increases greatly.

I wonder if we could consolidate this logic somehow in order to avoid adding a separate branch for the code here? For example, we could put a get_chunk method on all xarray backends classes, even if it currently only returns a filler value and/or raises an error for chunks='auto'? Chunking is not unique to zarr, e.g., netCDF4 files also have chunks, although the default "auto" chunking logic should probably be different.

I would be OK holding this off for a later clean-up, but this really would be worth doing eventually. CC @alexamici RE: the backends refactor.

weiji14 (Contributor Author), Sep 17, 2020

> My main concern with this code is that introducing an entirely separate code-path inside open_dataset() for chunking zarr in particular feels strange and a little unexpected. Any time we use totally separate code branches for some logic, the odds of introducing inconsistencies/bugs increases greatly.
>
> I wonder if we could consolidate this logic somehow in order to avoid adding a separate branch for the code here? For example, we could put a get_chunk method on all xarray backends classes, even if it currently only returns a filler value and/or raises an error for chunks='auto'? Chunking is not unique to zarr, e.g., netCDF4 files also have chunks, although the default "auto" chunking logic should probably be different.

Thanks for pointing this out! I agree completely that the open_dataset() function is overdue for a refactor, it was a nightmare to go through all the if-then branches, but the comprehensive test suite helped to catch most of the bugs and I've tested it on my own real world dataset so the logic should be ok for now 🤞.

> I would be OK holding this off for a later clean-up, but this really would be worth doing eventually. CC @alexamici RE: the backends refactor.

Personally I would prefer to hold this off, since this open_mfdataset PR (and the previous one at #4003) has been sitting around for months, and I've had to resolve quite a few merge conflicts to keep up. No point in contaminating this complex PR by refactoring the NetCDF logic either.
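
As a purely illustrative sketch of the get_chunk idea floated above (nothing like this exists in the PR; the class and method names here are hypothetical), each backend store could expose a common chunking hook so that open_dataset needs no zarr-only branch::

    class AbstractDataStore:
        def get_chunk(self, name, var, chunks):
            # Backends without native chunk metadata simply echo the request
            # and refuse "auto".
            if chunks == "auto":
                raise NotImplementedError("auto-chunking is not supported here")
            return chunks

    class ZarrStore(AbstractDataStore):
        def get_chunk(self, name, var, chunks):
            # Zarr knows its on-disk chunks, so "auto" can fall back to them
            # (var is assumed to be an xarray Variable).
            if chunks == "auto":
                return dict(zip(var.dims, var.encoding.get("chunks", var.shape)))
            return chunks
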

# adapted from Dataset.Chunk() and taken from open_zarr
if not (isinstance(chunks, (int, dict)) or chunks is None):
if chunks != "auto":
raise ValueError(
"chunks must be an int, dict, 'auto', or None. "
"Instead found %s. " % chunks
)

if chunks == "auto":
try:
import dask.array # noqa
except ImportError:
chunks = None

# auto chunking needs to be here and not in ZarrStore because
# the variable chunks does not survive decode_cf
# return trivial case
if not chunks: # e.g. chunks is 0, None or {}
Member

Can we make this if chunks is None instead?

I know this is a discrepancy from how open_zarr() works today, but currently open_dataset(..., chunks={}) is a way to open a dataset with dask chunks equal to the full size of any arrays.

I doubt the (different) behavior of open_zarr in this case was intentional....

Contributor Author

I'll try it locally and see if the tests break, felt like there was a reason it had to be not chunks but can't remember the context now.

Member

Thanks! Not a big deal if we have to push off this clean-up.

Contributor Author

Yep, 2 tests would break with a ZeroDivisionError if we switch to if chunks is None. Specifically:

  • TestZarrDictStore.test_manual_chunk
  • TestZarrDirectoryStore.test_manual_chunk

Related to my comment hidden in the mess above at #4187 (comment) 😄

Member

Well, I would expect test_manual_chunk to fail here: it is explicitly verifying that chunks=0 and chunks={} result in in-memory numpy arrays. Does it work if you remove those cases, e.g., by setting NO_CHUNKS = (None,) ?

Contributor Author

Yes, setting NO_CHUNKS = (None,) works with if chunks is None. I'll make the change?

Contributor Author

If it helps by the way, test_manual_chunk with NO_CHUNKS = (None, 0, {}) was added in #2530.

Member

Yes, I think we can go ahead and change that. It doesn't look like that was carefully evaluated in #2530.
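
The behavioural difference being debated comes down to Python truthiness: 0, {} and None are all falsy, so "if not chunks" skips chunking for every one of them, whereas "if chunks is None" lets chunks=0 reach the chunking code, and a zero chunk size is what produced the ZeroDivisionError. A quick check::

    for chunks in (None, 0, {}):
        skipped_by_not = not chunks          # True for all three values
        skipped_by_is_none = chunks is None  # True only for None
        print(repr(chunks), skipped_by_not, skipped_by_is_none)
    # None True True
    # 0 True False
    # {} True False
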

return ds
weiji14 (Contributor Author), Jul 3, 2020

Note that this code was more or less copied from open_zarr. The biggest thing to note is that users switching from open_zarr to open_dataset(..., engine="zarr") will encounter a different default chunks setting:

  • open_zarr defaults to chunks="auto"
  • open_dataset defaults to chunks=None

What this means in practice is that data will be returned as numpy arrays instead of dask arrays with the default chunks=None setting (for users who didn't explicitly set a chunks argument). If I'm not mistaken, this is because the try: import dask.array block isn't executed, and so dask isn't used by default.

dcherian (Contributor), Sep 17, 2020

> open_zarr defaults to chunks="auto"
> open_dataset defaults to chunks=None

At one of our past meetings, the consensus was to keep open_zarr around with chunks="auto" as default for backward compatibility.

open_dataset(engine="zarr") should use the current default for chunks kwarg.

With that in mind can we avoid adding this stuff here?

weiji14 (Contributor Author), Sep 17, 2020

> open_zarr defaults to chunks="auto"
> open_dataset defaults to chunks=None

Happy with this 👌

> With that in mind can we avoid adding this stuff here?

Do you mean removing the if chunks == "auto": ... code block? I think it should be kept here in open_dataset (instead of putting it in the open_zarr function) because users might want to do open_mfdataset(engine="zarr", chunks="auto"), and this would require the chunks="auto" logic to be in open_dataset.
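
In practical terms, the defaults agreed on above mean the two entry points hand back different array types unless chunks is passed explicitly; a small sketch (the store path is hypothetical)::

    import xarray as xr

    # open_zarr keeps its historical default of chunks="auto", so variables
    # come back backed by dask arrays (when dask is installed).
    ds_lazy = xr.open_zarr("example.zarr")

    # open_dataset(engine="zarr") uses the generic default chunks=None,
    # so variables are backed by numpy arrays instead.
    ds_numpy = xr.open_dataset("example.zarr", engine="zarr")

    # Passing chunks explicitly restores dask-backed variables.
    ds_dask = xr.open_dataset("example.zarr", engine="zarr", chunks="auto")
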


if isinstance(chunks, int):
chunks = dict.fromkeys(ds.dims, chunks)

variables = {
k: store.maybe_chunk(k, v, chunks, overwrite_encoded_chunks)
for k, v in ds.variables.items()
}
ds2 = ds._replace(variables)

else:
ds2 = ds

ds2._file_obj = ds._file_obj
return ds2

if isinstance(filename_or_obj, Path):
@@ -499,6 +531,17 @@ def maybe_decode_store(store, lock=False):
if isinstance(filename_or_obj, AbstractDataStore):
store = filename_or_obj

elif isinstance(filename_or_obj, MutableMapping) and engine == "zarr":
# Zarr supports a wide range of access modes, but for now xarray either
# reads or writes from a store, never both.
# For open_dataset(engine="zarr"), we only read (i.e. mode="r")
mode = "r"
_backend_kwargs = backend_kwargs.copy()
overwrite_encoded_chunks = _backend_kwargs.pop("overwrite_encoded_chunks", None)
store = backends.ZarrStore.open_group(
filename_or_obj, mode=mode, group=group, **_backend_kwargs
)

elif isinstance(filename_or_obj, str):
filename_or_obj = _normalize_path(filename_or_obj)

@@ -526,7 +569,16 @@ def maybe_decode_store(store, lock=False):
store = backends.CfGribDataStore(
filename_or_obj, lock=lock, **backend_kwargs
)

elif engine == "zarr":
# on ZarrStore, mode='r', synchronizer=None, group=None,
# consolidated=False.
_backend_kwargs = backend_kwargs.copy()
overwrite_encoded_chunks = _backend_kwargs.pop(
"overwrite_encoded_chunks", None
)
store = backends.ZarrStore.open_group(
filename_or_obj, group=group, **_backend_kwargs
)
else:
if engine not in [None, "scipy", "h5netcdf"]:
raise ValueError(
@@ -542,7 +594,7 @@ def open_dataset(
)

with close_on_error(store):
ds = maybe_decode_store(store)
ds = maybe_decode_store(store, chunks)

# Ensure source filename always stored in dataset object (GH issue #2550)
if "source" not in ds.encoding:
@@ -794,7 +846,7 @@ def open_mfdataset(
If provided, call this function on each dataset prior to concatenation.
You can find the file-name from which each dataset was loaded in
``ds.encoding["source"]``.
engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib"}, \
engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", "zarr"}, \
optional
Engine to use when reading files. If not provided, the default engine
is chosen based on available dependencies, with a preference for
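
With the new code path, zarr-specific reader options travel inside backend_kwargs rather than as dedicated keyword arguments, and overwrite_encoded_chunks is popped out of that dict before the rest is forwarded to ZarrStore.open_group. A hedged usage sketch (the store path is hypothetical)::

    import xarray as xr

    ds = xr.open_dataset(
        "consolidated-store.zarr",  # hypothetical path
        engine="zarr",
        chunks={"time": 100},  # request dask chunks along "time"
        backend_kwargs={
            # the store was written with consolidated metadata
            "consolidated": True,
            # ignore the chunk sizes encoded in the store when re-chunking
            "overwrite_encoded_chunks": True,
        },
    )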