xarray.open_mzar: open multiple zarr files (in parallel) #4003
Changes from 39 commits
@@ -303,6 +303,7 @@ def open_dataset(
     drop_variables=None,
     backend_kwargs=None,
     use_cftime=None,
+    overwrite_encoded_chunks=False,
 ):
     """Open and decode a dataset from a file or file-like object.

@@ -345,7 +346,7 @@ def open_dataset(
         If True, decode the 'coordinates' attribute to identify coordinates in
         the resulting dataset.
     engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio', 'cfgrib', \
-        'pseudonetcdf'}, optional
+        'pseudonetcdf', 'zarr'}, optional
         Engine to use when reading files. If not provided, the default engine
         is chosen based on available dependencies, with a preference for
         'netcdf4'.
@@ -383,6 +384,10 @@ def open_dataset(
         represented using ``np.datetime64[ns]`` objects. If False, always
         decode times to ``np.datetime64[ns]`` objects; if this is not possible
         raise an error.
+    overwrite_encoded_chunks : bool, optional
+        Whether to drop the zarr chunks encoded for each variable when a
+        dataset is loaded with specified chunk sizes (default: False)
+
     Returns
     -------
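A usage sketch of the flag as the signature stands at this commit (the store path and dimension name are hypothetical; the review thread at the end of this diff discusses moving the flag into `backend_kwargs` instead):

```python
import xarray as xr

# Hypothetical zarr store; engine="zarr" takes the code path added in this PR.
ds = xr.open_dataset(
    "example_store.zarr",
    engine="zarr",
    chunks={"time": 10},            # dask chunk sizes requested by the user
    overwrite_encoded_chunks=True,  # drop the zarr-encoded chunks in favor
                                    # of the requested dask chunks
)
```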
@@ -409,7 +414,9 @@ def open_dataset(
         "pynio",
         "cfgrib",
         "pseudonetcdf",
+        "zarr",
     ]

     if engine not in engines:
         raise ValueError(
             "unrecognized engine for open_dataset: {}\n"
@@ -455,36 +462,7 @@ def maybe_decode_store(store, lock=False):

         _protect_dataset_variables_inplace(ds, cache)

-        if chunks is not None:
-            from dask.base import tokenize
-
-            # if passed an actual file path, augment the token with
-            # the file modification time
-            if isinstance(filename_or_obj, str) and not is_remote_uri(filename_or_obj):
-                mtime = os.path.getmtime(filename_or_obj)
-            else:
-                mtime = None
-            token = tokenize(
-                filename_or_obj,
-                mtime,
-                group,
-                decode_cf,
-                mask_and_scale,
-                decode_times,
-                concat_characters,
-                decode_coords,
-                engine,
-                chunks,
-                drop_variables,
-                use_cftime,
-            )
-            name_prefix = "open_dataset-%s" % token
-            ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token)
-            ds2._file_obj = ds._file_obj
-        else:
-            ds2 = ds
-
-        return ds2
+        return ds

     if isinstance(filename_or_obj, Path):
         filename_or_obj = str(filename_or_obj)
@@ -519,7 +497,14 @@ def maybe_decode_store(store, lock=False):
         store = backends.CfGribDataStore(
             filename_or_obj, lock=lock, **backend_kwargs
         )
+    elif engine == "zarr":
+        # on ZarrStore, mode='r', synchronizer=None, group=None,
+        # consolidated=False.
+        store = backends.ZarrStore.open_group(
+            filename_or_obj,
+            group=group,
+            **backend_kwargs
+        )
     else:
         if engine not in [None, "scipy", "h5netcdf"]:
             raise ValueError(
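For illustration (hypothetical store path, not part of the PR): because `backend_kwargs` is forwarded verbatim to `ZarrStore.open_group` in the branch above, zarr-specific options such as `consolidated` ride along with it.

```python
import xarray as xr

# Hypothetical consolidated zarr store; the dict below is handed straight
# to ZarrStore.open_group by the new engine == "zarr" branch.
ds = xr.open_dataset(
    "example_store.zarr",
    engine="zarr",
    backend_kwargs={"consolidated": True},
)
```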
@@ -542,7 +527,99 @@ def maybe_decode_store(store, lock=False):
     if isinstance(filename_or_obj, str):
         ds.encoding["source"] = filename_or_obj

-    return ds
+    if chunks is not None:
+        from dask.base import tokenize
+
+        if engine != 'zarr':
+            # if passed an actual file path, augment the token with
+            # the file modification time
+            if isinstance(filename_or_obj, str) and not is_remote_uri(filename_or_obj):
+                mtime = os.path.getmtime(filename_or_obj)
+            else:
+                mtime = None
+            token = tokenize(
+                filename_or_obj,
+                mtime,
+                group,
+                decode_cf,
+                mask_and_scale,
+                decode_times,
+                concat_characters,
+                decode_coords,
+                engine,
+                chunks,
+                drop_variables,
+                use_cftime,
+            )
+            name_prefix = "open_dataset-%s" % token
+            ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token)
+            ds2._file_obj = ds._file_obj
+
+        else:  # file is zarr!
+
+            # adapted from Dataset.chunk() and taken from open_zarr
+            if not isinstance(chunks, (int, dict)):
+                if chunks != "auto":
+                    raise ValueError(
+                        "chunks must be an int, dict, 'auto', or None. "
+                        "Instead found %s. " % chunks
+                    )
+            if isinstance(chunks, int):
+                chunks = dict.fromkeys(ds.dims, chunks)
+
+            if isinstance(chunks, tuple) and len(chunks) == len(ds.dims):
+                chunks = dict(zip(ds.dims, chunks))
Review comment: I don't think this line is reachable. How could `chunks` ever be a tuple when line 561 checks against `(int, dict)`?

Reply: Yeah, it doesn't look like it's reachable. I'll remove it.
+            def get_chunk(name, var, chunks):
+                chunk_spec = dict(zip(var.dims, var.encoding.get("chunks")))
+
+                # Coordinate labels aren't chunked
+                if var.ndim == 1 and var.dims[0] == name:
+                    return chunk_spec
+
+                if chunks == "auto":
+                    return chunk_spec
+
+                for dim in var.dims:
+                    if dim in chunks:
+                        spec = chunks[dim]
+                        if isinstance(spec, int):
+                            spec = (spec,)
+                        if isinstance(spec, (tuple, list)) and chunk_spec[dim]:
+                            if any(s % chunk_spec[dim] for s in spec):
+                                warnings.warn(
+                                    "Specified Dask chunks %r would "
+                                    "separate Zarr chunk shape %r for "
+                                    "dimension %r. This significantly "
+                                    "degrades performance. Consider "
+                                    "rechunking after loading instead."
+                                    % (chunks[dim], chunk_spec[dim], dim),
+                                    stacklevel=2,
+                                )
+                        chunk_spec[dim] = chunks[dim]
+                return chunk_spec
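A minimal illustration of the divisibility check in `get_chunk` above (hypothetical sizes, not code from the PR): requested dask chunks that are not whole multiples of the encoded zarr chunks trigger the performance warning.

```python
# Hypothetical sizes: zarr chunks of 10 encoded on disk.
zarr_chunk = 10
for requested in (20, 15):
    if requested % zarr_chunk:
        # 15 % 10 == 5 -> each dask chunk would split zarr chunks -> warn
        print(f"dask chunk {requested} splits zarr chunks of {zarr_chunk}: warning")
    else:
        # 20 % 10 == 0 -> each dask chunk covers whole zarr chunks -> fine
        print(f"dask chunk {requested} aligns with zarr chunks of {zarr_chunk}: ok")
```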
+            def maybe_chunk(name, var, chunks):

Review comment: I think I have a slight preference to relocate …

Reply: It does make sense, and would make the code cleaner/easier to read and/or debug. You think it should be implemented within …
+                chunk_spec = get_chunk(name, var, chunks)
+
+                if (var.ndim > 0) and (chunk_spec is not None):
+                    # does this cause any data to be read?
+                    token2 = tokenize(name, var._data)
+                    name2 = "zarr-%s" % token2
+                    var = var.chunk(chunk_spec, name=name2, lock=None)
+                    if overwrite_encoded_chunks and var.chunks is not None:
+                        var.encoding["chunks"] = tuple(x[0] for x in var.chunks)
+                    return var
+                else:
+                    return var
+
+            variables = {k: maybe_chunk(k, v, chunks) for k, v in ds.variables.items()}
+            ds2 = ds._replace_vars_and_dims(variables)
+            return ds2
+    else:
+        ds2 = ds
+
+    return ds2
Review comment: Since we return in the block above, I think it would be okay to also return in line 620.

Reply: This part I am not so sure I completely understand. It brings up a question: can we just have

    ds = ds.chunk(chunks, ...)

instead of

    ds2 = ds.chunk(chunks, ...)
    ds2._file_obj = ds._file_obj

That way I could just return `ds`.

 def open_dataarray(
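A sketch of the simplification discussed in the thread above, assuming the chunked copy still needs the original dataset's `_file_obj` carried over; the helper name `_chunk_ds` is hypothetical, and the `name_prefix`/`token` arguments stand in for the tokenize logic elided here.

```python
def _chunk_ds(ds, chunks, name_prefix, token):
    # Hypothetical helper: rebind ds instead of returning a separate ds2.
    # A temporary is still needed so the chunked copy can inherit the
    # original dataset's file handle before the old binding goes away.
    if chunks is not None:
        chunked = ds.chunk(chunks, name_prefix=name_prefix, token=token)
        chunked._file_obj = ds._file_obj  # keep the underlying file alive
        ds = chunked
    return ds
```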
Review comment: Since this only applies to the zarr backend, would it make sense to pass this through `backend_kwargs`?

Reply: You are right, it does make sense not to make it an argument for `open_dataset`, and to pass it to `open_dataset` through `backend_kwargs` from the deprecated `open_zarr` (i.e. zarr.py). However, it is not an argument for `ZarrStore.open_group`; instead it is used within `def maybe_chunk`. I had to pop it and use it before passing `backend_kwargs` to `open_group` in zarr.py.

Review comment: Could we add `overwrite_encoded_chunks` as a new argument in `ZarrStore.open_group`? That seems preferable to adding another highly backend-specific argument to `open_dataset`. (To be honest, I'm not entirely sure why we need `overwrite_encoded_chunks`... it's not obvious from the description why it's useful.)

Reply: Quoting from #2530 (comment), where `overwrite_encoded_chunks` was initially added. As mentioned by @Mikejmnez, `overwrite_encoded_chunks` is not actually used by `ZarrStore.open_group`; it is used at the `open_dataset` level. So it's not strictly a 'backend_kwarg', but we can still put it in there I suppose? Just looking through the code now to see how best to do it.