Write manifests to zarr store #45
Changes from all commits
386844a
02e457e
43872ab
f16911b
c319d30
8f0ee51
fc4cb84
c8add61
42e17d1
23772b9
4f2655f
b4c38fe
79f39e1
98a259e
c3d88d5
973eccd
0151652
6ba41de
@@ -0,0 +1,27 @@
import xarray as xr
import numpy as np
import xarray.testing as xrt
from virtualizarr import open_virtual_dataset, ManifestArray
from virtualizarr.manifests.manifest import ChunkEntry


def test_zarr_v3_roundtrip(tmpdir):
    arr = ManifestArray(
        chunkmanifest={"0.0": ChunkEntry(path="test.nc", offset=6144, length=48)},
        zarray=dict(
            shape=(2, 3),
            dtype=np.dtype("<i8"),
            chunks=(2, 3),
            compressor=None,
            filters=None,
            fill_value=None,
            order="C",
            zarr_format=3,
        ),
    )
    original = xr.Dataset({"a": (["x", "y"], arr)}, attrs={"something": 0})

    original.virtualize.to_zarr(tmpdir / "store.zarr")
    roundtrip = open_virtual_dataset(tmpdir / "store.zarr", filetype="zarr_v3", indexes={})

    xrt.assert_identical(roundtrip, original)
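For orientation (a sketch, not part of the diff): given the test above, the store written by to_zarr should contain roughly the following entries, assuming each array directory holds its metadata in a zarr.json and its chunk references in a manifest.json keyed by chunk index, as the ChunkEntry fields suggest:

store.zarr/
├── zarr.json          (group metadata, carrying the {"something": 0} attributes)
└── a/
    ├── zarr.json      (array metadata: shape, dtype, chunk grid, dimension names)
    └── manifest.json  (e.g. {"0.0": {"path": "test.nc", "offset": 6144, "length": 48}})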
@@ -0,0 +1,21 @@
The MIT License (MIT)

Copyright (c) 2015-2024 Zarr Developers <https://github.com/zarr-developers>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,22 @@
import json
import numbers

from typing import Any


class NumberEncoder(json.JSONEncoder):
    def default(self, o):
        # See json.JSONEncoder.default docstring for explanation
        # This is necessary to encode numpy dtype
        if isinstance(o, numbers.Integral):
            return int(o)
        if isinstance(o, numbers.Real):
            return float(o)
        return json.JSONEncoder.default(self, o)


def json_dumps(o: Any) -> bytes:
    """Write JSON in a consistent, human-readable way."""
    return json.dumps(
        o, indent=4, sort_keys=True, ensure_ascii=True, separators=(",", ": "), cls=NumberEncoder
    ).encode("ascii")
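As an illustration (not part of the diff), the encoder is what lets numpy scalar values, which json.dumps rejects by default, serialize cleanly via the numbers ABCs:

import numpy as np

# np.int64 / np.float32 are not natively JSON-serializable;
# NumberEncoder coerces them through numbers.Integral / numbers.Real.
payload = {"fill_value": np.int64(0), "scale": np.float32(1.5)}
print(json_dumps(payload).decode("ascii"))
# {
#     "fill_value": 0,
#     "scale": 1.5
# }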
@@ -1,3 +1,4 @@
+from pathlib import Path
 from typing import List, Literal, Mapping, Optional, Union, overload, MutableMapping, Iterable
 
 import ujson  # type: ignore
@@ -10,6 +11,7 @@
 import virtualizarr.kerchunk as kerchunk
 from virtualizarr.kerchunk import KerchunkStoreRefs, FileType
 from virtualizarr.manifests import ChunkManifest, ManifestArray
+from virtualizarr.zarr import dataset_to_zarr, attrs_from_zarr_group_json, metadata_from_zarr_json
 
 
 class ManifestBackendArray(ManifestArray, BackendArray):
@@ -39,7 +41,7 @@ def open_virtual_dataset(
         File path to open as a set of virtualized zarr arrays.
     filetype : FileType, default None
         Type of file to be opened. Used to determine which kerchunk file format backend to use.
-        Can be one of {'netCDF3', 'netCDF4'}.
+        Can be one of {'netCDF3', 'netCDF4', 'zarr_v3'}.
         If not provided will attempt to automatically infer the correct filetype from the filepath's extension.
     drop_variables: list[str], default is None
         Variables in the file to drop before returning.
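For concreteness, the new value is passed exactly as in the roundtrip test above (the store path is illustrative):

vds = open_virtual_dataset("store.zarr", filetype="zarr_v3", indexes={})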
@@ -76,57 +78,116 @@ def open_virtual_dataset(
     if common:
         raise ValueError(f"Cannot both load and drop variables {common}")
 
-    # this is the only place we actually always need to use kerchunk directly
-    # TODO avoid even reading byte ranges for variables that will be dropped later anyway?
-    vds_refs = kerchunk.read_kerchunk_references_from_file(
-        filepath=filepath,
-        filetype=filetype,
-    )
-    virtual_vars = virtual_vars_from_kerchunk_refs(
-        vds_refs,
-        drop_variables=drop_variables + loadable_variables,
-        virtual_array_class=virtual_array_class,
-    )
-    ds_attrs = kerchunk.fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {})
-
-    if indexes is None or len(loadable_variables) > 0:
-        # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
-        # TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
-        # TODO really we probably want a dedicated xarray backend that iterates over all variables only once
-        ds = xr.open_dataset(filepath, drop_variables=drop_variables)
-
-        if indexes is None:
-            # add default indexes by reading data from file
-            indexes = {name: index for name, index in ds.xindexes.items()}
-        elif indexes != {}:
-            # TODO allow manual specification of index objects
-            raise NotImplementedError()
-        else:
-            indexes = dict(**indexes)  # for type hinting: to allow mutation
-
-        loadable_vars = {name: var for name, var in ds.variables.items() if name in loadable_variables}
+    if virtual_array_class is not ManifestArray:
+        raise NotImplementedError()
 
-        # if we only read the indexes we can just close the file right away as nothing is lazy
-        if loadable_vars == {}:
-            ds.close()
+    if filetype == "zarr_v3":
+        # TODO is there a neat way of auto-detecting this?
[Review comment on lines +85 to +86]
This is a bit ugly - I want to automatically distinguish between non-zarr, zarr v2 (both to be read using kerchunk) and zarr v3 (to be read using this code). I guess I will just have to search for

[Review reply]
@norlandrhagen do you have any thoughts on a neat way to handle this?
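One plausible answer, purely as a sketch (this helper is hypothetical, not part of the PR): sniff for the marker files each format guarantees, since a Zarr v3 store has a top-level zarr.json while a v2 store carries .zgroup/.zarray files.

from pathlib import Path

def detect_store_format(filepath: str) -> str:
    """Hypothetical sniffing helper: classify a path by its Zarr marker files."""
    path = Path(filepath)
    if (path / "zarr.json").exists():
        return "zarr_v3"  # v3 stores keep all metadata in zarr.json documents
    if (path / ".zgroup").exists() or (path / ".zarray").exists():
        return "zarr_v2"  # v2 stores use .zgroup/.zarray/.zattrs keys
    return "other"  # fall through to the kerchunk readers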
+        return open_virtual_dataset_from_v3_store(storepath=filepath, drop_variables=drop_variables, indexes=indexes)
     else:
-        loadable_vars = {}
-        indexes = {}
+        # this is the only place we actually always need to use kerchunk directly
+        # TODO avoid even reading byte ranges for variables that will be dropped later anyway?
+        vds_refs = kerchunk.read_kerchunk_references_from_file(
+            filepath=filepath,
+            filetype=filetype,
+        )
+        virtual_vars = virtual_vars_from_kerchunk_refs(
+            vds_refs,
+            drop_variables=drop_variables + loadable_variables,
+            virtual_array_class=virtual_array_class,
+        )
+        ds_attrs = kerchunk.fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {})
+
+        if indexes is None or len(loadable_variables) > 0:
+            # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
+            # TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
+            # TODO really we probably want a dedicated xarray backend that iterates over all variables only once
+            ds = xr.open_dataset(filepath, drop_variables=drop_variables)
+
+            if indexes is None:
+                # add default indexes by reading data from file
+                indexes = {name: index for name, index in ds.xindexes.items()}
+            elif indexes != {}:
+                # TODO allow manual specification of index objects
+                raise NotImplementedError()
+            else:
+                indexes = dict(**indexes)  # for type hinting: to allow mutation
+
+            loadable_vars = {name: var for name, var in ds.variables.items() if name in loadable_variables}
+
+            # if we only read the indexes we can just close the file right away as nothing is lazy
+            if loadable_vars == {}:
+                ds.close()
+        else:
+            loadable_vars = {}
+            indexes = {}
 
+        vars = {**virtual_vars, **loadable_vars}
+
+        data_vars, coords = separate_coords(vars, indexes)
+
+        vds = xr.Dataset(
+            data_vars,
+            coords=coords,
+            # indexes={},  # TODO should be added in a later version of xarray
+            attrs=ds_attrs,
+        )
+
+        # TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened
 
-    vars = {**virtual_vars, **loadable_vars}
+        return vds
 
 
+def open_virtual_dataset_from_v3_store(
+    storepath: str,
+    drop_variables: List[str],
+    indexes: Optional[Mapping[str, Index]],
+) -> xr.Dataset:
+    """
+    Read a Zarr v3 store and return an xarray Dataset containing virtualized arrays.
+    """
+    _storepath = Path(storepath)
+
+    ds_attrs = attrs_from_zarr_group_json(_storepath / "zarr.json")
+
+    # TODO recursive glob to create a datatree
+    # Note: this .is_file() check should not be necessary according to the pathlib docs, but tests fail on github CI without it
+    # see https://github.com/TomNicholas/VirtualiZarr/pull/45#discussion_r1547833166
+    all_paths = _storepath.glob("*/")
+    directory_paths = [p for p in all_paths if not p.is_file()]
+
+    vars = {}
+    for array_dir in directory_paths:
+        var_name = array_dir.name
+        if var_name in drop_variables:
+            continue  # skip this variable but keep reading the rest of the store
+
+        zarray, dim_names, attrs = metadata_from_zarr_json(array_dir / "zarr.json")
+        manifest = ChunkManifest.from_zarr_json(str(array_dir / "manifest.json"))
+
+        marr = ManifestArray(chunkmanifest=manifest, zarray=zarray)
+        var = xr.Variable(data=marr, dims=dim_names, attrs=attrs)
+        vars[var_name] = var
+
+    if indexes is None:
+        raise NotImplementedError()
+    elif indexes != {}:
+        # TODO allow manual specification of index objects
+        raise NotImplementedError()
+    else:
+        indexes = dict(**indexes)  # for type hinting: to allow mutation
+
     data_vars, coords = separate_coords(vars, indexes)
 
-    vds = xr.Dataset(
+    ds = xr.Dataset(
         data_vars,
         coords=coords,
         # indexes={},  # TODO should be added in a later version of xarray
         attrs=ds_attrs,
     )
 
     # TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened
 
-    return vds
+    return ds
 
 
 def virtual_vars_from_kerchunk_refs(
@@ -161,9 +222,9 @@ def virtual_vars_from_kerchunk_refs(
 
 def dataset_from_kerchunk_refs(
     refs: KerchunkStoreRefs,
-    drop_variables: Optional[List[str]] = None,
-    virtual_array_class=ManifestArray,
-    indexes={},
+    drop_variables: List[str] = [],
+    virtual_array_class: type = ManifestArray,
+    indexes: Optional[MutableMapping[str, Index]] = None,
 ) -> xr.Dataset:
     """
     Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays.
@@ -177,6 +238,8 @@ def dataset_from_kerchunk_refs(
 
     vars = virtual_vars_from_kerchunk_refs(refs, drop_variables, virtual_array_class)
 
+    if indexes is None:
+        indexes = {}
     data_vars, coords = separate_coords(vars, indexes)
 
     ds_attrs = kerchunk.fully_decode_arr_refs(refs["refs"]).get(".zattrs", {})
@@ -261,13 +324,16 @@ def to_zarr(self, storepath: str) -> None:
""" | ||
Serialize all virtualized arrays in this xarray dataset as a Zarr store. | ||
|
||
Currently requires all variables to be backed by ManifestArray objects. | ||
|
||
Not very useful until some implementation of a Zarr reader can actually read these manifest.json files. | ||
See https://github.com/zarr-developers/zarr-specs/issues/287 | ||
|
||
Parameters | ||
---------- | ||
storepath : str | ||
""" | ||
raise NotImplementedError( | ||
"No point in writing out these virtual arrays to Zarr until at least one Zarr reader can actually read them." | ||
) | ||
dataset_to_zarr(self.ds, storepath) | ||
|
||
@overload | ||
def to_kerchunk(self, filepath: None, format: Literal["dict"]) -> KerchunkStoreRefs: | ||
|
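End to end, the accessor method added here is used like this (file names are illustrative):

from virtualizarr import open_virtual_dataset

# build virtual references to the chunks of an existing netCDF file,
# then persist them as a Zarr store with one manifest.json per array
vds = open_virtual_dataset("data.nc")
vds.virtualize.to_zarr("references.zarr")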
[Review comment]
I chose to vendor this because I didn't want to import internals of the zarr-python library while it's in flux, and also this helps make it clear exactly which parts of this package even need zarr-python at all.