Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for intake-esm #1218

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions esmvalcore/_intake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Look for data using intake-esm."""
import logging
from pathlib import Path
from typing import Dict

import intake
import intake_esm

from ._config import get_project_config
from ._data_finder import select_files

logger = logging.getLogger(__name__)

_CACHE: Dict[Path, intake_esm.core.esm_datastore] = {}


def clear_catalog_cache():
"""Clear the catalog cache."""
_CACHE.clear()


def load_catalog(project, drs):
"""Load an intake-esm catalog and associated facet mapping."""
catalog_info = get_project_config(project).get('catalogs', {})
site = drs.get(project, 'default')
if site not in catalog_info:
return None, {}

catalog_url = Path(catalog_info[site]['file']).expanduser()

if catalog_url not in _CACHE:
logger.info("Loading intake-esm catalog (this may take some time): %s",
catalog_url)
_CACHE[catalog_url] = intake.open_esm_datastore(catalog_url)
logger.info("Done loading catalog")

catalog = _CACHE[catalog_url]
facets = catalog_info[site]['facets']
return catalog, facets


def find_files(variable, drs):
"""Find files for variable in intake-esm catalog."""
catalog, facets = load_catalog(variable['project'], drs)
if not catalog:
return []

query = {}
for ours, theirs in facets.items():
if ours == 'version' and 'version' not in variable:
# skip version if not specified in recipe
continue
query[theirs] = variable[ours]

selection = catalog.search(**query)

# Select latest version
if 'version' not in variable and 'version' in facets:
latest_version = selection.df[facets['version']].max()
variable['version'] = latest_version
query = {
facets['version']: latest_version,
}
selection = selection.search(**query)

filenames = list(selection.df['path'])

# Select only files within the time range
filenames = select_files(filenames, variable['start_year'],
variable['end_year'])

return filenames
5 changes: 5 additions & 0 deletions esmvalcore/_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
get_output_file,
get_statistic_output_file,
)
from ._intake import clear_catalog_cache, find_files
from ._provenance import TrackedFile, get_recipe_provenance
from ._recipe_checks import RecipeError
from ._task import DiagnosticTask, TaskSet
Expand Down Expand Up @@ -509,6 +510,9 @@ def _read_attributes(filename):

def _get_input_files(variable, config_user):
"""Get the input files for a single dataset (locally and via download)."""
input_files = find_files(variable, config_user['drs'])
if input_files:
return (input_files, [], [])
(input_files, dirnames,
filenames) = get_input_filelist(variable=variable,
rootpath=config_user['rootpath'],
Expand Down Expand Up @@ -1393,6 +1397,7 @@ def initialize_tasks(self):
task.initialize_provenance(self.entity)

# TODO: check that no loops are created (will throw RecursionError)
clear_catalog_cache()

# Return smallest possible set of tasks
return tasks.get_independent()
Expand Down
14 changes: 14 additions & 0 deletions esmvalcore/config-developer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ CMIP6:
DKRZ: '{activity}/{institute}/{dataset}/{exp}/{ensemble}/{mip}/{short_name}/{grid}/{latestversion}'
ETHZ: '{exp}/{mip}/{short_name}/{dataset}/{ensemble}/{grid}/'
input_file: '{short_name}_{mip}_{dataset}_{exp}_{ensemble}_{grid}*.nc'
catalogs:
DKRZ:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are these catalogs produced? We should get one done at JASMIN too, if I get told how to make one 😁

file: '/pool/data/Catalogs/mistral-cmip6.json'
facets:
# mapping from recipe facets to intake-esm catalog facets
activity: activity_id
dataset: source_id
ensemble: member_id
exp: experiment_id
grid: grid_label
institute: institution_id
mip: table_id
short_name: variable_id
version: version
output_file: '{project}_{dataset}_{mip}_{exp}_{ensemble}_{short_name}'
cmor_type: 'CMIP6'

Expand Down