From 52c1529c430dfe9cf6fbf1b4d48eb1a9295e63ec Mon Sep 17 00:00:00 2001 From: Alex Dunn <75815303+alexdunnjpl@users.noreply.github.com> Date: Wed, 26 Jul 2023 15:55:48 -0700 Subject: [PATCH] 40 add logging aggregation, LOGLEVEL env var, other logging tweaks (#45) * implement aggregation of product update errors by error type and error reason this prevents the logs from getting blasted when batches full of similar errors present. does not aggregate when the product id appears in the reason * implement environment variable LOGLEVEL in sweepers_driver takes an int or string representation of a python standard log level like INFO * add lowercase support to utils.parse_log_level() * update README.md for LOGLEVEL env var * fix bug in query progress logging * add explanation for Opensearch returning HTTP200 when failures exist * remove redundant/misleading log message --- README.md | 1 + docker/README.md | 2 + docker/sweepers_driver.py | 5 +- src/pds/registrysweepers/utils/__init__.py | 70 +++++++++++++++------- 4 files changed, 56 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 776ca42..70bac0e 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ The ancestry sweeper generates membership metadata for each product, i.e. which ``` PROV_CREDENTIALS={"admin": "admin"} // OpenSearch username/password PROV_ENDPOINT=https://localhost:9200 // OpenSearch host url and port +LOGLEVEL - an integer log level or anycase string matching a python log level like `INFO` (optional - defaults to `INFO`)) DEV_MODE=1 // disables host verification ``` diff --git a/docker/README.md b/docker/README.md index a4af75f..f510760 100644 --- a/docker/README.md +++ b/docker/README.md @@ -5,6 +5,8 @@ Requires a running deployment of registry #### Env Variables `PROV_ENDPOINT` - the URL of the registry OpenSearch http endpoint `PROV_CREDENTIALS` - a JSON string of format `{"$username": "$password"}` +`LOGLEVEL` - (optional - defaults to `INFO`) an integer log level or anycase string matching a python log level like `INFO` +`DEV_MODE=1` - (optional) in dev mode, host cert verification is disabled ### Development diff --git a/docker/sweepers_driver.py b/docker/sweepers_driver.py index 487f9c1..658458f 100755 --- a/docker/sweepers_driver.py +++ b/docker/sweepers_driver.py @@ -62,7 +62,7 @@ from typing import Callable, Iterable from pds.registrysweepers import provenance, ancestry -from pds.registrysweepers.utils import configure_logging, get_human_readable_elapsed_since +from pds.registrysweepers.utils import configure_logging, get_human_readable_elapsed_since, parse_log_level configure_logging(filepath=None, log_level=logging.INFO) log = logging.getLogger(__name__) @@ -92,6 +92,7 @@ logging.error(err) raise ValueError(f'Failed to parse username/password from PROV_CREDENTIALS value "{provCredentialsStr}": {err}') +log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO')) def run_factory(sweeper_f: Callable) -> Callable: return functools.partial( @@ -100,7 +101,7 @@ def run_factory(sweeper_f: Callable) -> Callable: username=username, password=password, log_filepath='provenance.log', - log_level=logging.INFO, # TODO: pull this from LOGLEVEL env var + log_level=log_level, verify_host_certs=True if not dev_mode else False ) diff --git a/src/pds/registrysweepers/utils/__init__.py b/src/pds/registrysweepers/utils/__init__.py index 599f29b..489e1df 100644 --- a/src/pds/registrysweepers/utils/__init__.py +++ b/src/pds/registrysweepers/utils/__init__.py @@ -77,7 +77,7 @@ def parse_log_level(input: str) -> int: try: result = int(input) except ValueError: - result = getattr(logging, input) + result = getattr(logging, input.upper()) return result @@ -123,6 +123,9 @@ def query_registry_db( path = ",".join([index_name] + cross_cluster_indexes) + f"/_search?scroll={scroll_validity_duration_minutes}m" served_hits = 0 + last_info_log_at_percentage = 0 + log.info("Query progress: 0%") + more_data_exists = True while more_data_exists: resp = retry_call( @@ -143,14 +146,11 @@ def query_registry_db( total_hits = data["hits"]["total"]["value"] log.debug(f" paging query ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})") - last_info_log_at_percentage = 0 - log.info("Query progress: 0%") - for hit in data["hits"]["hits"]: served_hits += 1 percentage_of_hits_served = int(served_hits / total_hits * 100) - if last_info_log_at_percentage is None or percentage_of_hits_served > (last_info_log_at_percentage + 5): + if last_info_log_at_percentage is None or percentage_of_hits_served >= (last_info_log_at_percentage + 5): last_info_log_at_percentage = percentage_of_hits_served log.info(f"Query progress: {percentage_of_hits_served}%") @@ -245,25 +245,55 @@ def _write_bulk_updates_chunk(host: Host, index_name: str, bulk_updates: Iterabl headers=headers, verify=host.verify, ) - response.raise_for_status() + # N.B. HTTP status 200 is insufficient as a success check for _bulk API. + # See: https://github.com/elastic/elasticsearch/issues/41434 + response.raise_for_status() response_content = response.json() if response_content.get("errors"): warn_types = {"document_missing_exception"} # these types represent bad data, not bad sweepers behaviour - items_with_error = [item for item in response_content["items"] if "error" in item["update"]] - items_with_warnings = [item for item in items_with_error if item["update"]["error"]["type"] in warn_types] - items_with_errors = [item for item in items_with_error if item["update"]["error"]["type"] not in warn_types] - - for item in items_with_warnings: - error_type = item["update"]["error"]["type"] - log.warning(f'Attempt to update document {item["update"]["_id"]} failed due to {error_type}') - - for item in items_with_errors: - log.error( - f'Attempt to update document {item["update"]["_id"]} unexpectedly failed: {item["update"]["error"]}' - ) - - log.info("Successfully wrote bulk updates chunk") + items_with_problems = [item for item in response_content["items"] if "error" in item["update"]] + + if log.isEnabledFor(logging.WARNING): + items_with_warnings = [ + item for item in items_with_problems if item["update"]["error"]["type"] in warn_types + ] + warning_aggregates = aggregate_update_error_types(items_with_warnings) + for error_type, reason_aggregate in warning_aggregates.items(): + for error_reason, ids in reason_aggregate.items(): + log.warning( + f"Attempt to update the following documents failed due to {error_type} ({error_reason}): {ids}" + ) + + if log.isEnabledFor(logging.ERROR): + items_with_errors = [ + item for item in items_with_problems if item["update"]["error"]["type"] not in warn_types + ] + error_aggregates = aggregate_update_error_types(items_with_errors) + for error_type, reason_aggregate in error_aggregates.items(): + for error_reason, ids in reason_aggregate.items(): + log.error( + f"Attempt to update the following documents failed unexpectedly due to {error_type} ({error_reason}): {ids}" + ) + + +def aggregate_update_error_types(items: Iterable[Dict]) -> Mapping[str, Dict[str, List[str]]]: + """Return a nested aggregation of ids, aggregated first by error type, then by reason""" + agg: Dict[str, Dict[str, List[str]]] = {} + for item in items: + id = item["update"]["_id"] + error = item["update"]["error"] + error_type = error["type"] + error_reason = error["reason"] + if error_type not in agg: + agg[error_type] = {} + + if error_reason not in agg[error_type]: + agg[error_type][error_reason] = [] + + agg[error_type][error_reason].append(id) + + return agg def coerce_list_type(db_value: Any) -> List[Any]: