Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorporate suggestions from JOSS review #60

Merged
merged 6 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 80 additions & 27 deletions ndbc_api/ndbc_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Any, List, Sequence, Tuple, Union, Dict
from typing import Any, List, Sequence, Tuple, Union, Dict, Optional

import xarray
import pandas as pd
Expand Down Expand Up @@ -380,10 +380,13 @@ def station(self,
except (ResponseException, ValueError, KeyError) as e:
raise ResponseException('Failed to handle returned data.') from e

def available_realtime(self,
station_id: Union[str, int],
as_df: bool = False) -> Union[pd.DataFrame, dict]:
"""Get the available realtime measurements for a station.
def available_realtime(
self,
station_id: Union[str, int],
full_response: bool = False,
as_df: Optional[bool] = None,
) -> Union[List[str], pd.DataFrame, dict]:
"""Get the available realtime modalities for a station.

While most data buoy (station) measurements are available over
multi-year time ranges, some measurements depreciate or become
Expand All @@ -394,6 +397,10 @@ def available_realtime(self,
Args:
station_id: The NDBC station ID (e.g. `'tplm2'` or `41001`) for the
station of interest.
full_response: Whether to return the full response from the NDBC
API, defaults to `False` and a list of modes from `get_modes()`
is returned. If `True`, the full URL for each data mode is
included in the returned `dict` or `pandas.DataFrame`.
as_df: Whether to return station-level data as a `pandas.DataFrame`,
defaults to `False`, and a `dict` is returned.

Expand All @@ -408,9 +415,29 @@ def available_realtime(self,
"""
station_id = self._parse_station_id(station_id)
try:
data = self._stations_api.realtime(handler=self._handler,
station_id=station_id)
return self._handle_data(data, as_df, cols=None)
station_realtime = self._stations_api.realtime(
handler=self._handler, station_id=station_id)
full_data = {}
if full_response:
if as_df is None:
as_df = False
full_data = self._handle_data(station_realtime,
as_df,
cols=None)
return full_data
else:
full_data = self._handle_data(station_realtime,
as_df=False,
cols=None)

# Parse the modes from the full response
_modes = self.get_modes()
station_modes = set()
for k in full_data:
for m in _modes:
if m in full_data[k]['description']:
station_modes.add(m)
return list(station_modes)
except (ResponseException, ValueError, KeyError) as e:
raise ResponseException('Failed to handle returned data.') from e

Expand Down Expand Up @@ -456,7 +483,8 @@ def get_data(
cols: List[str] = None,
station_ids: Union[Sequence[Union[int, str]], None] = None,
modes: Union[List[str], None] = None,
use_opendap: bool = False,
as_xarray_dataset: bool = False,
use_opendap: Optional[bool] = None,
) -> Union[pd.DataFrame, xarray.Dataset, dict]:
"""Execute data query against the specified NDBC station(s).

Expand Down Expand Up @@ -487,10 +515,14 @@ def get_data(
service column headers as a timestamp, and to use this timestamp
as the index.
as_df: Whether to return station-level data as a `pandas.DataFrame`,
defaults to `False`, and a `dict` is returned.
defaults to `True`, if `False` a `dict` is returned unless
`as_xarray_dataset` is set to `True`.
as_xarray_dataset: Whether to return tbe data as an `xarray.Dataset`,
defaults to `False`.
cols: A list of columns of interest which are selected from the
available data columns, such that only the desired columns are
returned. All columns are returned if `None` is specified.
use_opendap: An alias for `as_xarray_dataset`.

Returns:
The available station(s) measurements for the specified modes, time
Expand All @@ -507,6 +539,11 @@ def get_data(
HandlerException: There was an error in handling the returned data
as a `dict` or `pandas.DataFrame`.
"""
if use_opendap is not None:
as_xarray_dataset = use_opendap

as_df = as_df and not as_xarray_dataset

self.log(logging.DEBUG,
message=f"`get_data` called with arguments: {locals()}")
if station_id is None and station_ids is None:
Expand All @@ -532,7 +569,7 @@ def get_data(
handle_modes.extend(modes)

for mode in handle_modes:
if mode not in self.get_modes(use_opendap):
if mode not in self.get_modes(use_opendap=as_xarray_dataset):
raise RequestException(f"Mode {mode} is not available.")

self.log(logging.INFO,
Expand All @@ -559,7 +596,7 @@ def get_data(
use_timestamp=use_timestamp,
as_df=as_df,
cols=cols,
use_opendap=use_opendap,
use_opendap=as_xarray_dataset,
)

for future in as_completed(station_futures.values()):
Expand All @@ -579,39 +616,50 @@ def get_data(
self.log(
level=logging.WARN,
station_id=station_id,
message=(f"Failed to process request for station_id "
f"{station_id} with error: {e}"))
message=(
f"Failed to process request for station_id "
f"{station_id} with error: {e}"))
self.log(logging.INFO, message="Finished processing request.")
return self._handle_accumulate_data(accumulated_data)


def get_modes(self, use_opendap: bool = False) -> List[str]:
def get_modes(self,
use_opendap: bool = False,
as_xarray_dataset: Optional[bool] = None) -> List[str]:
"""Get the list of supported modes for `get_data(...)`.

Args:
use_opendap (bool): Whether to return the available
modes for opendap (NetCDF) data.

modes for opendap `xarray.Dataset` data.
as_xarray_dataset (bool): An alias for `use_opendap`.

Returns:
(List[str]) the available modalities.
"""
if as_xarray_dataset is not None:
use_opendap = as_xarray_dataset

if use_opendap:
return [
v for v in vars(self._opendap_data_api) if not v.startswith('_')
]
return [v for v in vars(self._data_api) if not v.startswith('_')]

@staticmethod
def save_netcdf_dataset(dataset: xarray.Dataset, output_filepath: str):
def save_xarray_dataset(dataset: xarray.Dataset, output_filepath: str,
**kwargs) -> None:
"""
Saves a netCDF4 dataset from a temporary file to a user-specified file path.
Saves an `xarray.Dataset` to netCDF a user-specified file path.

Args:
dataset: The xarray dataset to save.
output_filepath: The path to save the dataset to.
**kwargs: Additional keyword arguments to pass to `dataset.to_netcdf`.

Returns:
None: The dataset is written to disk
"""
dataset.to_netcdf(output_filepath)
dataset.to_netcdf(output_filepath, **kwargs)

""" PRIVATE """

def _get_request_handler(
Expand Down Expand Up @@ -692,18 +740,23 @@ def _handle_data(data: pd.DataFrame,

def _handle_accumulate_data(
self,
accumulated_data: Dict[str, List[Union[pd.DataFrame, dict, xarray.Dataset]]],
accumulated_data: Dict[str, List[Union[pd.DataFrame, dict,
xarray.Dataset]]],
) -> Union[pd.DataFrame, dict]:
"""Accumulate the data from multiple stations and modes."""
for k in list(accumulated_data.keys()):
if not accumulated_data[k]:
del accumulated_data[k]

if not accumulated_data:
return {}

return_as_df = isinstance(accumulated_data[list(accumulated_data.keys())[-1]][0], pd.DataFrame)
use_opendap = isinstance(accumulated_data[list(accumulated_data.keys())[-1]][0], xarray.Dataset)

return_as_df = isinstance(
accumulated_data[list(accumulated_data.keys())[-1]][0],
pd.DataFrame)
use_opendap = isinstance(
accumulated_data[list(accumulated_data.keys())[-1]][0],
xarray.Dataset)

data: Union[List[pd.DataFrame], List[xarray.Dataset],
dict] = [] if return_as_df or use_opendap else {}
Expand Down
Loading
Loading