initial work to abstract ESGF projects and their facets
nocollier committed Jul 29, 2024
1 parent d3a41d7 commit ac6f504
Showing 5 changed files with 294 additions and 57 deletions.
46 changes: 0 additions & 46 deletions intake_esgf/base.py
@@ -313,52 +313,6 @@ def get_cell_measure(var: str, ds: xr.Dataset) -> Union[xr.DataArray, None]:
    return measure


def get_dataframe_columns(content: dict[str, Any]) -> list[str]:
    """Get the columns to be populated in a pandas dataframe.

    We determine the columns that will be part of the search dataframe by parsing out
    facets from the `dataset_id_template_` found in the query response. We look for
    facets between the sentinels `%(...)s` and then assume that they will have values in
    the response. CMIP5 has many inconsistencies and so we hard code it here. We also
    append `version` and `data_node` to the facets. Any facets that do not appear in
    the content will show up as `nan` in the dataframe.

    Parameters
    ----------
    content
        The content (Globus) or document (Solr) returned from the query.
    """
    # Required columns
    req = ["version", "data_node"]

    # Additional columns from the configuration
    extra = intake_esgf.conf.get("additional_df_cols", [])

    # Project dependent columns
    if "project" in content and content["project"][0] == "CMIP5":
        proj = [
            "institute",
            "model",
            "experiment",
            "time_frequency",
            "realm",
            "cmor_table",
            "ensemble",
            "variable",
        ]
    # ...everything else (so far) behaves nicely so...
    elif "dataset_id_template_" not in content:
        raise ValueError(f"No `dataset_id_template_` in {content['id']}")
    else:
        proj = re.findall(
            r"%\((\w+)\)s",
            content["dataset_id_template_"][0],
        )

    columns = list(set(proj).union(req + extra))
    return columns


def expand_cmip5_record(
search_vars: list[str], content_vars: list[str], record: dict[str, Any]
) -> list[dict[str, Any]]:
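For reference, the deleted `get_dataframe_columns` discovered facet names by scanning the `dataset_id_template_` string for `%(...)s` sentinels. A minimal standalone sketch of that extraction, using a made-up CMIP6-style template value:

```python
import re

# Hypothetical template value; real ones come back from the index in the
# `dataset_id_template_` field of each record.
template = "%(mip_era)s.%(activity_drs)s.%(institution_id)s.%(source_id)s.%(variable_id)s"

# Same pattern the removed helper used: capture the facet name between %( and )s.
facets = re.findall(r"%\((\w+)\)s", template)
print(facets)  # ['mip_era', 'activity_drs', 'institution_id', 'source_id', 'variable_id']
```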
16 changes: 9 additions & 7 deletions intake_esgf/core/globus.py
@@ -15,11 +15,9 @@
)
from globus_sdk.tokenstorage import SimpleJSONFileAdapter

from intake_esgf.base import (
    expand_cmip5_record,
    get_content_path,
    get_dataframe_columns,
)
import intake_esgf
from intake_esgf.base import expand_cmip5_record, get_content_path
from intake_esgf.projects import get_project_facets

CLIENT_ID = "81a13009-8326-456e-a487-2d1557d8eb11" # intake-esgf

@@ -63,6 +61,10 @@ def search(self, **search: Union[str, list[str]]) -> pd.DataFrame:
query_data.add_filter(
key, val if isinstance(val, list) else [val], type="match_any"
)

facets = get_project_facets(search) + intake_esgf.conf.get(
    "additional_df_cols", []
)
response_time = time.time()
sc = SearchClient()
paginator = sc.paginated.post_search(self.index_id, query_data)
@@ -77,7 +79,7 @@
if isinstance(content[facet], list)
else content[facet]
)
for facet in get_dataframe_columns(content)
for facet in facets
if facet in content
}
record["project"] = content["project"][0]
@@ -155,7 +157,7 @@ def from_tracking_ids(self, tracking_ids: list[str]) -> pd.DataFrame:
if isinstance(content[facet], list)
else content[facet]
)
for facet in get_dataframe_columns(content)
for facet in get_project_facets(content)
if facet in content
}
record["project"] = content["project"][0]
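The new `intake_esgf/projects.py` is one of the 5 changed files but is not shown in this excerpt. Judging only from how it is called here (it receives either the user's search dict or an index record and returns the facet columns), a plausible sketch of the abstraction might look like the following; the facet lists, the CMIP6 default, and the registry layout are assumptions, not the committed code:

```python
# Hypothetical sketch of intake_esgf/projects.py (not the actual committed file).
from typing import Any

from intake_esgf.exceptions import ProjectNotSupported

# Per-project facet columns; `version` and `data_node` are always appended.
PROJECT_FACETS: dict[str, list[str]] = {
    "CMIP6": [
        "mip_era", "activity_drs", "institution_id", "source_id",
        "experiment_id", "member_id", "table_id", "variable_id", "grid_label",
    ],
    "CMIP5": [
        "institute", "model", "experiment", "time_frequency",
        "realm", "cmor_table", "ensemble", "variable",
    ],
}


def get_project_facets(content: dict[str, Any]) -> list[str]:
    """Return the dataframe columns for the project named in `content`.

    `content` may be a user search (project given as a string or a list of
    strings) or a record/document returned from an index (project given as a
    one-element list).
    """
    project = content.get("project", "CMIP6")
    if isinstance(project, list):
        project = project[0]
    if project not in PROJECT_FACETS:
        raise ProjectNotSupported(project)
    return PROJECT_FACETS[project] + ["version", "data_node"]
```

The real module may organize this differently (for example, per-project classes), but the call sites above only require a dict-in, list-of-facets-out contract.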
12 changes: 8 additions & 4 deletions intake_esgf/core/solr.py
@@ -6,12 +6,13 @@
import pandas as pd
import requests

import intake_esgf
from intake_esgf.base import (
    expand_cmip5_record,
    get_content_path,
    get_dataframe_columns,
)
from intake_esgf.exceptions import NoSearchResults
from intake_esgf.projects import get_project_facets


def esg_search(base_url, **search):
@@ -44,8 +45,10 @@ def __repr__(self):

def search(self, **search: Union[str, list[str]]) -> pd.DataFrame:
search["distrib"] = search["distrib"] if "distrib" in search else self.distrib
facets = get_project_facets(search) + intake_esgf.conf.get(
    "additional_df_cols", []
)
total_time = time.time()

df = []
for response in esg_search(self.url, **search):
response = response["response"]
@@ -56,7 +59,7 @@ def search(self, **search: Union[str, list[str]]) -> pd.DataFrame:
for doc in response["docs"]:
record = {
facet: doc[facet][0] if isinstance(doc[facet], list) else doc[facet]
for facet in get_dataframe_columns(doc)
for facet in facets
if facet in doc
}
record["project"] = doc["project"][0]
@@ -87,9 +90,10 @@ def from_tracking_ids(self, tracking_ids: list[str]) -> pd.DataFrame:
self.logger.info(f"└─{self} no results")
raise NoSearchResults
for doc in response["docs"]:
facets = get_project_facets(doc)
record = {
facet: doc[facet][0] if isinstance(doc[facet], list) else doc[facet]
for facet in get_dataframe_columns(doc)
for facet in facets
if facet in doc
}
record["project"] = doc["project"][0]
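To see what the record-building comprehension in both backends produces, here is a small self-contained example with a fabricated Solr-style document; every value below is invented for illustration:

```python
# Fabricated Solr-style document: Solr returns most fields as one-element lists.
doc = {
    "mip_era": ["CMIP6"],
    "source_id": ["CESM2"],
    "variable_id": ["tas"],
    "version": ["20190308"],
    "data_node": ["esgf-node.example.org"],
    "project": ["CMIP6"],
}
facets = ["mip_era", "source_id", "variable_id", "version", "data_node", "grid_label"]

# Facets missing from the document (here `grid_label`) are skipped; they show
# up as NaN later when the records are concatenated into a dataframe.
record = {
    facet: doc[facet][0] if isinstance(doc[facet], list) else doc[facet]
    for facet in facets
    if facet in doc
}
record["project"] = doc["project"][0]
print(record)
# {'mip_era': 'CMIP6', 'source_id': 'CESM2', 'variable_id': 'tas',
#  'version': '20190308', 'data_node': 'esgf-node.example.org', 'project': 'CMIP6'}
```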
10 changes: 10 additions & 0 deletions intake_esgf/exceptions.py
@@ -23,3 +23,13 @@ def __init__(self, paths: list[Path]):

    def __str__(self):
        return f"You do not have write permission in the cache directories specified: {self.paths}"


class ProjectNotSupported(IntakeESGFException):
    """You searched for a project that we do not yet support."""

    def __init__(self, project: str):
        self.project = project

    def __str__(self):
        return f"The '{self.project}' project is not yet supported by intake-esgf"