ADD: Support for the ORNL Globus-based index
nocollier authored Feb 2, 2024
1 parent f425bf6 commit f311288
Showing 3 changed files with 58 additions and 95 deletions.
2 changes: 1 addition & 1 deletion intake_esgf/base.py
@@ -60,7 +60,7 @@ def combine_results(dfs: list[pd.DataFrame]) -> pd.DataFrame:
df = pd.concat(dfs).drop_duplicates(subset="id").reset_index(drop=True)
if len(df) == 0:
logger.info("\x1b[36;32msearch end \x1b[91;20mno results\033[0m")
raise ValueError("Search returned no results.")
raise NoSearchResults()
# now convert groups to list
for _, grp in df.groupby(list(df.columns[:-3])):
df = df.drop(grp.iloc[1:].index)
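With this change, an empty combined search raises the package's dedicated NoSearchResults exception rather than a bare ValueError. Below is a minimal sketch of how downstream code might adapt, assuming only what this diff shows (the combine_results signature above and the intake_esgf.exceptions module imported in globus.py below); the empty frame is just a stand-in for per-index results that all came back empty:

    import pandas as pd

    from intake_esgf.base import combine_results
    from intake_esgf.exceptions import NoSearchResults

    # Stand-in for per-index search results that all came back empty.
    dfs = [pd.DataFrame(columns=["id"])]
    try:
        combined = combine_results(dfs)
    except NoSearchResults:
        print("No datasets matched the requested facets.")
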
2 changes: 1 addition & 1 deletion intake_esgf/catalog.py
@@ -80,7 +80,7 @@ def __init__(
self,
esgf1_indices: Union[bool, str, list[str]] = False,
):
self.indices = [GlobusESGFIndex()]
self.indices = [GlobusESGFIndex("anl-dev"), GlobusESGFIndex("ornl-dev")]
if isinstance(esgf1_indices, bool) and esgf1_indices:
self.indices += [
SolrESGFIndex(node, distrib=False)
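The catalog now fans searches out to both Globus-based indices by default, "anl-dev" (ALCF) and "ornl-dev" (ORNL), with the ESGF1 Solr nodes still available through the esgf1_indices argument. A rough usage sketch follows; the ESGFCatalog import path, the search() keyword interface, and the facet values are assumptions for illustration and are not shown in this diff:

    from intake_esgf import ESGFCatalog  # assumed import path; not shown in this diff

    # Default construction is now backed by GlobusESGFIndex("anl-dev") and
    # GlobusESGFIndex("ornl-dev"); pass esgf1_indices=True to also include
    # the Solr-backed ESGF1 nodes.
    cat = ESGFCatalog()
    cat.search(
        experiment_id="historical",  # example CMIP6 facets, not from this commit
        variable_id="tas",
    )
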
149 changes: 56 additions & 93 deletions intake_esgf/core/globus.py
@@ -5,15 +5,29 @@
from typing import Any, Union

import pandas as pd
from globus_sdk import SearchClient
from globus_sdk import SearchClient, SearchQuery

from intake_esgf.base import get_dataset_pattern
from intake_esgf.exceptions import NoSearchResults, SearchError


def _form_path(content):
content["version"] = [content["dataset_id"].split("|")[0].split(".")[-1]]
file_path = content["directory_format_template_"][0]
return (
Path(
file_path.replace("%(root)s/", "")
.replace("%(", "{")
.replace(")s", "[0]}")
.format(**content)
)
/ content["title"]
)


class GlobusESGFIndex:
GLOBUS_INDEX_IDS = {
"anl-dev": "d927e2d9-ccdb-48e4-b05d-adbc3d97bbc5",
"ornl-dev": "ea4595f4-7b71-4da7-a1f0-e3f5d8f7f062",
}

def __init__(self, index_id="anl-dev"):
@@ -38,114 +52,75 @@ def search(self, **search: Union[str, list[str]]) -> pd.DataFrame:
"""
# process inputs
total_time = time.time()
if "project" in search:
project = search.pop("project")
if project != "CMIP6":
raise SearchError(f"{self} only contains project=CMIP6 data.")
search["type"] = "Dataset"
if "latest" not in search:
search["latest"] = True

# booleans need to be strings in the Globus sdk
for key, val in search.items():
if isinstance(val, bool):
search[key] = str(val)
# the ALCF index encodes booleans as strings
if "anl-dev" in self.repr:
for key, val in search.items():
if isinstance(val, bool):
search[key] = str(val)

# build up the query and search
query_data = {
"q": "",
"filters": [
{
"type": "match_any",
"field_name": key,
"values": [val] if isinstance(val, str) else val,
}
for key, val in search.items()
],
"facets": [],
"sort": [],
}
query_data = SearchQuery("")
for key, val in search.items():
query_data.add_filter(
key, val if isinstance(val, list) else [val], type="match_any"
)
response_time = time.time()
sc = SearchClient()
paginator = sc.paginated.post_search(self.index_id, query_data)
paginator.limit = 1000
response_time = time.time() - response_time
pattern = get_dataset_pattern()
df = []
for response in paginator:
if not response["total"]:
if self.logger is not None:
self.logger.info(f"└─{self} no results")
raise NoSearchResults()

# parse out the CMIP facets from the dataset_id
for g in response["gmeta"]:
m = re.search(pattern, g["subject"])
if m:
df.append(m.groupdict())
df[-1]["id"] = g["subject"]
df = pd.DataFrame(df)
response_time = time.time() - response_time

# logging
total_time = time.time() - total_time
if self.logger is not None:
self.logger.info(
f"└─{self} results={len(df)} {response_time=:.2f} {total_time=:.2f}"
)
self.logger.info(f"└─{self} results={len(df)} {response_time=:.2f}")
return df

def get_file_info(self, dataset_ids: list[str]) -> dict[str, Any]:
""""""
response_time = time.time()
response = SearchClient().post_search(
sc = SearchClient()
paginator = sc.paginated.post_search(
self.index_id,
{
"q": "",
"filters": [
{
"type": "match_any",
"field_name": "dataset_id",
"values": dataset_ids,
}
],
"facets": [],
"sort": [],
},
limit=1000,
SearchQuery("")
.add_filter("type", ["File"])
.add_filter("dataset_id", dataset_ids, type="match_any"),
)
paginator.limit = 1000
infos = []
for g in response["gmeta"]:
info = {}
assert len(g["entries"]) == 1
entry = g["entries"][0]
if entry["entry_id"] != "file":
continue
content = entry["content"]
info["dataset_id"] = content["dataset_id"]
info["checksum_type"] = content["checksum_type"][0]
info["checksum"] = content["checksum"][0]
info["size"] = content["size"]
for url in content["url"]:
link, link_type = url.split("|")
if link_type not in info:
info[link_type] = []
info[link_type].append(link)
# For some reason, the `version` in the globus response is just an integer
# and not what is used in the file path so I have to parse it out of the
# `dataset_id`
content["version"] = [content["dataset_id"].split("|")[0].split(".")[-1]]
file_path = content["directory_format_template_"][0]
info["path"] = (
Path(
file_path.replace("%(root)s/", "")
.replace("%(", "{")
.replace(")s", "[0]}")
.format(**content)
)
/ content["title"]
)
infos.append(info)
for response in paginator:
for g in response.get("gmeta"):
assert len(g["entries"]) == 1
content = g["entries"][0]["content"]
info = {
"dataset_id": content["dataset_id"],
"checksum_type": content["checksum_type"][0],
"checksum": content["checksum"][0],
"size": content["size"],
"HTTPServer": [
url.split("|")[0]
for url in content["url"]
if "HTTPServer" in url
],
}
# build the path from the template
content["version"] = [
content["dataset_id"].split("|")[0].split(".")[-1]
]
info["path"] = _form_path(content)
infos.append(info)
response_time = time.time() - response_time
if self.logger is not None:
self.logger.info(f"└─{self} results={len(infos)} {response_time=:.2f}")
@@ -156,19 +131,7 @@ def from_tracking_ids(self, tracking_ids: Union[str, list[str]]) -> pd.DataFrame
tracking_ids = [tracking_ids]
response = SearchClient().post_search(
self.index_id,
{
"q": "",
"filters": [
{
"type": "match_any",
"field_name": "tracking_id",
"values": tracking_ids,
}
],
"facets": [],
"sort": [],
},
limit=1000,
SearchQuery("").add_filter("tracking_id", tracking_ids, type="match_any"),
)
pattern = get_dataset_pattern()
df = []
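The new _form_path helper converts the directory_format_template_ field of a file record into a local path: "%(root)s/" is stripped, each remaining "%(key)s" placeholder is rewritten to the str.format field "{key[0]}" (the Globus metadata values are lists), and the file title is appended. A standalone sketch of the same transformation follows; the template and facet values below are hypothetical, not taken from a real index response:

    from pathlib import Path

    # Hypothetical file record with list-valued fields like those in the index.
    content = {
        "dataset_id": (
            "CMIP6.CMIP.NCAR.CESM2.historical.r1i1p1f1.Amon.tas.gn.v20190308"
            "|esgf-node.example.org"
        ),
        "directory_format_template_": [
            "%(root)s/%(mip_era)s/%(source_id)s/%(variable_id)s/%(version)s"
        ],
        "mip_era": ["CMIP6"],
        "source_id": ["CESM2"],
        "variable_id": ["tas"],
        "title": "tas_Amon_CESM2_historical_r1i1p1f1_gn_185001-201412.nc",
    }

    # Mirrors _form_path: the version is parsed out of the dataset_id, and each
    # %(key)s becomes {key[0]} so .format() picks the first element of each list.
    content["version"] = [content["dataset_id"].split("|")[0].split(".")[-1]]
    template = content["directory_format_template_"][0]
    path = (
        Path(
            template.replace("%(root)s/", "")
            .replace("%(", "{")
            .replace(")s", "[0]}")
            .format(**content)
        )
        / content["title"]
    )
    print(path)
    # CMIP6/CESM2/tas/v20190308/tas_Amon_CESM2_historical_r1i1p1f1_gn_185001-201412.nc
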
