diff --git a/README.md b/README.md index 776ca42..70bac0e 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ The ancestry sweeper generates membership metadata for each product, i.e. which ``` PROV_CREDENTIALS={"admin": "admin"} // OpenSearch username/password PROV_ENDPOINT=https://localhost:9200 // OpenSearch host url and port +LOGLEVEL - an integer log level or any-case string matching a python log level like `INFO` (optional - defaults to `INFO`) DEV_MODE=1 // disables host verification ``` diff --git a/docker/README.md b/docker/README.md index a4af75f..f510760 100644 --- a/docker/README.md +++ b/docker/README.md @@ -5,6 +5,8 @@ Requires a running deployment of registry #### Env Variables `PROV_ENDPOINT` - the URL of the registry OpenSearch http endpoint `PROV_CREDENTIALS` - a JSON string of format `{"$username": "$password"}` +`LOGLEVEL` - (optional - defaults to `INFO`) an integer log level or any-case string matching a python log level like `INFO` +`DEV_MODE=1` - (optional) in dev mode, host cert verification is disabled ### Development diff --git a/docker/sweepers_driver.py b/docker/sweepers_driver.py index 487f9c1..658458f 100755 --- a/docker/sweepers_driver.py +++ b/docker/sweepers_driver.py @@ -62,7 +62,7 @@ from typing import Callable, Iterable from pds.registrysweepers import provenance, ancestry -from pds.registrysweepers.utils import configure_logging, get_human_readable_elapsed_since +from pds.registrysweepers.utils import configure_logging, get_human_readable_elapsed_since, parse_log_level configure_logging(filepath=None, log_level=logging.INFO) log = logging.getLogger(__name__) @@ -92,6 +92,7 @@ logging.error(err) raise ValueError(f'Failed to parse username/password from PROV_CREDENTIALS value "{provCredentialsStr}": {err}') +log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO')) def run_factory(sweeper_f: Callable) -> Callable: return functools.partial(
username=username, password=password, log_filepath='provenance.log', - log_level=logging.INFO, # TODO: pull this from LOGLEVEL env var + log_level=log_level, verify_host_certs=True if not dev_mode else False ) diff --git a/src/pds/registrysweepers/utils/__init__.py b/src/pds/registrysweepers/utils/__init__.py index 599f29b..489e1df 100644 --- a/src/pds/registrysweepers/utils/__init__.py +++ b/src/pds/registrysweepers/utils/__init__.py @@ -77,7 +77,7 @@ def parse_log_level(input: str) -> int: try: result = int(input) except ValueError: - result = getattr(logging, input) + result = getattr(logging, input.upper()) return result @@ -123,6 +123,9 @@ def query_registry_db( path = ",".join([index_name] + cross_cluster_indexes) + f"/_search?scroll={scroll_validity_duration_minutes}m" served_hits = 0 + last_info_log_at_percentage = 0 + log.info("Query progress: 0%") + more_data_exists = True while more_data_exists: resp = retry_call( @@ -143,14 +146,11 @@ def query_registry_db( total_hits = data["hits"]["total"]["value"] log.debug(f" paging query ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})") - last_info_log_at_percentage = 0 - log.info("Query progress: 0%") - for hit in data["hits"]["hits"]: served_hits += 1 percentage_of_hits_served = int(served_hits / total_hits * 100) - if last_info_log_at_percentage is None or percentage_of_hits_served > (last_info_log_at_percentage + 5): + if last_info_log_at_percentage is None or percentage_of_hits_served >= (last_info_log_at_percentage + 5): last_info_log_at_percentage = percentage_of_hits_served log.info(f"Query progress: {percentage_of_hits_served}%") @@ -245,25 +245,55 @@ def _write_bulk_updates_chunk(host: Host, index_name: str, bulk_updates: Iterabl headers=headers, verify=host.verify, ) - response.raise_for_status() + # N.B. HTTP status 200 is insufficient as a success check for _bulk API. 
+ # See: https://github.com/elastic/elasticsearch/issues/41434 + response.raise_for_status() response_content = response.json() if response_content.get("errors"): warn_types = {"document_missing_exception"} # these types represent bad data, not bad sweepers behaviour - items_with_error = [item for item in response_content["items"] if "error" in item["update"]] - items_with_warnings = [item for item in items_with_error if item["update"]["error"]["type"] in warn_types] - items_with_errors = [item for item in items_with_error if item["update"]["error"]["type"] not in warn_types] - - for item in items_with_warnings: - error_type = item["update"]["error"]["type"] - log.warning(f'Attempt to update document {item["update"]["_id"]} failed due to {error_type}') - - for item in items_with_errors: - log.error( - f'Attempt to update document {item["update"]["_id"]} unexpectedly failed: {item["update"]["error"]}' - ) - - log.info("Successfully wrote bulk updates chunk") + items_with_problems = [item for item in response_content["items"] if "error" in item["update"]] + + if log.isEnabledFor(logging.WARNING): + items_with_warnings = [ + item for item in items_with_problems if item["update"]["error"]["type"] in warn_types + ] + warning_aggregates = aggregate_update_error_types(items_with_warnings) + for error_type, reason_aggregate in warning_aggregates.items(): + for error_reason, ids in reason_aggregate.items(): + log.warning( + f"Attempt to update the following documents failed due to {error_type} ({error_reason}): {ids}" + ) + + if log.isEnabledFor(logging.ERROR): + items_with_errors = [ + item for item in items_with_problems if item["update"]["error"]["type"] not in warn_types + ] + error_aggregates = aggregate_update_error_types(items_with_errors) + for error_type, reason_aggregate in error_aggregates.items(): + for error_reason, ids in reason_aggregate.items(): + log.error( + f"Attempt to update the following documents failed unexpectedly due to {error_type} 
({error_reason}): {ids}" + ) + + +def aggregate_update_error_types(items: Iterable[Dict]) -> Mapping[str, Dict[str, List[str]]]: + """Return a nested aggregation of ids, aggregated first by error type, then by reason""" + agg: Dict[str, Dict[str, List[str]]] = {} + for item in items: + id = item["update"]["_id"] + error = item["update"]["error"] + error_type = error["type"] + error_reason = error["reason"] + if error_type not in agg: + agg[error_type] = {} + + if error_reason not in agg[error_type]: + agg[error_type][error_reason] = [] + + agg[error_type][error_reason].append(id) + + return agg def coerce_list_type(db_value: Any) -> List[Any]: