Real world use case: Virtualizarring CMIP6 data #93

Open
jbusecke opened this issue Apr 25, 2024 · 6 comments
Labels: usage example (Real world use case examples)

Comments

@jbusecke
Contributor

jbusecke commented Apr 25, 2024

Note

All of this is currently based on a dev branch that represents a merge of main into #67.

Motivated to come up with a proof of concept by tomorrow for the ESGF conference I am currently attending, I am trying to test VirtualiZarr on real-world CMIP6 data on S3 (a complex example for #61).

I am running the following:

from virtualizarr import open_virtual_dataset
import xarray as xr
files = [
    's3://esgf-world/CMIP6/CMIP/CCCma/CanESM5/historical/r10i1p1f1/Omon/uo/gn/v20190429/uo_Omon_CanESM5_historical_r10i1p1f1_gn_185001-186012.nc',
    's3://esgf-world/CMIP6/CMIP/CCCma/CanESM5/historical/r10i1p1f1/Omon/uo/gn/v20190429/uo_Omon_CanESM5_historical_r10i1p1f1_gn_187101-188012.nc',
]
vds_list = []
for f in files:
    vds = open_virtual_dataset(f, filetype='netCDF4', indexes={})
    vds_list.append(vds)
combined_vds = xr.combine_nested(vds_list, concat_dim=['time'], coords='minimal', compat='override')
combined_vds.virtualize.to_kerchunk('combined.json', format='json')

Everything up to this point works, which is really phenomenal. Thanks for the great work here.


But when I try to read from the reference file:

import fsspec

fs = fsspec.filesystem("reference", fo="combined.json")
mapper = fs.get_mapper("")

combined_ds = xr.open_dataset(mapper, engine="kerchunk")

I get this error:

---------------------------------------------------------------------------
ClientError                               Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.11/site-packages/s3fs/core.py:113, in _error_wrapper(func, args, kwargs, retries)
    112 try:
--> 113     return await func(*args, **kwargs)
    114 except S3_RETRYABLE_ERRORS as e:

File /srv/conda/envs/notebook/lib/python3.11/site-packages/aiobotocore/client.py:408, in AioBaseClient._make_api_call(self, operation_name, api_params)
407 error_class = self.exceptions.from_code(error_code)
--> 408 raise error_class(parsed_response, operation_name)
409 else:

ClientError: An error occurred (InvalidAccessKeyId) when calling the GetObject operation: The AWS Access Key Id you provided does not exist in our records.

The above exception was the direct cause of the following exception:

PermissionError Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/asyn.py:245, in _run_coros_in_chunks.<locals>._run_coro(coro, i)
244 try:
--> 245 return await asyncio.wait_for(coro, timeout=timeout), i
246 except Exception as e:

File /srv/conda/envs/notebook/lib/python3.11/asyncio/tasks.py:452, in wait_for(fut, timeout)
451 if timeout is None:
--> 452 return await fut
454 if timeout <= 0:

File /srv/conda/envs/notebook/lib/python3.11/site-packages/s3fs/core.py:1125, in S3FileSystem._cat_file(self, path, version_id, start, end)
1123 resp["Body"].close()
-> 1125 return await _error_wrapper(_call_and_read, retries=self.retries)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/s3fs/core.py:142, in _error_wrapper(func, args, kwargs, retries)
141 err = translate_boto_error(err)
--> 142 raise err

File /srv/conda/envs/notebook/lib/python3.11/site-packages/s3fs/core.py:113, in _error_wrapper(func, args, kwargs, retries)
112 try:
--> 113 return await func(*args, **kwargs)
114 except S3_RETRYABLE_ERRORS as e:

File /srv/conda/envs/notebook/lib/python3.11/site-packages/s3fs/core.py:1112, in S3FileSystem._cat_file.<locals>._call_and_read()
1111 async def _call_and_read():
-> 1112 resp = await self._call_s3(
1113 "get_object",
1114 Bucket=bucket,
1115 Key=key,
1116 **version_id_kw(version_id or vers),
1117 **head,
1118 **self.req_kw,
1119 )
1120 try:

File /srv/conda/envs/notebook/lib/python3.11/site-packages/s3fs/core.py:362, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
361 additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist, **kwargs)
--> 362 return await _error_wrapper(
363 method, kwargs=additional_kwargs, retries=self.retries
364 )

File /srv/conda/envs/notebook/lib/python3.11/site-packages/s3fs/core.py:142, in _error_wrapper(func, args, kwargs, retries)
141 err = translate_boto_error(err)
--> 142 raise err

PermissionError: The AWS Access Key Id you provided does not exist in our records.

The above exception was the direct cause of the following exception:

ReferenceNotReachable Traceback (most recent call last)
Cell In[17], line 6
3 fs = fsspec.filesystem("reference", fo=f"combined.json", anon=True)
4 mapper = fs.get_mapper("")
----> 6 combined_ds = xr.open_dataset(mapper, engine="kerchunk")

File /srv/conda/envs/notebook/lib/python3.11/site-packages/xarray/backends/api.py:573, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
561 decoders = _resolve_decoders_kwargs(
562 decode_cf,
563 open_backend_dataset_parameters=backend.open_dataset_parameters,
(...)
569 decode_coords=decode_coords,
570 )
572 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 573 backend_ds = backend.open_dataset(
574 filename_or_obj,
575 drop_variables=drop_variables,
576 **decoders,
577 **kwargs,
578 )
579 ds = _dataset_from_backend_dataset(
580 backend_ds,
581 filename_or_obj,
(...)
591 **kwargs,
592 )
593 return ds

File /srv/conda/envs/notebook/lib/python3.11/site-packages/kerchunk/xarray_backend.py:17, in KerchunkBackend.open_dataset(self, filename_or_obj, drop_variables, storage_options, open_dataset_options)
8 def open_dataset(
9 self,
10 filename_or_obj,
(...)
14 open_dataset_options=None
15 ):
---> 17 ref_ds = open_reference_dataset(
18 filename_or_obj,
19 storage_options=storage_options,
20 open_dataset_options=open_dataset_options,
21 )
22 if drop_variables is not None:
23 ref_ds = ref_ds.drop_vars(drop_variables)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/kerchunk/xarray_backend.py:51, in open_reference_dataset(filename_or_obj, storage_options, open_dataset_options)
48 if open_dataset_options is None:
49 open_dataset_options = {}
---> 51 m = fsspec.get_mapper("reference://", fo=filename_or_obj, **storage_options)
53 return xr.open_dataset(m, engine="zarr", consolidated=False, **open_dataset_options)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/mapping.py:249, in get_mapper(url, check, create, missing_exceptions, alternate_root, **kwargs)
218 """Create key-value interface for given URL and options
219
220 The URL will be of the form "protocol://location" and point to the root
(...)
246 FSMap instance, the dict-like key-value store.
247 """
248 # Removing protocol here - could defer to each open() on the backend
--> 249 fs, urlpath = url_to_fs(url, **kwargs)
250 root = alternate_root if alternate_root is not None else urlpath
251 return FSMap(root, fs, check, create, missing_exceptions=missing_exceptions)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/core.py:395, in url_to_fs(url, **kwargs)
393 inkwargs["fo"] = urls
394 urlpath, protocol, _ = chain[0]
--> 395 fs = filesystem(protocol, **inkwargs)
396 return fs, urlpath

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/registry.py:293, in filesystem(protocol, **storage_options)
286 warnings.warn(
287 "The 'arrow_hdfs' protocol has been deprecated and will be "
288 "removed in the future. Specify it as 'hdfs'.",
289 DeprecationWarning,
290 )
292 cls = get_filesystem_class(protocol)
--> 293 return cls(**storage_options)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/spec.py:80, in _Cached.__call__(cls, *args, **kwargs)
78 return cls._cache[token]
79 else:
---> 80 obj = super().__call__(*args, **kwargs)
81 # Setting _fs_token here causes some static linters to complain.
82 obj.fs_token = token

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/implementations/reference.py:713, in ReferenceFileSystem.__init__(self, fo, target, ref_storage_args, target_protocol, target_options, remote_protocol, remote_options, fs, template_overrides, simple_templates, max_gap, max_block, cache_size, **kwargs)
709 self.fss[protocol] = fs
710 if remote_protocol is None:
711 # get single protocol from references
712 # TODO: warning here, since this can be very expensive?
--> 713 for ref in self.references.values():
714 if callable(ref):
715 ref = ref()

File :880, in iter(self)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/mapping.py:155, in FSMap.__getitem__(self, key, default)
153 k = self._key_to_str(key)
154 try:
--> 155 result = self.fs.cat(k)
156 except self.missing_exceptions:
157 if default is not None:

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/implementations/reference.py:917, in ReferenceFileSystem.cat(self, path, recursive, on_error, **kwargs)
915 new_ex.cause = ex
916 if on_error == "raise":
--> 917 raise new_ex
918 elif on_error != "omit":
919 out[k] = new_ex

ReferenceNotReachable: Reference "i/0" failed to fetch target ['s3://esgf-world/CMIP6/CMIP/CCCma/CanESM5/historical/r10i1p1f1/Omon/uo/gn/v20190429/uo_Omon_CanESM5_historical_r10i1p1f1_gn_185001-186012.nc', 47078, 1440]

To me this indicates that the required storage_options={'anon': True} are somehow not being passed through properly.

Adding

fs = fsspec.filesystem("reference", fo="combined.json", remote_options={'anon': True})

gets around that error, but opening the dataset never completes. After waiting for 10 minutes I interrupted it and got this trace:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
File :6

File /srv/conda/envs/notebook/lib/python3.11/site-packages/xarray/backends/api.py:573, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
561 decoders = _resolve_decoders_kwargs(
562 decode_cf,
563 open_backend_dataset_parameters=backend.open_dataset_parameters,
(...)
569 decode_coords=decode_coords,
570 )
572 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 573 backend_ds = backend.open_dataset(
574 filename_or_obj,
575 drop_variables=drop_variables,
576 **decoders,
577 **kwargs,
578 )
579 ds = _dataset_from_backend_dataset(
580 backend_ds,
581 filename_or_obj,
(...)
591 **kwargs,
592 )
593 return ds

File /srv/conda/envs/notebook/lib/python3.11/site-packages/kerchunk/xarray_backend.py:17, in KerchunkBackend.open_dataset(self, filename_or_obj, drop_variables, storage_options, open_dataset_options)
8 def open_dataset(
9 self,
10 filename_or_obj,
(...)
14 open_dataset_options=None
15 ):
---> 17 ref_ds = open_reference_dataset(
18 filename_or_obj,
19 storage_options=storage_options,
20 open_dataset_options=open_dataset_options,
21 )
22 if drop_variables is not None:
23 ref_ds = ref_ds.drop_vars(drop_variables)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/kerchunk/xarray_backend.py:51, in open_reference_dataset(filename_or_obj, storage_options, open_dataset_options)
48 if open_dataset_options is None:
49 open_dataset_options = {}
---> 51 m = fsspec.get_mapper("reference://", fo=filename_or_obj, **storage_options)
53 return xr.open_dataset(m, engine="zarr", consolidated=False, **open_dataset_options)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/mapping.py:249, in get_mapper(url, check, create, missing_exceptions, alternate_root, **kwargs)
218 """Create key-value interface for given URL and options
219
220 The URL will be of the form "protocol://location" and point to the root
(...)
246 FSMap instance, the dict-like key-value store.
247 """
248 # Removing protocol here - could defer to each open() on the backend
--> 249 fs, urlpath = url_to_fs(url, **kwargs)
250 root = alternate_root if alternate_root is not None else urlpath
251 return FSMap(root, fs, check, create, missing_exceptions=missing_exceptions)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/core.py:395, in url_to_fs(url, **kwargs)
393 inkwargs["fo"] = urls
394 urlpath, protocol, _ = chain[0]
--> 395 fs = filesystem(protocol, **inkwargs)
396 return fs, urlpath

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/registry.py:293, in filesystem(protocol, **storage_options)
286 warnings.warn(
287 "The 'arrow_hdfs' protocol has been deprecated and will be "
288 "removed in the future. Specify it as 'hdfs'.",
289 DeprecationWarning,
290 )
292 cls = get_filesystem_class(protocol)
--> 293 return cls(**storage_options)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/spec.py:80, in _Cached.__call__(cls, *args, **kwargs)
78 return cls._cache[token]
79 else:
---> 80 obj = super().__call__(*args, **kwargs)
81 # Setting _fs_token here causes some static linters to complain.
82 obj.fs_token = token

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/implementations/reference.py:713, in ReferenceFileSystem.__init__(self, fo, target, ref_storage_args, target_protocol, target_options, remote_protocol, remote_options, fs, template_overrides, simple_templates, max_gap, max_block, cache_size, **kwargs)
709 self.fss[protocol] = fs
710 if remote_protocol is None:
711 # get single protocol from references
712 # TODO: warning here, since this can be very expensive?
--> 713 for ref in self.references.values():
714 if callable(ref):
715 ref = ref()

File :880, in iter(self)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/mapping.py:155, in FSMap.__getitem__(self, key, default)
153 k = self._key_to_str(key)
154 try:
--> 155 result = self.fs.cat(k)
156 except self.missing_exceptions:
157 if default is not None:

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/implementations/reference.py:892, in ReferenceFileSystem.cat(self, path, recursive, on_error, **kwargs)
883 # merge and fetch consolidated ranges
884 new_paths, new_starts, new_ends = merge_offset_ranges(
885 list(urls2),
886 list(starts2),
(...)
890 max_block=self.max_block,
891 )
--> 892 bytes_out = fs.cat_ranges(new_paths, new_starts, new_ends)
894 # unbundle from merged bytes - simple approach
895 for u, s, e, p in zip(urls, starts, ends, valid_paths):

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
115 @functools.wraps(func)
116 def wrapper(*args, **kwargs):
117 self = obj or args[0]
--> 118 return sync(self.loop, func, *args, **kwargs)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/asyn.py:91, in sync(loop, func, timeout, *args, **kwargs)
88 asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
89 while True:
90 # this loops allows thread to get interrupted
---> 91 if event.wait(1):
92 break
93 if timeout is not None:

File /srv/conda/envs/notebook/lib/python3.11/threading.py:629, in Event.wait(self, timeout)
627 signaled = self._flag
628 if not signaled:
--> 629 signaled = self._cond.wait(timeout)
630 return signaled

File /srv/conda/envs/notebook/lib/python3.11/threading.py:331, in Condition.wait(self, timeout)
329 else:
330 if timeout > 0:
--> 331 gotit = waiter.acquire(True, timeout)
332 else:
333 gotit = waiter.acquire(False)

KeyboardInterrupt:

I might be misinterpreting this, but it looks exactly like the trace from the 'pangeo-forge-rechunking-stall' issue (I can't find the original issue right now).

I am definitely too tired to dig deeper right now, but I am wondering a few things:

  • Once this is written as a zarr, will the need to pass storage options go away?
  • Is there a way to use the reference files without fsspec at the moment?

Super happy to keep working on this!

@jbusecke
Contributor Author

Oh, I got it! (This is from @mgrover1's notebook.)

ds = xr.open_dataset("reference://",
                     engine="zarr",
                     backend_kwargs={
                         "consolidated": False,
                         "storage_options": {
                             "fo": 'combined.json',
                             "remote_protocol": "s3",
                             "remote_options":{'anon':True},
                         },
                         
                     }
                    )
ds

works brilliantly!


Honestly, I have no clue what is happening here, but it's also not that important in the long term, I guess.

@TomNicholas
Member

To me this indicates that the required storage_options={'anon': True} are somehow not being passed through properly.

That might well be the case.

I actually forgot we hadn't merged #67 yet - it would be great to have that tested and merged.

Once this is written as a zarr, will the need to pass storage options go away?

Once it's written using the chunk manifest specification, and zarr-python implements the same ZEP, it will be read from S3 however zarr-python implements that, which I think will be via the Rust object-store crate. I don't know what options have to be passed to that.

Is there a way to not use fsspec to use the reference files at the moment?

You need fsspec to understand the reference files if they are written out following the kerchunk format.

@TomNicholas
Member

@jbusecke nice! If that works but engine='kerchunk' doesn't, then presumably there is a bug in kerchunk's xarray backend...

I would double-check that you can load this data and that the values are as you expect (watch out for subtleties with encoding)...
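
Something like this would be a quick spot check (just a sketch, not from this thread; it assumes anonymous S3 access to the esgf-world bucket works here and that h5netcdf is installed):

import fsspec
import xarray as xr

# compare the encoding of 'uo' in the reference-backed dataset (`ds` from the
# comment above) against one of the original netCDF files opened directly
url = (
    "s3://esgf-world/CMIP6/CMIP/CCCma/CanESM5/historical/r10i1p1f1/Omon/uo/gn/"
    "v20190429/uo_Omon_CanESM5_historical_r10i1p1f1_gn_185001-186012.nc"
)
with fsspec.open(url, anon=True) as f:
    ds_orig = xr.open_dataset(f, engine="h5netcdf")
    # scale_factor, add_offset, _FillValue and dtype are the usual suspects
    print(ds_orig["uo"].encoding)
print(ds["uo"].encoding)  # `ds` is the reference-backed dataset opened above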

@jbusecke
Contributor Author

OK, I tried this for loading:

from dask.diagnostics import ProgressBar
import xarray as xr

ds = xr.open_dataset("reference://",
                     engine="zarr",
                     chunks={},
                     backend_kwargs={
                         "consolidated": False,
                         "storage_options": {
                             "fo": 'combined.json',
                             "remote_protocol": "s3",
                             "remote_options":{'anon':True},
                         },
                         
                     }
                    )
with ProgressBar():
    da_plot = ds.uo.mean(['time', 'lev']).load()
da_plot.plot()


That seems totally fine to me on the loading side.

Do you have recommendations for how to check the encoding in a comprehensive manner?

@TomNicholas
Member

That seems totally fine to me on the loading side.

Great!

Do you have recommendations how to check the encoding in a comprehensive manner?

Well, the source of truth here would be to open the files directly with xarray, without going through kerchunk, i.e. open_mfdataset on the raw netCDF files.
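
A minimal sketch of that comparison (not from this thread; anonymous S3 access via s3fs, the h5netcdf engine, and the `files` list from the first comment are assumptions):

import s3fs
import xarray as xr

fs = s3fs.S3FileSystem(anon=True)
# `files` is the list of S3 URLs from the first comment
ds_raw = xr.open_mfdataset(
    [fs.open(f) for f in files],
    engine="h5netcdf",
    combine="nested",
    concat_dim="time",
)
# compare one time slice of the raw data against the reference-backed dataset `ds`
xr.testing.assert_allclose(
    ds_raw["uo"].isel(time=0).load(),
    ds["uo"].isel(time=0).load(),
)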

@norlandrhagen
Collaborator

Very cool to see a real-world example, @jbusecke!
