diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 41d02e4..b3e65c7 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -13,13 +13,13 @@ jobs:
       - uses: actions/checkout@main
       - uses: actions/setup-python@main
         with:
-          python-version: '3.x'
+          python-version: "3.x"
       - uses: actions/cache@main
         with:
           path: ${{ env.pythonLocation }}
          key: build-${{ runner.os }}-${{ env.pythonLocation }}-${{ hashFiles('pyproject.toml', 'setup.*') }}
       - run: pip wheel . --no-deps -w dist
-      - uses: pypa/gh-action-pypi-publish@master
+      - uses: pypa/gh-action-pypi-publish@release/v1
         with:
           user: __token__
           password: ${{ secrets.PYPI_TOKEN }}
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index fdaf062..053a4d2 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -41,7 +41,7 @@ repos:
   - id: "shellcheck"

 - repo: "https://github.com/python-jsonschema/check-jsonschema"
-  rev: "0.28.5"
+  rev: "0.28.6"
   hooks:
   - id: "check-github-workflows"
   - id: "check-readthedocs"
@@ -60,7 +60,7 @@ repos:

 - repo: "https://github.com/charliermarsh/ruff-pre-commit"
   # Ruff version.
-  rev: 'v0.4.9'
+  rev: 'v0.4.10'
   hooks:
   - id: "ruff"
diff --git a/README.md b/README.md
index 14123a6..8842a7b 100644
--- a/README.md
+++ b/README.md
@@ -16,6 +16,7 @@ Searvey aims to provide the following functionality:

 - U.S. Center for Operational Oceanographic Products and Services (CO-OPS)
 - Flanders Marine Institute (VLIZ); Intergovernmental Oceanographic Commission (IOC)
 - U.S. Geological Survey (USGS)
+- U.S. Army Corps of Engineers (USACE) water levels

 ## Installation
diff --git a/docs/source/usace.rst b/docs/source/usace.rst
new file mode 100644
index 0000000..c94147c
--- /dev/null
+++ b/docs/source/usace.rst
@@ -0,0 +1,15 @@
+USACE RiverGages
+================
+
+The U.S. Army Corps of Engineers `RiverGages <https://rivergages.mvr.usace.army.mil/>`_
+system provides water level data for rivers and waterways across the United States.
+searvey uses the RiverGages REST API to access these data. Currently, only water
+level data is exposed through searvey.
+
+The data from an individual station can be retrieved with:
+
+.. autofunction:: searvey.fetch_usace_station
+
+You can fetch data from multiple stations and multiple date ranges with:
+
+.. autofunction:: searvey._usace_api._fetch_usace
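+
+For example, a minimal sketch of fetching a single station (the station ID and
+date range are taken from the example notebook; ``verify=False`` is required,
+see the note below):
+
+.. code-block:: python
+
+   import httpx
+
+   from searvey import fetch_usace_station
+
+   # Water levels for one RiverGages station over a five-day window
+   df = fetch_usace_station(
+       station_id="01300",
+       start_date="2020-04-05",
+       end_date="2020-04-10",
+       http_client=httpx.Client(verify=False),
+   )
+   print(df.head())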
+
+.. note::
+   The ``verify=False`` parameter of ``httpx.Client()`` bypasses SSL
+   verification; this is currently the only way to access the USACE
+   RiverGages API.
\ No newline at end of file
diff --git a/examples/USACE_data.ipynb b/examples/USACE_data.ipynb
new file mode 100644
index 0000000..8ca959c
--- /dev/null
+++ b/examples/USACE_data.ipynb
@@ -0,0 +1,125 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Set up for Army Corps water level data"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import logging\n",
+    "\n",
+    "import httpx\n",
+    "\n",
+    "from searvey import fetch_usace_station\n",
+    "\n",
+    "logging.basicConfig(\n",
+    "    level=logging.INFO,\n",
+    "    style=\"{\",\n",
+    "    format=\"{asctime:s}; {levelname:8s}; {threadName:23s}; {name:<25s} {lineno:5d}; {message:s}\",\n",
+    ")\n",
+    "\n",
+    "logging.getLogger(\"urllib3\").setLevel(logging.WARNING)\n",
+    "logging.getLogger(\"parso\").setLevel(logging.WARNING)\n",
+    "\n",
+    "logger = logging.getLogger(__name__)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Fetch water level data from a single station"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import datetime\n",
+    "\n",
+    "# Define the start and end dates for data retrieval; both datetime.date\n",
+    "# objects and strings are accepted.\n",
+    "df = fetch_usace_station(\n",
+    "    \"01300\",\n",
+    "    datetime.date(2020, 4, 5),\n",
+    "    end_date=\"2020-04-10\",\n",
+    "    http_client=httpx.Client(verify=False),\n",
+    ")\n",
+    "\n",
+    "df"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Fetch Army Corps water level data from multiple stations"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from searvey._usace_api import _fetch_usace\n",
+    "\n",
+    "df = _fetch_usace(\n",
+    "    station_ids=[\"01300\"],\n",
+    "    start_dates=[\"2020-04-05\"],\n",
+    "    end_dates=[\"2020-04-10\"],\n",
+    "    rate_limit=None,\n",
+    "    http_client=httpx.Client(verify=False),\n",
+    "    multiprocessing_executor=None,\n",
+    "    multithreading_executor=None,\n",
+    ")\n",
+    "df[\"01300\"]"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Graph the data"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import hvplot.pandas  # registers the .hvplot accessor on DataFrames\n",
+    "\n",
+    "df[\"01300\"].hvplot(title=\"Army Corps water level values\", xlabel=\"Time (UTC)\", ylabel=\"Water level\")"
+   ]
+  },
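+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Station metadata\n",
+    "\n",
+    "searvey attaches the station identifier to each frame via `DataFrame.attrs` (set in `_parse_xml_data`); a quick check:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The parser stores the id as \"USACE-<station_id>\"\n",
+    "df[\"01300\"].attrs"
+   ]
+  }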
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.12"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/searvey/__init__.py b/searvey/__init__.py
index 5d621e9..0aded10 100644
--- a/searvey/__init__.py
+++ b/searvey/__init__.py
@@ -4,6 +4,7 @@
 from searvey._coops_api import fetch_coops_station
 from searvey._ioc_api import fetch_ioc_station
+from searvey._usace_api import fetch_usace_station
 from searvey.coops import get_coops_stations
 from searvey.ioc import get_ioc_data
 from searvey.ioc import get_ioc_stations
@@ -24,4 +25,5 @@
     "get_usgs_stations",
     "Provider",
+    "fetch_usace_station",
     "__version__",
 ]
diff --git a/searvey/_usace_api.py b/searvey/_usace_api.py
new file mode 100644
index 0000000..23f12fb
--- /dev/null
+++ b/searvey/_usace_api.py
@@ -0,0 +1,190 @@
+import logging
+import xml.etree.ElementTree as ET
+from collections import abc
+from datetime import datetime
+from typing import List
+from typing import Optional
+from typing import Union
+
+import httpx
+import multifutures
+import pandas as pd
+
+from ._common import _fetch_url
+from ._common import _resolve_end_date
+from ._common import _resolve_http_client
+from ._common import _resolve_rate_limit
+from ._common import _resolve_start_date
+from .custom_types import DatetimeLike
+
+logger = logging.getLogger(__name__)
+
+BASE_URL = "https://rivergages.mvr.usace.army.mil/watercontrol/webservices/rest/webserviceWaterML.cfc?method=RGWML&meth=getValues&location={location}&site={site}&variable={variable}&beginDate={begin_date}&endDate={end_date}&authToken=RiverGages"
+
+
+def _parse_xml_data(content: str, station_id: str) -> pd.DataFrame:
+    try:
+        namespace = {"wml": "http://www.cuahsi.org/waterML/1.0/"}
+        root = ET.fromstring(content)
+        values_element = root.find(".//wml:values", namespaces=namespace)
+
+        if values_element is None:
+            logger.warning(f"{station_id}: No 'values' element found in the XML.")
+            return pd.DataFrame()
+
+        data = []
+        for value_element in values_element.findall("wml:value", namespaces=namespace):
+            date_time = value_element.get("dateTime")
+            value = value_element.text
+            # Skip malformed entries instead of raising on None
+            if date_time is None or value is None:
+                continue
+            date_time_obj = datetime.strptime(date_time, "%Y-%m-%dT%H:%M:%S")
+            data.append({"time": date_time_obj, "value": float(value)})
+
+        # Guard against an empty 'values' element; set_index("time") would
+        # raise a KeyError on an empty frame.
+        if not data:
+            logger.warning(f"{station_id}: The 'values' element contained no parsable values.")
+            return pd.DataFrame()
+
+        df = pd.DataFrame(data)
+        df.set_index("time", inplace=True)
+        df.index = pd.to_datetime(df.index, utc=True)
+        df.attrs["station_id"] = f"USACE-{station_id}"
+        return df
+    except ET.ParseError:
+        logger.error(f"{station_id}: Failed to parse XML data.")
+        return pd.DataFrame()
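+
+# For reference, the WaterML 1.0 payload that _parse_xml_data expects looks
+# roughly like this (a hypothetical, trimmed response; only the namespace, the
+# values/value elements, and the dateTime attribute are relied upon above):
+#
+#   <timeSeriesResponse xmlns="http://www.cuahsi.org/waterML/1.0/">
+#     <timeSeries>
+#       <values>
+#         <value dateTime="2020-04-05T00:00:00">10.5</value>
+#         <value dateTime="2020-04-05T01:00:00">11.2</value>
+#       </values>
+#     </timeSeries>
+#   </timeSeriesResponse>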
+
+
+def _generate_urls(
+    station_id: str,
+    start_date: pd.Timestamp,
+    end_date: pd.Timestamp,
+) -> list[str]:
+    if end_date < start_date:
+        raise ValueError(f"'end_date' must be after 'start_date': {end_date} vs {start_date}")
+    if end_date == start_date:
+        return []
+
+    url = BASE_URL.format(
+        location=station_id,
+        site=station_id,
+        variable="HG",
+        begin_date=start_date.strftime("%Y-%m-%dT%H:%M"),
+        end_date=end_date.strftime("%Y-%m-%dT%H:%M"),
+    )
+    return [url]
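+
+# For example, for station "01300" over 2020-04-05..2020-04-10 this produces a
+# single URL like the following (one line in practice, wrapped here for
+# readability; assumes the resolved timestamps fall on midnight):
+#
+#   https://rivergages.mvr.usace.army.mil/watercontrol/webservices/rest/
+#   webserviceWaterML.cfc?method=RGWML&meth=getValues&location=01300&site=01300
+#   &variable=HG&beginDate=2020-04-05T00:00&endDate=2020-04-10T00:00
+#   &authToken=RiverGages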
+
+
+def _retrieve_usace_data(
+    station_ids: abc.Collection[str],
+    start_dates: abc.Collection[pd.Timestamp],
+    end_dates: abc.Collection[pd.Timestamp],
+    rate_limit: multifutures.RateLimit,
+    http_client: httpx.Client,
+    executor: Optional[multifutures.ExecutorProtocol] = None,
+) -> list[multifutures.FutureResult]:
+    kwargs = []
+    for station_id, start_date, end_date in zip(station_ids, start_dates, end_dates):
+        # _generate_urls only yields non-empty URLs, so no further check is needed
+        for url in _generate_urls(station_id=station_id, start_date=start_date, end_date=end_date):
+            logger.info("USACE-%s: Starting scraping: %s - %s", station_id, start_date, end_date)
+            kwargs.append(
+                dict(
+                    station_id=station_id,
+                    url=url,
+                    client=http_client,
+                    rate_limit=rate_limit,
+                ),
+            )
+    with http_client:
+        logger.debug("Starting data retrieval")
+        results = multifutures.multithread(
+            func=_fetch_url, func_kwargs=kwargs, check=False, executor=executor
+        )
+        logger.debug("Finished data retrieval")
+    return results
+
+
+def _fetch_usace(
+    station_ids: abc.Collection[str],
+    start_dates: Optional[Union[DatetimeLike, List[DatetimeLike]]] = None,
+    end_dates: Optional[Union[DatetimeLike, List[DatetimeLike]]] = None,
+    *,
+    rate_limit: Optional[multifutures.RateLimit] = None,
+    http_client: Optional[httpx.Client] = None,
+    multiprocessing_executor: Optional[multifutures.ExecutorProtocol] = None,
+    multithreading_executor: Optional[multifutures.ExecutorProtocol] = None,
+) -> dict[str, pd.DataFrame]:
+    rate_limit = _resolve_rate_limit(rate_limit)
+    http_client = _resolve_http_client(http_client)
+
+    now = pd.Timestamp.now("utc")
+
+    start_dates = [start_dates] if not isinstance(start_dates, list) else start_dates
+    end_dates = [end_dates] if not isinstance(end_dates, list) else end_dates
+
+    # _resolve_start_date/_resolve_end_date return a single-element
+    # DatetimeIndex (e.g. DatetimeIndex(['2020-04-05'])), so grab its first element.
+    start_dates = [_resolve_start_date(now, date)[0] for date in start_dates]
+    end_dates = [_resolve_end_date(now, date)[0] for date in end_dates]
+
+    # Broadcast a single date to all stations; otherwise zip() downstream
+    # would silently drop every station without a matching date.
+    if len(start_dates) == 1 and len(station_ids) > 1:
+        start_dates = start_dates * len(station_ids)
+    if len(end_dates) == 1 and len(station_ids) > 1:
+        end_dates = end_dates * len(station_ids)
+
+    usace_responses = _retrieve_usace_data(
+        station_ids=station_ids,
+        start_dates=start_dates,
+        end_dates=end_dates,
+        rate_limit=rate_limit,
+        http_client=http_client,
+        executor=multithreading_executor,
+    )
+
+    dataframes = {}
+    for response in usace_responses:
+        station_id = response.kwargs["station_id"]
+        if response.exception:
+            logger.error(f"USACE-{station_id}: Failed to retrieve data. Error: {response.exception}")
+            continue
+        df = _parse_xml_data(response.result, station_id)
+        if not df.empty:
+            dataframes[station_id] = df
+        else:
+            logger.warning(f"USACE-{station_id}: No data retrieved or parsed.")
+
+    return dataframes
+
+
+def fetch_usace_station(
+    station_id: str,
+    start_date: Optional[DatetimeLike] = None,
+    end_date: Optional[DatetimeLike] = None,
+    *,
+    rate_limit: Optional[multifutures.RateLimit] = None,
+    http_client: Optional[httpx.Client] = None,
+    multiprocessing_executor: Optional[multifutures.ExecutorProtocol] = None,
+    multithreading_executor: Optional[multifutures.ExecutorProtocol] = None,
+) -> pd.DataFrame:
+    """
+    Make a query to the USACE API for river gauge data for ``station_id``
+    and return the results as a ``pandas.DataFrame``.
+
+    :param station_id: The station identifier.
+    :param start_date: The starting date of the query.
+    :param end_date: The finishing date of the query.
+    :param rate_limit: The rate limit for making requests to the USACE servers.
+    :param http_client: The ``httpx.Client`` instance; it should be created with
+        ``verify=False``, since the RiverGages API is not reachable otherwise.
+    :param multiprocessing_executor: An executor for running jobs in multiple processes.
+    :param multithreading_executor: An executor for running jobs in multiple threads.
+    """
+    logger.info("USACE-%s: Starting scraping: %s - %s", station_id, start_date, end_date)
+    try:
+        df = _fetch_usace(
+            station_ids=[station_id],
+            start_dates=start_date,
+            end_dates=end_date,
+            rate_limit=rate_limit,
+            http_client=http_client,
+            multiprocessing_executor=multiprocessing_executor,
+            multithreading_executor=multithreading_executor,
+        ).get(station_id, pd.DataFrame())
+    except Exception as e:
+        logger.error(f"USACE-{station_id}: An error occurred while fetching data: {e}")
+        df = pd.DataFrame()
+
+    if df.empty:
+        logger.warning(f"USACE-{station_id}: No data retrieved for the specified period.")
+    else:
+        logger.info("USACE-%s: Finished scraping: %s - %s", station_id, start_date, end_date)
+
+    return df
diff --git a/tests/usace_test.py b/tests/usace_test.py
new file mode 100644
index 0000000..0e79fb2
--- /dev/null
+++ b/tests/usace_test.py
@@ -0,0 +1,70 @@
+from unittest.mock import patch
+
+import httpx
+import pandas as pd
+import pytest
+
+from searvey._usace_api import _fetch_usace
+from searvey._usace_api import _generate_urls
+from searvey._usace_api import fetch_usace_station
+
+
+def test_generate_urls():
+    station_id = "01300"
+    start_date = pd.Timestamp("2020-04-05")
+    end_date = pd.Timestamp("2020-04-10")
+
+    urls = _generate_urls(station_id, start_date, end_date)
+
+    assert len(urls) == 1
+    assert station_id in urls[0]
+    assert "2020-04-05" in urls[0]
+    assert "2020-04-10" in urls[0]
+
+
+def test_fetch_usace():
+    # NOTE: this is an integration test; it makes a real request to the
+    # RiverGages API (hence verify=False).
+    result = _fetch_usace(
+        station_ids=["01300"],
+        start_dates=["2020-04-05"],
+        end_dates=["2020-04-10"],
+        rate_limit=None,
+        http_client=httpx.Client(verify=False),
+        multiprocessing_executor=None,
+        multithreading_executor=None,
+    )
+    assert "01300" in result
+    assert isinstance(result["01300"], pd.DataFrame)
+    assert len(result) == 1
+
+
+@patch("searvey._usace_api._fetch_usace")
+def test_fetch_usace_station(mock_fetch):
+    mock_df = pd.DataFrame(
+        {"value": [10.5, 11.2, 10.8]}, index=pd.date_range("2020-04-05", periods=3, freq="D")
+    )
+    mock_df.index.name = "time"
+    mock_df.attrs["station_id"] = "USACE-01300"
+
+    mock_fetch.return_value = {"01300": mock_df}
+
+    result = fetch_usace_station(
+        "01300", start_date="2020-04-05", end_date="2020-04-10", http_client=httpx.Client(verify=False)
+    )
+
+    assert isinstance(result, pd.DataFrame)
+    assert len(result) == 3
+    assert result.index.name == "time"
+    assert "value" in result.columns
+    assert result.attrs["station_id"] == "USACE-01300"
+
+
+def test_fetch_usace_station_error_handling():
+    with patch("searvey._usace_api._fetch_usace", side_effect=Exception("API Error")):
+        result = fetch_usace_station(
+            "01300", start_date="2020-04-05", end_date="2020-04-10", http_client=httpx.Client(verify=False)
+        )
+        assert result.empty
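+
+
+def test_generate_urls_empty_range():
+    # Complements test_generate_urls: per _generate_urls, an empty date range
+    # (start == end) yields no URLs at all.
+    ts = pd.Timestamp("2020-04-05")
+    assert _generate_urls("01300", ts, ts) == []
+
+
+if __name__ == "__main__":
+    pytest.main()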