diff --git a/src/pds/registrysweepers/reindexer/main.py b/src/pds/registrysweepers/reindexer/main.py
index 4d01244..645b318 100644
--- a/src/pds/registrysweepers/reindexer/main.py
+++ b/src/pds/registrysweepers/reindexer/main.py
@@ -1,5 +1,7 @@
 import logging
-from datetime import datetime, timedelta
+import math
+from datetime import datetime
+from datetime import timedelta
 from datetime import timezone
 from time import sleep
 from typing import Collection
@@ -8,7 +10,6 @@
 from typing import Union
 
 import dateutil.parser
-import math
 from opensearchpy import OpenSearch
 from pds.registrysweepers.reindexer.constants import REINDEXER_FLAG_METADATA_KEY
 from pds.registrysweepers.utils import configure_logging
@@ -87,27 +88,27 @@ def accumulate_missing_mappings(
     # Anything with prefix 'ops:Provenance' is excluded, as these properties are the responsibility of their
     # respective sweepers.
     special_case_property_types_by_name = {
-        '@timestamp': None,
-        '@version': None,
-        '_package_id': None,
-        'description': 'text',
-        'lid': 'keyword',
-        'lidvid': 'keyword',
-        'ops:Harvest_Info/ops:harvest_date_time': 'date',
-        'ops:Label_File_Info/ops:json_blob': None,
-        'product_class': 'keyword',
-        'ref_lid_associate': 'keyword',
-        'ref_lid_collection': 'keyword',
-        'ref_lid_collection_secondary': 'keyword',
-        'ref_lid_data': 'keyword',
-        'ref_lid_document': 'keyword',
-        'ref_lid_facility': 'keyword',
-        'ref_lid_instrument': 'keyword',
-        'ref_lid_instrument_host': 'keyword',
-        'ref_lid_investigation': 'keyword',
-        'ref_lid_target': 'keyword',
-        'ref_lid_telescope': 'keyword',
-        'title': 'text',
+        "@timestamp": None,
+        "@version": None,
+        "_package_id": None,
+        "description": "text",
+        "lid": "keyword",
+        "lidvid": "keyword",
+        "ops:Harvest_Info/ops:harvest_date_time": "date",
+        "ops:Label_File_Info/ops:json_blob": None,
+        "product_class": "keyword",
+        "ref_lid_associate": "keyword",
+        "ref_lid_collection": "keyword",
+        "ref_lid_collection_secondary": "keyword",
+        "ref_lid_data": "keyword",
+        "ref_lid_document": "keyword",
+        "ref_lid_facility": "keyword",
+        "ref_lid_instrument": "keyword",
+        "ref_lid_instrument_host": "keyword",
+        "ref_lid_investigation": "keyword",
+        "ref_lid_target": "keyword",
+        "ref_lid_telescope": "keyword",
+        "title": "text",
         # 'vid'  # TODO: need to determine what this should be, as keyword lexical(?) sorting will be a problem
     }
@@ -128,19 +129,25 @@ def accumulate_missing_mappings(
         for property_name, value in doc["_source"].items():
             # Resolve canonical type from data dictionary or - failing that - from the hardcoded types
-            canonical_type = dd_field_types_by_name.get(property_name) or special_case_property_types_by_name.get(property_name)
+            canonical_type = dd_field_types_by_name.get(property_name) or special_case_property_types_by_name.get(
+                property_name
+            )
 
             current_mapping_type = mapping_field_types_by_field_name.get(property_name)
 
             mapping_missing = property_name not in mapping_field_types_by_field_name
             canonical_type_is_defined = canonical_type is not None
-            mapping_is_bad = canonical_type != current_mapping_type \
-                and canonical_type is not None \
-                and current_mapping_type is not None
-
-            if not canonical_type_is_defined \
-                and property_name not in special_case_property_types_by_name \
-                and not property_name.startswith('ops:Provenance') \
-                and property_name not in canonical_type_undefined_property_names:
+            mapping_is_bad = (
+                canonical_type != current_mapping_type
+                and canonical_type is not None
+                and current_mapping_type is not None
+            )
+
+            if (
+                not canonical_type_is_defined
+                and property_name not in special_case_property_types_by_name
+                and not property_name.startswith("ops:Provenance")
+                and property_name not in canonical_type_undefined_property_names
+            ):
                 log.warning(
                     f"Property {property_name} does not have an entry in the data dictionary index or hardcoded mappings - this may indicate a problem"
                 )
@@ -183,11 +190,15 @@ def accumulate_missing_mappings(
                         f'Property {property_name} will be updated to type "{canonical_type}" from data dictionary'
                     )
                     missing_mapping_updates[property_name] = canonical_type  # type: ignore
-            elif property_name.startswith('ops:Provenance'):  # TODO: extract this to a constant, used by all metadata key definitions
+            elif property_name.startswith(
+                "ops:Provenance"
+            ):  # TODO: extract this to a constant, used by all metadata key definitions
                 # mappings for registry-sweepers are the responsibility of their respective sweepers and should not
                 # be touched by the reindexer sweeper
                 if property_name not in sweepers_missing_property_names:
-                    log.warning(f'Property {property_name} is missing from the index mapping, but is a sweepers metadata attribute and will not be fixed here. Please run the full set of sweepers on this index')
+                    log.warning(
+                        f"Property {property_name} is missing from the index mapping, but is a sweepers metadata attribute and will not be fixed here. Please run the full set of sweepers on this index"
+                    )
                     sweepers_missing_property_names.add(property_name)
             else:
                 # if there is no canonical type and it is not a provenance metadata key, do nothing, per jpadams
@@ -199,8 +210,7 @@ def accumulate_missing_mappings(
     if problem_docs_count > 0:
         log.warning(
-            f"RESULT: Problems were detected with docs having harvest timestamps between {earliest_problem_doc_harvested_at.isoformat()} and {latest_problem_doc_harvested_at.isoformat()}"
-            # type: ignore
+            f"RESULT: Problems were detected with docs having harvest timestamps between {earliest_problem_doc_harvested_at.isoformat()} and {latest_problem_doc_harvested_at.isoformat()}"  # type: ignore
         )
         log.warning(
             f"RESULT: Problems were detected with docs having harvest versions {sorted(problematic_harvest_versions)}"
         )
@@ -246,13 +256,13 @@ def format_hits_count(count: int) -> str:
         return str(count)
     elif count < 1e5:
         adjusted_count = count / 1e3
-        return '{:,.1f}K'.format(adjusted_count)
+        return "{:,.1f}K".format(adjusted_count)
     elif count < 1e6:
         adjusted_count = count / 1e3
-        return '{:,.0f}K'.format(adjusted_count)
+        return "{:,.0f}K".format(adjusted_count)
     else:
         adjusted_count = count / 1e6
-        return '{:,.2f}M'.format(adjusted_count)
+        return "{:,.2f}M".format(adjusted_count)
 
 
 def run(
@@ -266,7 +276,6 @@
 
     products_index_name = resolve_multitenant_index_name("registry")
     ensure_index_mapping(client, products_index_name, REINDEXER_FLAG_METADATA_KEY, "date")
-
     dd_field_types_by_field_name = fetch_dd_field_types(client)
 
     def get_updated_hits_count():
@@ -286,12 +295,11 @@
 
     with tqdm(
         total=total_outstanding_doc_count,
-        desc=f"Reindexer sweeper progress",
+        desc="Reindexer sweeper progress",
     ) as pbar:
         current_batch_size = min(batch_size_limit, total_outstanding_doc_count)
         final_batch_is_processed = False
         while not final_batch_is_processed:
-
             mapping_field_types_by_field_name = get_mapping_field_types_by_field_name(client, products_index_name)
 
             missing_mappings = accumulate_missing_mappings(
diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py
index 51dd333..521dd99 100644
--- a/src/pds/registrysweepers/utils/db/__init__.py
+++ b/src/pds/registrysweepers/utils/db/__init__.py
@@ -171,8 +171,8 @@ def query_registry_db_with_search_after(
     # https://discuss.elastic.co/t/problem-with-colon-in-fieldname-where-can-i-find-naming-guidelines/5437/4
     # https://discuss.elastic.co/t/revisiting-colons-in-field-names/25005
     # TODO: investigate and open ticket with opensearch-py if confirmed
-    special_characters = {'/', ':'}
-    query['sort'] = [f for f in sort_fields if any(c in f for c in special_characters)]
+    special_characters = {"/", ":"}
+    query["sort"] = [f for f in sort_fields if any(c in f for c in special_characters)]
 
     if search_after_values is not None:
         query["search_after"] = search_after_values
@@ -228,8 +228,9 @@ def fetch_func():
                     if len(value) == 1:
                         search_after_values[idx] = value[0]
                     else:
-                        raise ValueError(f'Failed to flatten array-like search-after value {value} into single element')
-
+                        raise ValueError(
+                            f"Failed to flatten array-like search-after value {value} into single element"
+                        )
         # This is a temporary, ad-hoc guard against empty/erroneous responses which do not return non-200 status codes.
         # Previously, this has cause infinite loops in production due to served_hits sticking and never reaching the
@@ -341,9 +342,13 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
     if response_content.get("errors"):
         warn_types = {"document_missing_exception"}  # these types represent bad data, not bad sweepers behaviour
         items_with_problems = [item for item in response_content["items"] if "error" in item["update"]]
-        if any(item['update']['status'] == 429 and item['update']['error']['type'] == 'circuit_breaking_exception' for item in items_with_problems):
-            raise RuntimeWarning(f'Bulk updates response includes item with status HTTP429, circuit_breaking_exception/throttled - chunk will need to be resubmitted')
-
+        if any(
+            item["update"]["status"] == 429 and item["update"]["error"]["type"] == "circuit_breaking_exception"
+            for item in items_with_problems
+        ):
+            raise RuntimeWarning(
+                "Bulk updates response includes item with status HTTP429, circuit_breaking_exception/throttled - chunk will need to be resubmitted"
+            )
 
 
 def get_ids_list_str(ids: List[str]) -> str:
     max_display_ids = 50
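A note on the `search_after` handling touched in `utils/db/__init__.py`: the hunks above only change quote style and wrapping, but they are easier to review with the surrounding pagination pattern in view. The sketch below is illustrative only and is not code from this PR; `iterate_all_hits` and its parameters are hypothetical names, and it omits the sort-field filtering and array-value flattening guards shown in the diff.

```python
# Hypothetical sketch of the search_after pagination pattern that
# query_registry_db_with_search_after builds on; not part of this diff.
from typing import Iterator

from opensearchpy import OpenSearch


def iterate_all_hits(client: OpenSearch, index_name: str, query: dict) -> Iterator[dict]:
    """Page through all hits for a query whose body already includes a "sort" clause."""
    search_after_values = None
    while True:
        if search_after_values is not None:
            # resume immediately after the last hit of the previous page
            query["search_after"] = search_after_values
        response = client.search(index=index_name, body=query)
        hits = response["hits"]["hits"]
        if not hits:
            return  # no more pages
        yield from hits
        # each hit carries the sort values used to order it; the last hit's
        # values become the cursor for the next page
        search_after_values = hits[-1]["sort"]
```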
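Similarly, the reflowed guard in `_write_bulk_updates_chunk` raises `RuntimeWarning` when any bulk item comes back with HTTP 429 / `circuit_breaking_exception`, signalling that the chunk must be resubmitted. A minimal sketch of what a caller-side retry could look like, assuming a hypothetical wrapper and backoff policy that are not part of registry-sweepers:

```python
# Hypothetical caller-side retry for the throttling RuntimeWarning raised in
# _write_bulk_updates_chunk; the wrapper name and backoff are illustrative.
from time import sleep

from opensearchpy import OpenSearch


def write_chunk_with_retries(
    client: OpenSearch, index_name: str, bulk_updates: list, max_attempts: int = 5
) -> None:
    for attempt in range(1, max_attempts + 1):
        try:
            # raises RuntimeWarning if the cluster's circuit breaker tripped (HTTP 429)
            _write_bulk_updates_chunk(client, index_name, bulk_updates)
            return
        except RuntimeWarning:
            # back off exponentially, then resubmit the same chunk
            sleep(2**attempt)
    raise RuntimeError(f"Bulk update chunk failed after {max_attempts} throttled attempts")
```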