Multi-ERDDAP search added #199

Merged
14 commits merged on Jul 26, 2021
48 changes: 47 additions & 1 deletion erddapy/erddapy.py
@@ -5,16 +5,18 @@

 import copy
 import functools
+import multiprocessing
 from datetime import datetime
 from typing import Dict, List, Optional, Tuple, Union
 from urllib.parse import quote_plus

 import pandas as pd
 import pytz
+from joblib import Parallel, delayed

 from erddapy.netcdf_handling import _nc_dataset, _tempnc
 from erddapy.servers import servers
-from erddapy.url_handling import _distinct, urlopen
+from erddapy.url_handling import _distinct, multi_urlopen, urlopen

 try:
     from pandas.core.indexes.period import parse_time_string

@@ -136,6 +138,22 @@ def _griddap_check_variables(user_variables: ListLike, original_variables: ListL
         )


+def parse_results(url: str, key: str, protocol="tabledap") -> Optional[Dict]:
+    """
+    Parse the search results returned by a single server's search URL.
+    """
+    data = multi_urlopen(url)
+    if data is None:
+        return None
+    df = pd.read_csv(data)
+    try:
+        df.dropna(subset=[protocol], inplace=True)
+    except KeyError:
+        return None
+    df["Server url"] = url.split("search")[0]
+    return {key: df[["Title", "Institution", "Dataset ID", "Server url"]]}
Contributor: I'd suggest splitting out the parsing of data from your data fetching function.
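A quick sketch of what that split might look like (illustrative only, not the code merged in this PR; the helper names fetch_results and parse_payload are hypothetical):

from typing import Dict, Optional

import pandas as pd

from erddapy.url_handling import multi_urlopen


def fetch_results(url: str):
    """Fetch the raw CSV payload for a search URL; None on failure."""
    return multi_urlopen(url)


def parse_payload(data, url: str, key: str, protocol: str = "tabledap") -> Optional[Dict]:
    """Build the per-server results frame from an already-fetched payload."""
    if data is None:
        return None
    df = pd.read_csv(data)
    if protocol not in df.columns:  # same outcome as catching the KeyError from dropna
        return None
    df = df.dropna(subset=[protocol])
    df["Server url"] = url.split("search")[0]
    return {key: df[["Title", "Institution", "Dataset ID", "Server url"]]}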



 class ERDDAP:
     """Creates an ERDDAP instance for a specific server endpoint.

@@ -371,6 +389,34 @@ def get_search_url(
         url = url.replace("&minTime=(ANY)", "").replace("&maxTime=(ANY)", "")
         return url

+    def search_all_servers(self, query="glider", servers_list=None):
+        """
+        Search all servers for a query string.
+
+        Returns a dataframe of details for all matching datasets.
+
+        Args:
+            query: string to search for
+            servers_list: optional list of servers; defaults to searching all known servers
+        """
+        if servers_list:
+            urls = {
+                server: f'{server}search/index.csv?page=1&itemsPerPage=100000&searchFor="{query}"'
+                for server in servers_list
+            }
+        else:
+            urls = {
+                key: f'{server.url}search/index.csv?page=1&itemsPerPage=100000&searchFor="{query}"'
+                for key, server in servers.items()
+            }
+        num_cores = multiprocessing.cpu_count()
+        returns = Parallel(n_jobs=num_cores)(
+            delayed(parse_results)(url, key, protocol="tabledap")
+            for key, url in urls.items()
+        )
+        dfs = [x for x in returns if x is not None]
+        df_all = pd.concat([list(df.values())[0] for df in dfs])
+        df_all.reset_index(drop=True, inplace=True)
+        return df_all

Contributor: Could protocol also be a kwarg? Or at least in the docstring? Right now you have to read the code to figure out that the function only searches for tabledap.

Contributor Author: That's a good point. Protocol added as a kwarg in 4ab1e5a.
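A minimal usage sketch of the new method, assuming a version of erddapy that includes this PR (the server passed to the constructor is illustrative; search_all_servers queries every server regardless):

from erddapy import ERDDAP

e = ERDDAP(server="https://gliders.ioos.us/erddap")
df = e.search_all_servers(query="glider")
print(df[["Title", "Dataset ID", "Server url"]].head())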
     def get_info_url(
         self,
         dataset_id: OptionalStr = None,
17 changes: 17 additions & 0 deletions erddapy/url_handling.py
@@ -33,6 +33,23 @@ def urlopen(url: str, auth: Optional[tuple] = None, **kwargs: Dict) -> BinaryIO:
     return data


+def multi_urlopen(url: str) -> Optional[BinaryIO]:
+    """
+    A simpler urlopen that works with joblib and multiprocessing.
+    """
+    try:
+        response = requests.get(url, allow_redirects=True)
+    except requests.exceptions.ConnectionError:
+        return None
+    try:
+        response.raise_for_status()
+    except requests.exceptions.HTTPError:
+        return None
+    data = io.BytesIO(response.content)
+    data.seek(0)
+    return data

Member (@ocefpaf, Jul 26, 2021): Let's "fold" this one into the canonical urlopen by making the latter a thin wrapper to this one. That will allow us to cache the results in that one. Let's tackle this in another PR.
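A rough sketch of the refactor described above, under the stated assumption that it lands in a later PR (the names _fetch_bytes and urlopen_cached are hypothetical): cache the raw bytes once per URL and hand every caller a fresh BytesIO, so the canonical urlopen can become a thin wrapper over the cached fetch.

import functools
import io
from typing import BinaryIO, Optional

import requests


@functools.lru_cache(maxsize=None)
def _fetch_bytes(url: str) -> Optional[bytes]:
    """Fetch and memoize the raw payload; None on any request failure (note: failures are memoized too in this sketch)."""
    try:
        response = requests.get(url, allow_redirects=True)
        response.raise_for_status()
    except requests.exceptions.RequestException:
        return None
    return response.content


def urlopen_cached(url: str) -> Optional[BinaryIO]:
    """Thin wrapper: wrap the cached bytes in a fresh, already-rewound BytesIO."""
    content = _fetch_bytes(url)
    return None if content is None else io.BytesIO(content)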


 @functools.lru_cache(maxsize=None)
 def check_url_response(url: str, **kwargs: Dict) -> str:
     """