Skip to content

Commit

Permalink
Merge pull request #39 from CDJellen/user/cjellen/logging-refactor
Browse files Browse the repository at this point in the history
Logging Refactor
  • Loading branch information
CDJellen authored Aug 4, 2024
2 parents 605eb29 + 2e86253 commit b3dc6fb
Show file tree
Hide file tree
Showing 21 changed files with 28,835 additions and 143 deletions.
2 changes: 1 addition & 1 deletion ndbc_api/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
HTTP_RETRY = 5
HTTP_BACKOFF_FACTOR = 0.8
HTTP_DELAY = 2000
HTTP_DEBUG = True
HTTP_DEBUG = False
110 changes: 100 additions & 10 deletions ndbc_api/ndbc_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
ResponseException, TimestampException)
from .utilities.req_handler import RequestHandler
from .utilities.singleton import Singleton
from .utilities.log_formatter import LogFormatter
from concurrent.futures import ThreadPoolExecutor


Expand Down Expand Up @@ -72,12 +73,13 @@ class NdbcApi(metaclass=Singleton):
private `RequestHandler` instance.
"""

log = logging.getLogger(LOGGER_NAME)
logger = logging.getLogger(LOGGER_NAME)
warnings.simplefilter(action='ignore', category=FutureWarning)

def __init__(
self,
logging_level: int = logging.WARNING if HTTP_DEBUG else 0,
logging_level: int = logging.WARNING if HTTP_DEBUG else logging.ERROR,
filename: Any = None,
cache_limit: int = DEFAULT_CACHE_LIMIT,
headers: dict = {},
delay: int = HTTP_DELAY,
Expand All @@ -87,7 +89,6 @@ def __init__(
debug: bool = HTTP_DEBUG,
):
"""Initializes the singleton `NdbcApi`, sets associated handlers."""
self.log.setLevel(logging_level)
self.cache_limit = cache_limit
self.headers = headers
self._handler = self._get_request_handler(
Expand All @@ -101,6 +102,7 @@ def __init__(
)
self._stations_api = StationsHandler
self._data_api = DataHandler
self.configure_logging(level=logging_level, filename=filename)

def dump_cache(self, dest_fp: Union[str, None] = None) -> Union[dict, None]:
"""Dump the request cache to dict or the specified filepath.
Expand Down Expand Up @@ -167,6 +169,58 @@ def set_headers(self, request_headers: dict) -> None:
"""Reset the request headers using the new supplied headers."""
self._handler.set_headers(request_headers)

def configure_logging(self, level=logging.WARNING, filename=None) -> None:
    """Set up the API logger with a single fresh handler.

    Args:
        level (int, optional): Logging level applied to the logger.
            Defaults to logging.WARNING.
        filename (str, optional): When given, log records are written to
            this file with timestamps; otherwise they go to the console
            stream through `LogFormatter`.
    """
    self.logger.setLevel(level)

    # Drop any previously-attached handlers so repeated calls do not
    # duplicate output; iterate over a copy while mutating the list.
    for stale in list(self.logger.handlers):
        self.logger.removeHandler(stale)

    if filename:
        new_handler: logging.Handler = logging.FileHandler(filename)
        new_handler.setFormatter(
            logging.Formatter('[%(asctime)s][%(levelname)s]: %(message)s'))
    else:
        new_handler = logging.StreamHandler()
        new_handler.setFormatter(LogFormatter('[%(levelname)s]: %(message)s'))

    self.logger.addHandler(new_handler)

def log(self,
        level: int,
        station_id: Union[int, str, None] = None,
        mode: Union[str, None] = None,
        message: Union[str, None] = None,
        **extra_data) -> None:
    """Logs a structured message with metadata.

    The metadata is assembled into a single dict and handed to the
    underlying logger, so dict-aware formatters can pretty-print it.

    Args:
        level (int): The logging level (e.g. `logging.DEBUG`).
        station_id (int or str, optional): The NDBC station ID.
        mode (str, optional): The data mode.
        message (str, optional): The log message.
        **extra_data: Additional key-value pairs to include in the log.
    """
    log_data = {}
    # Use explicit `is not None` checks so falsy-but-valid values
    # (e.g. a numeric station id of 0 or an empty message string)
    # are still included in the structured record.
    if station_id is not None:
        log_data['station_id'] = station_id
    if mode is not None:
        log_data['mode'] = mode
    if message is not None:
        log_data['message'] = message
    log_data.update(extra_data)
    self.logger.log(level, log_data)

def stations(self, as_df: bool = True) -> Union[pd.DataFrame, dict]:
"""Get all stations and station metadata from the NDBC.
Expand Down Expand Up @@ -421,6 +475,8 @@ def get_data(
HandlerException: There was an error in handling the returned data
as a `dict` or `pandas.DataFrame`.
"""
self.log(logging.DEBUG,
message=f"`get_data` called with arguments: {locals()}")
if station_id is None and station_ids is None:
raise ValueError('Both `station_id` and `station_ids` are `None`.')
if station_id is not None and station_ids is not None:
Expand All @@ -444,13 +500,18 @@ def get_data(
if modes is not None:
handle_modes.extend(modes)

self.log(logging.INFO,
message=(f"Processing request for station_ids "
f"{handle_station_ids} and modes "
f"{handle_modes}"))

# accumulated_data records the handled response and parsed station_id
# as a tuple, with the data as the first value and the id as the second.
accumulated_data: List[Tuple[Union[pd.DataFrame, dict], str]] = []
with ThreadPoolExecutor(max_workers=len(handle_station_ids) *
len(handle_modes)) as executor:
data_requests = list(itertools.product(
handle_station_ids, handle_modes))
data_requests = list(
itertools.product(handle_station_ids, handle_modes))
futures = [
executor.submit(self._handle_get_data,
station_id=station_id,
Expand All @@ -459,29 +520,58 @@ def get_data(
end_time=end_time,
use_timestamp=use_timestamp,
as_df=as_df,
cols=cols)
for station_id, mode in data_requests
cols=cols) for station_id, mode in data_requests
]
for i, future in enumerate(futures):
try:
data = future.result()
self.log(
level=logging.DEBUG,
station_id=data_requests[i][0],
mode=data_requests[i][1],
message=(
f"Successfully processed request for station_id "
f"{data_requests[i][0]} and mode "
f"{data_requests[i][1]}"))
accumulated_data.append(data)
except (RequestException, ResponseException,
HandlerException) as e:
self.log.error(
f"Failed to process request for station_id {data_requests[i][0]} "
f"and mode {data_requests[i][1]} with error: {e}")
self.log(
level=logging.WARN,
station_id=data_requests[i][0],
mode=data_requests[i][1],
message=(f"Failed to process request for station_id "
f"{data_requests[i][0]} and mode "
f"{data_requests[i][1]} with error: {e}"))
continue

self.log(logging.INFO,
message=(f"Finished processing request for "
f"station_ids {handle_station_ids} and "
f"modes {handle_modes} with "
f"{len(accumulated_data)} results"))

# check that we have some response
if len(accumulated_data) == 0:
self.log(logging.WARN,
message=(f"No data was returned for station_ids "
f"{handle_station_ids} and modes "
f"{handle_modes}"))
raise ResponseException(
f'No data was returned for station_ids {handle_station_ids} '
f'and modes {handle_modes}')
# handle the default case where a single station_id and mode are specified
if len(accumulated_data) == 1:
self.log(logging.DEBUG,
message=(f"Returning data for single station_id "
f"{handle_station_ids[0]} and mode "
f"{handle_modes[0]}"))
return accumulated_data[0][0]
# handle the case where multiple station_ids and modes are specified
self.log(logging.DEBUG,
message=(f"Returning data for multiple station_ids "
f"{handle_station_ids} and modes "
f"{handle_modes}"))
return self._handle_accumulate_data(accumulated_data,
station_id_as_column)

Expand Down
16 changes: 16 additions & 0 deletions ndbc_api/utilities/log_formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
A metaclass for singleton types.
"""

import pprint
from logging import Formatter


class LogFormatter(Formatter):
"""Formatter that pretty-prints dictionaries in log messages."""

def format(self, record):
formatted_message = super().format(record)
if isinstance(record.msg, dict):
formatted_message += "\n" + pprint.pformat(record.msg)
return formatted_message
38 changes: 30 additions & 8 deletions ndbc_api/utilities/req_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
stations (:obj:`list`): A list of `Station`s to which requests have
been made.
"""
from typing import List, Union
import logging
from typing import List, Union, Callable

import requests
from urllib3.util import Retry
Expand All @@ -49,7 +50,7 @@ class RequestHandler(metaclass=Singleton):
`NdbcApi` responses. This is implemented as a least-recently
used cache, designed to conserve NDBC resources when querying
measurements for a given station over similar time ranges.
log (:obj:`logging.Logger`): The logger at which to register HTTP
logger (:obj:`logging.Logger`): The logger at which to register HTTP
request and response status codes and headers used for debug
purposes.
delay (:int:): The HTTP(s) request delay parameter, in seconds.
Expand Down Expand Up @@ -92,7 +93,7 @@ def __init__(self, station_id: str, cache_limit: int) -> None:
def __init__(
self,
cache_limit: int,
log: 'logging.Logger',
log: Callable[[Union[str, int, dict]], None],
delay: int,
retries: int,
backoff_factor: float,
Expand Down Expand Up @@ -143,9 +144,15 @@ def get_station(self, station_id: Union[str, int]) -> Station:
if isinstance(station_id, int):
station_id = str(station_id)
if not self.has_station(station_id):
self.log(logging.DEBUG,
station_id=station_id,
message=f'Adding station {station_id} to cache.')
self.add_station(station_id=station_id)
for s in self.stations:
if s.id_ == station_id:
self.log(logging.DEBUG,
station_id=station_id,
message=f'Found station {station_id} in cache.')
return s

def add_station(self, station_id: Union[str, int]) -> None:
Expand All @@ -158,6 +165,9 @@ def handle_requests(self, station_id: Union[str, int],
reqs: List[str]) -> List[str]: # pragma: no cover
"""Handle many string-valued requests against a supplied station."""
responses = []
self.log(
logging.INFO,
message=f'Handling {len(reqs)} requests for station {station_id}.')
for req in reqs:
responses.append(self.handle_request(station_id=station_id,
req=req))
Expand All @@ -166,23 +176,33 @@ def handle_requests(self, station_id: Union[str, int],
def handle_request(self, station_id: Union[str, int], req: str) -> dict:
"""Handle a string-valued requests against a supplied station."""
stn = self.get_station(station_id=station_id)
self.log(logging.DEBUG, message=f'Handling request {req}.')
if req not in stn.reqs.cache:
resp = self.execute_request(url=req, headers=self._request_headers)
self.log(logging.DEBUG, message=f'Adding request {req} to cache.')
resp = self.execute_request(url=req,
station_id=station_id,
headers=self._request_headers)
stn.reqs.put(request=req, response=resp)
else:
self.log(logging.DEBUG, message=f'Request {req} already in cache.')
return stn.reqs.get(request=req)

def execute_request(self, url: str,
def execute_request(self, station_id: Union[str, int], url: str,
headers: dict) -> dict: # pragma: no cover
"""Execute a request with the current headers to NDBC data service."""
self.log(logging.DEBUG,
station_id=station_id,
message=f'GET: {url}',
extra_data={'headers': headers})
response = self._session.get(
url=url,
headers=headers,
allow_redirects=True,
verify=self._verify_https,
)
if self._debug:
self.log.debug(f'GET: {url}\n\theaders: {headers}')
self.log.debug(f'response: {response.status_code}.')
self.log(logging.DEBUG,
station_id=station_id,
message=f'Response status: {response.status_code}')
if response.status_code != 200: # web request did not succeed
return dict(status=response.status_code, body='')
return dict(status=response.status_code, body=response.text)
Expand All @@ -191,6 +211,7 @@ def execute_request(self, url: str,

def _create_session(self) -> requests.Session:
"""create a new `Session` using `RequestHandler` configuration."""
self.log(logging.DEBUG, message='Creating new session.')
session = requests.Session()
retry = Retry(
backoff_factor=self._backoff_factor,
Expand All @@ -199,4 +220,5 @@ def _create_session(self) -> requests.Session:
http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
session.mount('https://', http_adapter)
session.mount('http://', http_adapter)
self.log(logging.INFO, message='Created session.')
return session
Loading

0 comments on commit b3dc6fb

Please sign in to comment.