class GCPFileCatalog(NWMFileCatalog):
    """Catalog of National Water Model files on Google Cloud Platform.

    Discovers NWM files stored in a Google Cloud Storage bucket through an
    anonymous storage client.
    """

    def __init__(
        self,
        bucket_name: str = 'national-water-model'
    ) -> None:
        """Set up a catalog backed by a Google Cloud Storage bucket.

        Parameters
        ----------
        bucket_name : str, optional, default 'national-water-model'
            Name of the Google Cloud bucket to search.

        Returns
        -------
        None
        """
        super().__init__()
        self.bucket_name = bucket_name

    def list_blobs(
        self,
        configuration: str,
        reference_time: str,
        must_contain: str = 'channel_rt'
    ) -> List[str]:
        """List public URLs of blobs matching the provided parameters.

        Parameters
        ----------
        configuration : str, required
            Particular model simulation or forecast configuration. It is
            checked with raise_invalid_configuration before any request is
            made.
        reference_time : str, required
            Model simulation or forecast issuance/reference time in
            YYYYmmddTHHZ format.
        must_contain : str, optional, default 'channel_rt'
            Substring that must appear in a blob's name for it to be kept.

        Returns
        -------
        A list with the public URL of every blob whose name starts with
        'nwm.{issue_date}/{configuration}/nwm.t{issue_time}' and contains
        must_contain.
        """
        # Validate the configuration before touching the network
        self.raise_invalid_configuration(configuration)

        # Split the reference time into its date and time components
        issue_date, issue_time = self.separate_datetime(reference_time)

        # The client is created anonymously, i.e. without credentials
        gcs_client = storage.Client.create_anonymous_client()
        target_bucket = gcs_client.bucket(self.bucket_name)

        # Restrict the listing to the requested cycle
        cycle_prefix = f'nwm.{issue_date}/{configuration}/nwm.t{issue_time}'
        candidates = gcs_client.list_blobs(target_bucket, prefix=cycle_prefix)

        # Keep only the blobs whose name carries the required substring
        matching_urls = []
        for blob in candidates:
            if must_contain in blob.name:
                matching_urls.append(blob.public_url)
        return matching_urls

    @property
    def bucket_name(self) -> str:
        """Name of the Google Cloud bucket searched by this catalog."""
        return self._bucket_name

    @bucket_name.setter
    def bucket_name(self, bucket_name: str) -> None:
        self._bucket_name = bucket_name
class HTTPFileCatalog(NWMFileCatalog):
    """An HTTP client class for NWM data.

    This HTTPFileCatalog class provides various methods for discovering NWM
    files on generic web servers that expose HTML directory listings.
    """

    def __init__(
        self,
        server: str,
        ssl_context: ssl.SSLContext = None
    ) -> None:
        """Initialize HTTP File Catalog of NWM data source.

        Parameters
        ----------
        server : str, required
            Fully qualified path to web server endpoint. Example:
            "https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwm/prod/"
        ssl_context : ssl.SSLContext, optional, default None
            SSL configuration context. When None, a fresh context from
            ssl.create_default_context() is created for this instance.

        Returns
        -------
        None
        """
        super().__init__()
        # Server path
        self.server = server

        # Build the default context per instance. A context created in the
        # signature would be evaluated once at definition time and shared
        # (and mutated) across every catalog.
        if ssl_context is None:
            ssl_context = ssl.create_default_context()
        self.ssl_context = ssl_context

    @staticmethod
    async def get_html(
        url: str,
        ssl_context: ssl.SSLContext = None
    ) -> str:
        """Retrieve an HTML document.

        Parameters
        ----------
        url : str, required
            Path to HTML document
        ssl_context : ssl.SSLContext, optional, default None
            SSL configuration context. When None, a context from
            ssl.create_default_context() is used.

        Returns
        -------
        HTML document retrieved from url.

        Raises
        ------
        FileNotFoundError
            If the server answers with a status code of 400 or above. The
            response body is used as the error message.
        """
        if ssl_context is None:
            ssl_context = ssl.create_default_context()

        async with aiohttp.ClientSession() as session:
            async with session.get(url, ssl=ssl_context) as response:
                # Read the body first: it doubles as the error message
                html_doc = await response.text()

                # Raise for no results found
                if response.status >= 400:
                    raise FileNotFoundError(html_doc)

                # Otherwise return response content
                return html_doc

    def list_blobs(
        self,
        configuration: str,
        reference_time: str,
        must_contain: str = 'channel_rt'
    ) -> List[str]:
        """List available blobs with provided parameters.

        Parameters
        ----------
        configuration : str, required
            Particular model simulation or forecast configuration. It is
            checked with raise_invalid_configuration before any request is
            made.
        reference_time : str, required
            Model simulation or forecast issuance/reference time in
            %Y%m%dT%HZ format.
        must_contain : str, optional, default 'channel_rt'
            Optional substring that must be found in each blob URL.

        Returns
        -------
        A list of full URLs that satisfy the criteria set by the parameters.

        Notes
        -----
        The directory listing is fetched with asyncio.run, which cannot be
        called while another event loop is running in the same thread.
        """
        # Validate configuration
        self.raise_invalid_configuration(configuration)

        # Break-up reference time
        issue_date, issue_time = NWMFileCatalog.separate_datetime(reference_time)

        # Tolerate a server path given without its trailing slash
        server = self.server if self.server.endswith("/") else self.server + "/"

        # Generate url of the directory holding this cycle
        directory = f"{server}nwm.{issue_date}/{configuration}/"

        # Get directory listing
        html_doc = asyncio.run(self.get_html(directory, self.ssl_context))

        # Parse content
        soup = BeautifulSoup(html_doc, 'html.parser')

        # Keep links that belong to the requested cycle and contain the
        # required substring
        file_prefix = f"nwm.t{issue_time}"
        blob_list = []
        for element in soup.select("a[href]"):
            filename = element.get("href")
            if not filename.startswith(file_prefix):
                continue
            full_path = directory + filename
            if must_contain in full_path:
                blob_list.append(full_path)

        return blob_list

    @property
    def server(self) -> str:
        """Web server endpoint, exactly as provided by the caller."""
        return self._server

    @server.setter
    def server(self, server: str) -> None:
        self._server = server

    @property
    def ssl_context(self) -> ssl.SSLContext:
        """SSL configuration context used for directory listing requests."""
        return self._ssl_context

    @ssl_context.setter
    def ssl_context(self, ssl_context: ssl.SSLContext) -> None:
        self._ssl_context = ssl_context
NWMFileCatalog, GCPFileCatalog +from .NWMClientDefaults import _NWMClientDefault class CacheNotFoundError(Exception): """Exception raised for methods that require a cache.""" @@ -36,70 +24,6 @@ class QueryError(Exception): times does not return any results.""" pass -@dataclass -class NWMClientDefaults: - """Stores application default options. - - CACHE: Configured ParquetCache instance. - CATALOG: Concrete NWM data source instance. - CANONICAL_COLUMN_MAPPING: Mapping from NWM output variable names to - hydrotools canonical names. - SSL_CONTEXT: ssl context instance. - ROUTELINK_URL: URL string path that points at an HDF5 file containing a - pandas.DataFrame with NWM crosswalk data. - CROSSWALK: A property that generates a pandas.DataFrame that maps between - point feature data source identifiers (i.e. USGS gage id -> NWM feature - ID). - DOWNLOAD_DIRECTORY: Local path to save downloaded NWM files. - """ - CACHE: ParquetCache = ParquetCache( - "nwm_cache.parquet", - write_index=False, - compression="snappy" - ) - CATALOG: NWMFileCatalog = GCPFileCatalog() - CANONICAL_COLUMN_MAPPING: pd.Series = pd.Series({ - "feature_id": "nwm_feature_id", - "time": "value_time", - "streamflow": "value" - }) - SSL_CONTEXT: ssl.SSLContext = ssl.create_default_context() - ROUTELINK_URL: str = "https://www.hydroshare.org/resource/d154f19f762c4ee9b74be55f504325d3/data/contents/RouteLink.h5" - - def _download_and_read_routelink_file(self) -> dd.DataFrame: - """Retrieve NWM RouteLink data from URL and return a - dask.dataframe.DataFrame. - - Returns - ------- - df: dask.dataframe.DataFrame - DataFrame containing associated location metadata. 
- """ - with TemporaryDirectory() as td: - # Setup downloader - downloader = FileDownloader( - output_directory=td, - create_directory=False, - ssl_context=self.SSL_CONTEXT - ) - - # Download files - downloader.get([(self.ROUTELINK_URL, "RouteLink.h5")]) - return dd.from_pandas(pd.read_hdf(Path(td)/"RouteLink.h5"), - npartitions=1) - - @property - def CROSSWALK(self) -> pd.DataFrame: - """Retrieve and cache a default crosswalk for use by a NWM client.""" - return self.CACHE.get( - function=self._download_and_read_routelink_file, - subdirectory="CROSSWALK" - ).compute()[["nwm_feature_id", "usgs_site_code"]].set_index( - "nwm_feature_id") - -# Initialize defaults -_NWMClientDefault = NWMClientDefaults() - class NWMClient(ABC): @abstractmethod @@ -153,7 +77,7 @@ def canonicalize_dask_dataframe( variable_name: str, optional Variable name used for "variable_name" column values measurement_unit: str, optional - Measurement unit used for "measurment_unit" column values + Measurement unit used for "measurement_unit" column values location_metadata_mapping : pandas.DataFrame with nwm_feature_id Index and columns of corresponding site metadata. Defaults to 7500+ usgs_site_code used by the NWM for data assimilation @@ -180,269 +104,3 @@ def canonicalize_dask_dataframe( df[col] = df['nwm_feature_id'].map(location_metadata_mapping[col]).astype("category") return df - -class NWMFileClient(NWMClient): - def __init__( - self, - file_directory: Union[str, Path] = "NWMFileClient_NetCDF_files", - dataframe_cache: Union[ParquetCache, None] = _NWMClientDefault.CACHE, - catalog: NWMFileCatalog = _NWMClientDefault.CATALOG, - location_metadata_mapping: pd.DataFrame = _NWMClientDefault.CROSSWALK, - ssl_context: ssl.SSLContext = _NWMClientDefault.SSL_CONTEXT, - cleanup_files: bool = True - ) -> None: - """Client class for retrieving data as dataframes from a remote - file-based source of National Water Model data. 
- - Parameters - ---------- - file_directory: str or pathlib.Path, optional, default None - Directory to save downloaded NetCDF files. If None, will use - temporary files. - dataframe_cache: ParquetCache, default ParquetCache("nwm_cache.parquet") - Local parquet directory used to locally cache retrieved dataframes. - catalog: NWMFileCatalog, optional, default GCPFileCatalog() - NWMFileCatalog object used to discover NWM files. - location_metadata_mapping: pandas.DataFrame with nwm_feature_id Index and - columns of corresponding site metadata. Defaults to 7500+ usgs_site_code - used by the NWM for data assimilation. - ssl_context: ssl.SSLContext, optional, default context - SSL configuration context. - cleanup_files: bool, default True - Delete downloaded NetCDF files upon program exit. - - Returns - ------- - NWMClient object - """ - super().__init__() - - # Set file output directory - self.file_directory = file_directory - - # Set dataframe cache - if dataframe_cache: - self.dataframe_cache = dataframe_cache - else: - self.dataframe_cache = None - - # Set file catalog - self.catalog = catalog - - # Set crosswalk - self.crosswalk = location_metadata_mapping - - # Set CA bundle - self.ssl_context = ssl_context - - # Set cleanup flag - self.cleanup_files = cleanup_files - - def get_cycle( - self, - configuration: str, - reference_time: str, - netcdf_dir: Union[str, Path] - ) -> dd.DataFrame: - """Retrieve a single National Water Model cycle as a - dask.dataframe.DataFrame. - - Parameters - ---------- - configuration: str, required - NWM configuration cycle. - reference_time: str, required - Reference time string in %Y%m%dT%HZ format. - e.g. '20210912T01Z' - netcdf_dir: str or pathlib.Path, required - Directory to save downloaded NetCDF files. 
- - Returns - ------- - dask.dataframe.DataFrame of NWM data - """ - # Generate list of urls - urls = self.catalog.list_blobs( - configuration=configuration, - reference_time=reference_time - ) - - # Check urls - if len(urls) == 0: - message = (f"No data found for configuration '{configuration}' and " + - f"reference time '{reference_time}'") - raise QueryError(message) - - # Generate local filenames - filenames = [unquote(url).split("/")[-1] for url in urls] - - # Setup downloader - downloader = FileDownloader( - output_directory=netcdf_dir, - create_directory=True, - ssl_context=self.ssl_context - ) - - # Download files - downloader.get(zip(urls,filenames)) - - # Get dataset - ds = NWMFileProcessor.get_dataset( - input_directory=netcdf_dir, - feature_id_filter=self.crosswalk.index - ) - - # Convert to dataframe - df = NWMFileProcessor.convert_to_dask_dataframe(ds) - - # Canonicalize - return NWMClient.canonicalize_dask_dataframe( - df=df, - configuration=configuration - ) - - def get( - self, - configuration: str, - reference_times: List[str] = None, - compute: bool = True - ) -> Union[pd.DataFrame, dd.DataFrame]: - """Retrieve National Water Model data as a DataFrame. - - Parameters - ---------- - configuration: str, required - NWM configuration cycle. - reference_times: List[str], optional, default None - List of reference time strings in %Y%m%dT%HZ format. - e.g. ['20210912T01Z',] - compute: bool, optional, default True - Return a pandas.DataFrame instead of a dask.dataframe.DataFrame. - - Returns - ------- - dask.dataframe.DataFrame of NWM data or a pandas.DataFrame in canonical - format. - """ - # Check for cache - if self.dataframe_cache == None: - raise CacheNotFoundError("get requires a cache. 
Set a cache or use get_cycle.") - - # List of individual parquet files - parquet_files = [] - - # Cache data - for reference_time in reference_times: - # Set subdirectory - subdirectory = f"{configuration}/RT{reference_time}" - - # Set download directory - netcdf_dir = self.file_directory / subdirectory - - # Get dask dataframe - try: - df = self.dataframe_cache.get( - function=self.get_cycle, - subdirectory=subdirectory, - configuration=configuration, - reference_time=reference_time, - netcdf_dir=netcdf_dir - ) - except QueryError: - message = (f"No data found for configuration '{configuration}' and " + - f"reference time '{reference_time}'") - warnings.warn(message, RuntimeWarning) - continue - - # Note file created - parquet_files.append(self.dataframe_cache.directory/subdirectory) - - # Check file list - if len(parquet_files) == 0: - message = (f"Unable to retrieve any data.") - raise QueryError(message) - - # Clean-up NetCDF files - if self.cleanup_files: - shutil.rmtree(self.file_directory) - - # Limit to canonical columns - # NOTE I could not keep dask from adding a "dir0" column using either - # fastparquet or pyarrow as backends - # A future version of dask or the backends may fix this - columns = [ - 'nwm_feature_id', - 'reference_time', - 'value_time', - 'value', - 'configuration', - 'variable_name', - 'measurement_unit', - 'usgs_site_code' - ] - - # Return all reference times - df = dd.read_parquet(parquet_files, columns=columns) - - # Return pandas dataframe - if compute: - # Compute - df = df.compute() - - # Downcast - df["value"] = pd.to_numeric(df["value"], downcast="float") - df["nwm_feature_id"] = pd.to_numeric(df["nwm_feature_id"], downcast="integer") - return df - return df - - @property - def file_directory(self) -> Path: - return self._file_directory - - @file_directory.setter - def file_directory(self, file_directory: Union[str, Path]) -> None: - self._file_directory = Path(file_directory).expanduser().resolve() - 
@dataclass
class NWMClientDefaults:
    """Application-wide default options used to build NWM clients.

    CACHE: Configured ParquetCache instance.
    CATALOG: Concrete NWM data source instance.
    CANONICAL_COLUMN_MAPPING: Mapping from NWM output variable names to
        hydrotools canonical names.
    SSL_CONTEXT: ssl context instance.
    ROUTELINK_URL: URL string path that points at an HDF5 file containing a
        pandas.DataFrame with NWM crosswalk data.
    CROSSWALK: A property that generates a pandas.DataFrame that maps between
        point feature data source identifiers (i.e. USGS gage id -> NWM feature
        ID).
    """
    # NOTE(review): these defaults are built once, when the class body runs.
    # Since Python 3.11 dataclasses reject unhashable default values; confirm
    # each default is accepted on the supported interpreters.
    CACHE: ParquetCache = ParquetCache(
        "nwm_cache.parquet",
        write_index=False,
        compression="snappy"
    )
    CATALOG: NWMFileCatalog = GCPFileCatalog()
    CANONICAL_COLUMN_MAPPING: pd.Series = pd.Series({
        "feature_id": "nwm_feature_id",
        "time": "value_time",
        "streamflow": "value"
    })
    SSL_CONTEXT: ssl.SSLContext = ssl.create_default_context()
    ROUTELINK_URL: str = "https://www.hydroshare.org/resource/d154f19f762c4ee9b74be55f504325d3/data/contents/RouteLink.h5"

    def _download_and_read_routelink_file(self) -> dd.DataFrame:
        """Download the RouteLink file and load it as a dask dataframe.

        The file is saved into a temporary directory that is removed once it
        has been read.

        Returns
        -------
        df: dask.dataframe.DataFrame
            Single-partition DataFrame with the contents of the HDF5 file
            found at ROUTELINK_URL.
        """
        local_name = "RouteLink.h5"
        with TemporaryDirectory() as workspace:
            # Fetch the file into the temporary workspace
            fetcher = FileDownloader(
                output_directory=workspace,
                create_directory=False,
                ssl_context=self.SSL_CONTEXT
            )
            fetcher.get([(self.ROUTELINK_URL, local_name)])

            # Read while the workspace still exists
            routelink = pd.read_hdf(Path(workspace) / local_name)
            return dd.from_pandas(routelink, npartitions=1)

    @property
    def CROSSWALK(self) -> pd.DataFrame:
        """Default crosswalk, retrieved through CACHE.

        Returns a pandas.DataFrame indexed by "nwm_feature_id" with a
        "usgs_site_code" column.
        """
        cached = self.CACHE.get(
            function=self._download_and_read_routelink_file,
            subdirectory="CROSSWALK"
        )
        routelink = cached.compute()
        site_codes = routelink[["nwm_feature_id", "usgs_site_code"]]
        return site_codes.set_index("nwm_feature_id")

# Initialize defaults
_NWMClientDefault = NWMClientDefaults()
Classes ------- NWMFileCatalog -HTTPFileCatalog -GCPFileCatalog """ from abc import ABC, abstractmethod from typing import List, Tuple -import asyncio -import aiohttp -import ssl -from bs4 import BeautifulSoup -from google.cloud import storage class NWMFileCatalog(ABC): """Abstract base class for sources of NWM file data.""" @@ -122,210 +109,3 @@ def configurations(self) -> List[str]: 'short_range_hawaii_no_da', 'short_range_puertorico_no_da' ] - -class HTTPFileCatalog(NWMFileCatalog): - """An HTTP client class for NWM data. - This HTTPFileCatalog class provides various methods for discovering NWM - files on generic web servers. - """ - - def __init__( - self, - server: str, - ssl_context: ssl.SSLContext = ssl.create_default_context() - ) -> None: - """Initialize HTTP File Catalog of NWM data source. - - Parameters - ---------- - server : str, required - Fully qualified path to web server endpoint. Example: - "https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwm/prod/" - ssl_context : ssl.SSLContext, optional, default context - SSL configuration context. - - Returns - ------- - None - """ - super().__init__() - # Server path - self.server = server - - # Setup SSL context - self.ssl_context = ssl_context - - @staticmethod - async def get_html( - url: str, - ssl_context: ssl.SSLContext = ssl.create_default_context() - ) -> str: - """Retrieve an HTML document. - - Parameters - ---------- - url : str, required - Path to HTML document - ssl_context : ssl.SSLContext, optional, default context - SSL configuration context. - - Returns - ------- - HTML document retrieved from url. 
- """ - async with aiohttp.ClientSession() as session: - async with session.get(url, ssl=ssl_context) as response: - # Get html content - html_doc = await response.text() - - # Raise for no results found - if response.status >= 400: - raise FileNotFoundError(html_doc) - - # Otherwise return response content - return html_doc - - def list_blobs( - self, - configuration: str, - reference_time: str, - must_contain: str = 'channel_rt' - ) -> List[str]: - """List available blobs with provided parameters. - - Parameters - ---------- - configuration : str, required - Particular model simulation or forecast configuration. For a list - of available configurations see NWMDataService.configurations - reference_time : str, required - Model simulation or forecast issuance/reference time in - %Y%m%dT%HZ format. - must_contain : str, optional, default 'channel_rt' - Optional substring that must be found in each blob name. - - Returns - ------- - A list of blob names that satisfy the criteria set by the parameters. 
- """ - # Validate configuration - self.raise_invalid_configuration(configuration) - - # Break-up reference time - issue_date, issue_time = NWMFileCatalog.separate_datetime(reference_time) - - # Set prefix - prefix = f"nwm.{issue_date}/{configuration}/" - - # Generate url - directory = self.server + prefix - - # Get directory listing - html_doc = asyncio.run(self.get_html(directory, self.ssl_context)) - - # Parse content - soup = BeautifulSoup(html_doc, 'html.parser') - - # Get links - elements = soup.select("a[href]") - - # Generate list - blob_list = [] - for e in elements: - filename = e.get("href") - if filename.startswith(f"nwm.t{issue_time}"): - full_path = directory + filename - blob_list.append(full_path) - - return [b for b in blob_list if must_contain in b] - - @property - def server(self) -> str: - return self._server - - @server.setter - def server(self, server: str) -> None: - self._server = server - - @property - def ssl_context(self) -> ssl.SSLContext: - return self._ssl_context - - @ssl_context.setter - def ssl_context(self, ssl_context: ssl.SSLContext) -> None: - self._ssl_context = ssl_context - -class GCPFileCatalog(NWMFileCatalog): - """A Google Cloud client class for NWM data. - This GCPFileCatalog class provides various methods for discovering NWM - files on Google Cloud Platform. - """ - - def __init__( - self, - bucket_name: str = 'national-water-model' - ) -> None: - """Initialize catalog of NWM data source on Google Cloud Platform. - - Parameters - ---------- - bucket_name : str, required, default 'national-water-model' - Name of Google Cloud Bucket - - Returns - ------- - None - """ - super().__init__() - self.bucket_name = bucket_name - - def list_blobs( - self, - configuration: str, - reference_time: str, - must_contain: str = 'channel_rt' - ) -> List[str]: - """List available blobs with provided parameters. - - Parameters - ---------- - configuration : str, required - Particular model simulation or forecast configuration. 
For a list - of available configurations see NWMDataService.configurations - reference_time : str, required - Model simulation or forecast issuance/reference time in - YYYYmmddTHHZ format. - must_contain : str, optional, default 'channel_rt' - Optional substring found in each blob name. - - Returns - ------- - A list of blob names that satisfy the criteria set by the parameters. - """ - # Validate configuration - self.raise_invalid_configuration(configuration) - - # Break-up reference time - issue_date, issue_time = self.separate_datetime(reference_time) - - # Connect to bucket with anonymous client - client = storage.Client.create_anonymous_client() - bucket = client.bucket(self.bucket_name) - - # Get list of blobs - blobs = client.list_blobs( - bucket, - prefix=f'nwm.{issue_date}/{configuration}/nwm.t{issue_time}' - ) - - # Return blob names - return [b.public_url for b in list(blobs) if must_contain in b.name] - - @property - def bucket_name(self) -> str: - return self._bucket_name - - @bucket_name.setter - def bucket_name(self, bucket_name: str) -> None: - self._bucket_name = bucket_name - \ No newline at end of file diff --git a/python/nwm_client_new/src/hydrotools/nwm_client_new/NWMFileClient.py b/python/nwm_client_new/src/hydrotools/nwm_client_new/NWMFileClient.py new file mode 100644 index 00000000..13d4df64 --- /dev/null +++ b/python/nwm_client_new/src/hydrotools/nwm_client_new/NWMFileClient.py @@ -0,0 +1,291 @@ +""" +===================== +NWM File Client Tools +===================== +Client tools for retrieving National Water Model data from file-based sources + +Classes +------- +NWMFileClient +""" + +from .NWMClient import NWMClient, QueryError, CacheNotFoundError +from typing import Union, List +from pathlib import Path +from .NWMClientDefaults import _NWMClientDefault +from .ParquetCache import ParquetCache +from .NWMFileCatalog import NWMFileCatalog +import pandas as pd +import ssl +import dask.dataframe as dd +from urllib.parse import unquote +from 
class NWMFileClient(NWMClient):
    def __init__(
        self,
        file_directory: Union[str, Path] = "NWMFileClient_NetCDF_files",
        dataframe_cache: Union[ParquetCache, None] = _NWMClientDefault.CACHE,
        catalog: NWMFileCatalog = _NWMClientDefault.CATALOG,
        location_metadata_mapping: Union[pd.DataFrame, None] = None,
        ssl_context: ssl.SSLContext = _NWMClientDefault.SSL_CONTEXT,
        cleanup_files: bool = True
    ) -> None:
        """Client class for retrieving data as dataframes from a remote
        file-based source of National Water Model data.

        Parameters
        ----------
        file_directory: str or pathlib.Path, optional,
            default "NWMFileClient_NetCDF_files"
            Directory to save downloaded NetCDF files. The directory is
            created if it does not exist.
        dataframe_cache: ParquetCache or None, default _NWMClientDefault.CACHE
            Local parquet directory used to locally cache retrieved
            dataframes. Without a cache only get_cycle can be used.
        catalog: NWMFileCatalog, optional, default _NWMClientDefault.CATALOG
            NWMFileCatalog object used to discover NWM files.
        location_metadata_mapping: pandas.DataFrame with nwm_feature_id Index
            and columns of corresponding site metadata, optional, default None
            When None, _NWMClientDefault.CROSSWALK is used. The default is
            resolved here rather than in the signature so that importing this
            module does not trigger the crosswalk retrieval.
        ssl_context: ssl.SSLContext, optional, default context
            SSL configuration context.
        cleanup_files: bool, default True
            Delete the directory of downloaded NetCDF files once get has
            cached the requested data.

        Returns
        -------
        NWMClient object
        """
        super().__init__()

        # Set file output directory
        self.file_directory = file_directory

        # Set dataframe cache (any falsy value disables the cache)
        self.dataframe_cache = dataframe_cache if dataframe_cache else None

        # Set file catalog
        self.catalog = catalog

        # Set crosswalk, resolving the default lazily
        if location_metadata_mapping is None:
            location_metadata_mapping = _NWMClientDefault.CROSSWALK
        self.crosswalk = location_metadata_mapping

        # Set SSL context
        self.ssl_context = ssl_context

        # Set cleanup flag
        self.cleanup_files = cleanup_files

    def get_cycle(
        self,
        configuration: str,
        reference_time: str,
        netcdf_dir: Union[str, Path]
    ) -> dd.DataFrame:
        """Retrieve a single National Water Model cycle as a
        dask.dataframe.DataFrame.

        Parameters
        ----------
        configuration: str, required
            NWM configuration cycle.
        reference_time: str, required
            Reference time string in %Y%m%dT%HZ format.
            e.g. '20210912T01Z'
        netcdf_dir: str or pathlib.Path, required
            Directory to save downloaded NetCDF files.

        Returns
        -------
        dask.dataframe.DataFrame of NWM data

        Raises
        ------
        QueryError
            If the catalog lists no files for the configuration and
            reference time.
        """
        # Generate list of urls
        urls = self.catalog.list_blobs(
            configuration=configuration,
            reference_time=reference_time
        )

        # Check urls
        if not urls:
            message = (f"No data found for configuration '{configuration}' and " +
                f"reference time '{reference_time}'")
            raise QueryError(message)

        # Generate local filenames from the last component of each url
        filenames = [unquote(url).split("/")[-1] for url in urls]

        # Setup downloader
        downloader = FileDownloader(
            output_directory=netcdf_dir,
            create_directory=True,
            ssl_context=self.ssl_context
        )

        # Download files
        downloader.get(zip(urls, filenames))

        # Get dataset, limited to the features found in the crosswalk
        ds = NWMFileProcessor.get_dataset(
            input_directory=netcdf_dir,
            feature_id_filter=self.crosswalk.index
        )

        # Convert to dataframe
        df = NWMFileProcessor.convert_to_dask_dataframe(ds)

        # Canonicalize
        return NWMClient.canonicalize_dask_dataframe(
            df=df,
            configuration=configuration
        )

    def get(
        self,
        configuration: str,
        reference_times: List[str] = None,
        compute: bool = True
    ) -> Union[pd.DataFrame, dd.DataFrame]:
        """Retrieve National Water Model data as a DataFrame.

        Parameters
        ----------
        configuration: str, required
            NWM configuration cycle.
        reference_times: List[str], optional, default None
            List of reference time strings in %Y%m%dT%HZ format.
            e.g. ['20210912T01Z',]
            A single string is treated as a one-element list. None is
            treated as an empty list, which results in a QueryError.
        compute: bool, optional, default True
            Return a pandas.DataFrame instead of a dask.dataframe.DataFrame.

        Returns
        -------
        dask.dataframe.DataFrame of NWM data or a pandas.DataFrame in canonical
        format.

        Raises
        ------
        CacheNotFoundError
            If the client has no dataframe cache.
        QueryError
            If no data could be retrieved for any reference time.

        Warns
        -----
        RuntimeWarning
            For each reference time that returns no data.
        """
        # Check for cache
        if self.dataframe_cache is None:
            raise CacheNotFoundError("get requires a cache. Set a cache or use get_cycle.")

        # Normalize reference times: the declared default is None, which
        # cannot be iterated, and a bare string would iterate by character
        if reference_times is None:
            reference_times = []
        elif isinstance(reference_times, str):
            reference_times = [reference_times]

        # List of individual parquet files
        parquet_files = []

        # Cache data
        for reference_time in reference_times:
            # Set subdirectory
            subdirectory = f"{configuration}/RT{reference_time}"

            # Set download directory
            netcdf_dir = self.file_directory / subdirectory

            # Get dask dataframe; the result is read back from the cache
            # directory further down
            try:
                self.dataframe_cache.get(
                    function=self.get_cycle,
                    subdirectory=subdirectory,
                    configuration=configuration,
                    reference_time=reference_time,
                    netcdf_dir=netcdf_dir
                )
            except QueryError:
                message = (f"No data found for configuration '{configuration}' and " +
                    f"reference time '{reference_time}'")
                warnings.warn(message, RuntimeWarning)
                continue

            # Note file created
            parquet_files.append(self.dataframe_cache.directory/subdirectory)

        # Check file list
        if not parquet_files:
            message = "Unable to retrieve any data."
            raise QueryError(message)

        # Clean-up NetCDF files. The directory may already be gone when an
        # earlier call removed it and this call was served from the cache.
        if self.cleanup_files and self.file_directory.exists():
            shutil.rmtree(self.file_directory)

        # Limit to canonical columns
        # NOTE I could not keep dask from adding a "dir0" column using either
        # fastparquet or pyarrow as backends
        # A future version of dask or the backends may fix this
        columns = [
            'nwm_feature_id',
            'reference_time',
            'value_time',
            'value',
            'configuration',
            'variable_name',
            'measurement_unit',
            'usgs_site_code'
        ]

        # Return all reference times
        df = dd.read_parquet(parquet_files, columns=columns)

        # Return dask dataframe
        if not compute:
            return df

        # Compute
        df = df.compute()

        # Downcast
        df["value"] = pd.to_numeric(df["value"], downcast="float")
        df["nwm_feature_id"] = pd.to_numeric(df["nwm_feature_id"], downcast="integer")
        return df

    @property
    def file_directory(self) -> Path:
        """Resolved directory where downloaded NetCDF files are saved."""
        return self._file_directory

    @file_directory.setter
    def file_directory(self, file_directory: Union[str, Path]) -> None:
        # Expand "~", make absolute and create the directory up front
        self._file_directory = Path(file_directory).expanduser().resolve()
        self._file_directory.mkdir(exist_ok=True, parents=True)

    @property
    def dataframe_cache(self) -> ParquetCache:
        """Cache of retrieved dataframes, or None when caching is disabled."""
        return self._dataframe_cache

    @dataframe_cache.setter
    def dataframe_cache(self,
        dataframe_cache: Union[ParquetCache, None]) -> None:
        self._dataframe_cache = dataframe_cache

    @property
    def catalog(self) -> NWMFileCatalog:
        """Catalog used to discover NWM files."""
        return self._catalog

    @catalog.setter
    def catalog(self,
        catalog: NWMFileCatalog) -> None:
        self._catalog = catalog

    @property
    def crosswalk(self) -> pd.DataFrame:
        """Location metadata; its index is used to filter feature IDs."""
        return self._crosswalk

    @crosswalk.setter
    def crosswalk(self,
        crosswalk: pd.DataFrame) -> None:
        self._crosswalk = crosswalk

    @property
    def ssl_context(self) -> ssl.SSLContext:
        """SSL configuration context passed to the file downloader."""
        return self._ssl_context

    @ssl_context.setter
    def ssl_context(self, ssl_context: ssl.SSLContext) -> None:
        self._ssl_context = ssl_context

    @property
    def cleanup_files(self) -> bool:
        """Whether get removes the NetCDF directory after caching."""
        return self._cleanup_files

    @cleanup_files.setter
    def cleanup_files(self, cleanup_files: bool) -> None:
        self._cleanup_files = cleanup_files
a/python/nwm_client_new/src/hydrotools/nwm_client_new/_version.py +++ b/python/nwm_client_new/src/hydrotools/nwm_client_new/_version.py @@ -1 +1 @@ -__version__ = "6.1.0b0" +__version__ = "6.2.0b0" diff --git a/python/nwm_client_new/tests/test_NWMClient.py b/python/nwm_client_new/tests/test_NWMClient.py index 5517dbcb..6d8dec24 100644 --- a/python/nwm_client_new/tests/test_NWMClient.py +++ b/python/nwm_client_new/tests/test_NWMClient.py @@ -1,6 +1,6 @@ import pytest -from hydrotools.nwm_client_new.NWMClient import NWMFileClient -from hydrotools.nwm_client_new.NWMFileCatalog import HTTPFileCatalog +from hydrotools.nwm_client_new.NWMFileClient import NWMFileClient +from hydrotools.nwm_client_new.HTTPFileCatalog import HTTPFileCatalog import pandas as pd from tempfile import TemporaryDirectory from hydrotools.nwm_client_new.NWMClient import CacheNotFoundError, QueryError diff --git a/python/nwm_client_new/tests/test_NWMFileCatalog.py b/python/nwm_client_new/tests/test_NWMFileCatalog.py index 1e5d8246..22dc9985 100644 --- a/python/nwm_client_new/tests/test_NWMFileCatalog.py +++ b/python/nwm_client_new/tests/test_NWMFileCatalog.py @@ -1,5 +1,6 @@ import pytest -from hydrotools.nwm_client_new.NWMFileCatalog import GCPFileCatalog, HTTPFileCatalog +from hydrotools.nwm_client_new.GCPFileCatalog import GCPFileCatalog +from hydrotools.nwm_client_new.HTTPFileCatalog import HTTPFileCatalog import pandas as pd # Set reference time