ADD: streaming options to_dataset_dict and to_path_dict (#68)
nocollier authored Oct 11, 2024
1 parent a9571fd commit 1e359bc
Showing 15 changed files with 938 additions and 405 deletions.
2 changes: 1 addition & 1 deletion ci/environment-docs.yml
@@ -6,12 +6,12 @@ dependencies:
- furo
- numpydoc
- xarray
- xarray-datatree
- dask
- distributed
- netCDF4
- matplotlib
- ipywidgets
- nc-time-axis
- pip
- pip:
- myst_nb
2 changes: 2 additions & 0 deletions doc/index.rst
@@ -51,8 +51,10 @@
:caption: Features

measures
stream
modelgroups
configure
paths
reproduce
dictkeys
logging
1 change: 0 additions & 1 deletion doc/operators.md

This file was deleted.

56 changes: 56 additions & 0 deletions doc/paths.md
@@ -0,0 +1,56 @@
---
jupytext:
  text_representation:
    format_name: myst
kernelspec:
  display_name: Python 3
  name: python3
---

```{code-cell}
:tags: [remove-cell]
from intake_esgf import ESGFCatalog
from pprint import pprint
```

# Just the Paths

While the basic paradigm of `intake-esgf` is to return xarray datasets for everything in your catalog, we recognize that you may wish to just get the paths back and use them in creative ways.

1. You may not want to use xarray datasets. We highly recommend learning the package and using it in your research, but you may have alternatives and we do not want to prohibit you from working as you see fit.
2. The analysis script you are running may not have been written to leverage xarray datasets.
3. You may need just the paths to pass into another tool or benchmark package.
4. You may have specific options you want to pass to `xarray.open_dataset()` that our interface does not support (see the sketch at the end of this page).

There is a catalog method called `to_path_dict()`. It works just like `to_dataset_dict()` except that we do not call the xarray dataset constructors on the paths returned to you. The two functions share most of the same keyword arguments. If we perform a search

```{code-cell}
cat = ESGFCatalog().search(
experiment_id="historical",
source_id="CanESM5",
frequency="mon",
variable_id=["gpp", "tas", "pr"],
member_id="r1i1p1f1",
)
```

Then we can call the path function instead and print the local paths.

```{code-cell}
paths = cat.to_path_dict()
pprint(paths)
```

Note that, just as with `to_dataset_dict()`, this will first check whether the data is available locally and download it if not. In fact, internally our `to_dataset_dict()` function calls `to_path_dict()` first. You can also use this to obtain the OPeNDAP links if you prefer.

```{code-cell}
cat = ESGFCatalog().search(
experiment_id="historical",
source_id="CanESM5",
frequency="mon",
variable_id=["cSoil"],
member_id="r1i1p1f1",
)
paths = cat.to_path_dict(prefer_streaming=True)
pprint(paths)
```
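
If, as in the fourth item above, you need options that our interface does not expose, a minimal sketch of opening the returned paths yourself could look like this (the `chunks` value is only an illustrative argument):

```python
import xarray as xr

# Open each entry of the path dictionary ourselves, passing any xarray
# options that intake-esgf does not expose directly.
dsd = {
    key: xr.open_mfdataset(files, chunks={"time": 120})
    for key, files in paths.items()
}
```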
2 changes: 1 addition & 1 deletion doc/quickstart.md
@@ -119,7 +119,7 @@ look at the keys of the resulting dictionary.
print(dsd.keys())
```

By defult the keys are populated using the different facet values in the
By default the keys are populated using the different facet values in the
dictionary. However, you have a lot of [control](dictkeys) on the form that they
take. During the download process, you may have also noticed that a progress bar
informed you that we were adding cell measures. We add [cell measures](measures)
58 changes: 58 additions & 0 deletions doc/stream.md
@@ -0,0 +1,58 @@
---
jupytext:
  text_representation:
    format_name: myst
kernelspec:
  display_name: Python 3
  name: python3
---

```{code-cell}
:tags: [remove-cell]
import matplotlib.pyplot as plt
from intake_esgf import ESGFCatalog
```

# Streaming Data

In addition to the transfer of entire files, data may be streamed to the user as it is required by their script. The benefit is that if only a small portion of the data is to be used, we avoid downloading the whole file. At the time of this writing, ESGF indices only contain [OPeNDAP](https://www.opendap.org/) access information. However, as we consider expanding support, the below interface will extend to other streaming/cloud-ready technologies such as [Zarr](https://zarr.dev/) stores, [kerchunk](https://github.com/fsspec/kerchunk), and [VirtualiZarr](https://github.com/zarr-developers/VirtualiZarr).

To demonstrate this functionality, consider the following search for some future surface air temperature data from the UKESM model.

```{code-cell}
cat = ESGFCatalog().search(
experiment_id="ssp585",
source_id="UKESM1-0-LL",
variable_id="tas",
frequency="mon",
)
cat.remove_ensembles()
```

To use the OPeNDAP access links harvested from the index nodes, you tell the package that you `prefer_streaming=True`. Not all files will have this capability, but if they do, this tells `intake-esgf` to use them. Also, in this example we do not need any cell measures, so we will disable that in this call.

```{code-cell}
dsd = cat.to_dataset_dict(prefer_streaming=True, add_measures=False)
```

At this point, the dataset dictionary is returned, but you will notice that no file download messages appeared. The OPeNDAP access link was passed to the xarray constructor. We now proceed with our analysis as if the data were local. In this example, we wish to see what future temperatures will be under the SSP585 scenario over my hometown.

```{code-cell}
ds = dsd["tas"]
ds = ds.sel(lat=35.96, lon=-83.92 + 360, method="nearest")
```

Now we can plot this trace using matplotlib. When the xarray dataset needs data, it uses the OPeNDAP protocol to stream just the time trace at this specific location.

```{code-cell}
fig, ax = plt.subplots(figsize=(10, 3))
ds["tas"].plot(ax=ax);
```

This can be a very fast alternative if the data volume is relatively low. If you want to verify that data has indeed been streamed and not accessed locally, you may print the session log and look at what was accessed.

```{code-cell}
print(cat.session_log())
```

If you look towards the bottom of that log, you will see that an https link was accessed in place of a local file. Note that if a file is already present in your local cache, we will use it even if you have expressed a preference for streaming.
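
As a quick check, you could also inspect the returned locations directly; a minimal sketch (assuming the streamed entries are plain https URLs, as in the log above):

```python
paths = cat.to_path_dict(prefer_streaming=True)
for key, locations in paths.items():
    for loc in locations:
        # OPeNDAP access links are https URLs; anything else is a file in the local cache.
        kind = "streamed" if str(loc).startswith("http") else "local file"
        print(f"{key}: {kind} -> {loc}")
```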
215 changes: 175 additions & 40 deletions intake_esgf/base.py
@@ -10,8 +10,10 @@
import pandas as pd
import requests
import xarray as xr
from globus_sdk import TransferAPIError

import intake_esgf
from intake_esgf.core.globus import get_authorized_transfer_client
from intake_esgf.database import (
    get_download_rate_dataframe,
    log_download_information,
@@ -28,6 +30,179 @@
bar_format = "{desc:>20}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt:>15s}{postfix}]"


def get_local_file(path: Path, dataroots: list[Path]) -> Path:
    """
    Return the local path to a file if it exists.

    Parameters
    ----------
    path : Path
        The path of the file relative to an `esgroot`.
    dataroots : list[Path]
        A list of roots to prepend to `path` to check for existence.

    Returns
    -------
    Path
        A local path to a file which exists.

    Raises
    ------
    FileNotFoundError
        If the file does not exist at any of the dataroots.
    """
    for root in dataroots:
        local_file = (root / path).expanduser()
        if local_file.is_file():
            return local_file
    raise FileNotFoundError
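

# Illustrative note (not part of the original diff): with a hypothetical relative
# path like Path("CMIP6/CMIP/CCCma/CanESM5/historical/r1i1p1f1/Amon/tas/gn/v20190429/tas.nc")
# and dataroots such as [Path("~/.esgf/data")], get_local_file returns the expanded
# local path if the file exists under any root and raises FileNotFoundError otherwise.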


def get_globus_endpoints(info: dict) -> list[str]:
    """
    Return the Globus endpoints found in the file info.

    Parameters
    ----------
    info : dict
        A file info record as returned by the Solr/Globus responses.

    Returns
    -------
    list[str]
        A list of the Globus endpoint UUIDs where this file may be found.

    Raises
    ------
    ValueError
        If the UUID cannot be parsed from the Globus 'link'.
    """
    if "Globus" not in info:
        return []
    globus_endpoints = []
    for entry in info["Globus"]:
        m = re.search(r"globus:/*([a-z0-9\-]+)/(.*)", entry)
        if not m:
            raise ValueError(f"Globus 'link' could not be parsed: {entry}")
        uuid = m.group(1)
        globus_endpoints.append(uuid)
    return globus_endpoints
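

# Illustrative note (not part of the original diff): for a hypothetical record like
#   {"Globus": ["globus://aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee/CMIP6/some/path/tas.nc"]}
# the regex above captures the endpoint UUID, so get_globus_endpoints(record)
# would return ["aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"].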


def partition_infos(
    infos: list[dict], prefer_streaming: bool, prefer_globus: bool
) -> tuple[dict, dict]:
    """
    Partition the file info based on how it will be handled.

    Each file may have many access options. Here we partition the file infos according
    to the access method we will use. This is based on a priority as well as user
    options. The priority uses: (1) local files if present, (2) streaming if possible
    and requested, (3) globus transfer if possible and requested, and finally (4) https
    download. If no transfer is needed, then we also begin building the output
    dictionary of paths/datasets.

    Parameters
    ----------
    infos : list[dict]
        A list of file info as returned in the Solr/Globus response.
    prefer_streaming : bool
        Enable to use OPENDAP/VirtualZarr options if present in the info.
    prefer_globus : bool
        Enable to use Globus transfer options if present in the info.

    Returns
    -------
    dict[str, list]
        A dictionary of partitioned infos based on the access method.
    dict
        A dictionary of access paths for use in xarray.open_dataset.
    """
    # this routine will eventually return a dictionary of key -> list[paths]
    ds = {}

    # as we iterate through the infos we will partition them
    infos_exist = []
    infos_stream = []
    infos_globus = []
    infos_https = []

    # to keep from checking globus endpoints active status too much, we will store them
    client = None
    active_endpoints = set()

    # Partition and setup all the file infos based on a priority
    for i, info in enumerate(infos):
        key = info["key"]

        # 1) does the file already exist locally?
        try:
            local_path = get_local_file(
                info["path"],
                intake_esgf.conf["esg_dataroot"] + intake_esgf.conf["local_cache"],
            )
            if key not in ds:
                ds[key] = []
            ds[key].append(local_path)
            infos_exist.append(info)  # maybe not needed
            continue
        except FileNotFoundError:
            pass

        # 2) does the user prefer to stream data?
        if prefer_streaming:
            # how do we choose a link?
            preferred_sources = ["VirtualZarr", "OPENDAP"]  # move to configure
            links = [
                link
                for src in (set(preferred_sources) & set(info))
                for link in info[src]
            ]
            if links:
                # for now just use first link, we need to do better
                ds[key] = [links[0]]
                infos_stream.append(info)  # maybe not needed
                continue

        # 3) does the user prefer to use globus transfer?
        if prefer_globus:
            source_endpoints = get_globus_endpoints(info)
            # before removing these from infos of what we will download, check that
            # their endpoints actually work
            for uuid in source_endpoints:
                if uuid in active_endpoints:
                    continue
                client = get_authorized_transfer_client() if client is None else client
                try:
                    ep = client.get_endpoint(uuid)
                    if ep["acl_available"]:
                        active_endpoints = active_endpoints | set([uuid])
                except TransferAPIError:
                    pass
            # if at least one endpoint is active, then we will use globus
            source_endpoints = list(active_endpoints & set(source_endpoints))
            if source_endpoints:
                # store this information for later
                infos[i]["active_endpoints"] = source_endpoints
                infos_globus.append(info)
                continue

        # 4) the rest we need to download using https, even if no https links are
        # available. We will raise an error for this condition later.
        infos_https.append(info)

    # was the data properly partitioned?
    assert len(infos) == (
        len(infos_exist) + len(infos_stream) + len(infos_globus) + len(infos_https)
    )
    return {
        "exist": infos_exist,
        "stream": infos_stream,
        "globus": infos_globus,
        "https": infos_https,
    }, ds
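

# Illustrative note (not part of the original diff): a caller might use this as
#   partitioned, ds = partition_infos(infos, prefer_streaming=True, prefer_globus=False)
# after which partitioned["https"] holds the infos that still need an https download,
# while ds already maps dataset keys to local paths or streaming links.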


def combine_results(dfs: list[pd.DataFrame]) -> pd.DataFrame:
"""Return a combined dataframe where ids are now a list."""
# combine and remove duplicate entries
@@ -282,46 +457,6 @@ def add_cell_measures(ds: xr.Dataset, catalog) -> xr.Dataset:
    return ds


def get_cell_measure(var: str, ds: xr.Dataset) -> xr.DataArray | None:
    """Return the dataarray of the measures required by the given var.

    This routine will examine the `cell_measures` attribute of the specified `var` as
    well as the `cell_measures` applying any land/sea fractions that are necesary. This
    assumes that these variables are already part of the input dataset.

    Parameters
    ----------
    var
        The variable whose measures we will return.
    ds
        The dataset from which we will find the measures.
    """
    # if the dataarray has a cell_measures attribute and 'area' in it, we can
    # integrate it
    da = ds[var]
    if "cell_measures" not in da.attrs:
        return None
    m = re.search(r"area:\s(\w+)\s*", da.attrs["cell_measures"])
    if not m:
        return None
    msr = m.group(1)
    if msr not in ds:
        raise ValueError(f"{var} cell_measures={msr} but not in dataset")
    measure = ds[msr]
    # apply land/sea fractions if applicable, this is messy and there are maybe
    # others we need to find
    for domain, vid in zip(["land", "sea"], ["sftlf", "sftof"]):
        if "cell_methods" in da.attrs and f"where {domain}" in da.attrs["cell_methods"]:
            if vid not in ds:
                raise ValueError(f"{var} is land but {vid} not in dataset")
            # if fractions aren't [0,1], convert % to 1
            if ds[vid].max() > 2.0:
                ds[vid] *= 0.01
            measure *= ds[vid]
    return measure


def expand_cmip5_record(
    search_vars: list[str], content_vars: list[str], record: dict[str, Any]
) -> list[dict[str, Any]]: