
Commit

lint
alexdunnjpl committed Nov 26, 2024
1 parent ebc89ec commit 1b00788
Showing 2 changed files with 62 additions and 49 deletions.
92 changes: 50 additions & 42 deletions src/pds/registrysweepers/reindexer/main.py
@@ -1,5 +1,7 @@
import logging
from datetime import datetime, timedelta
import math
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from time import sleep
from typing import Collection
@@ -8,7 +10,6 @@
from typing import Union

import dateutil.parser
import math
from opensearchpy import OpenSearch
from pds.registrysweepers.reindexer.constants import REINDEXER_FLAG_METADATA_KEY
from pds.registrysweepers.utils import configure_logging
@@ -87,27 +88,27 @@ def accumulate_missing_mappings(
# Anything with prefix 'ops:Provenance' is excluded, as these properties are the responsibility of their
# respective sweepers.
special_case_property_types_by_name = {
'@timestamp': None,
'@version': None,
'_package_id': None,
'description': 'text',
'lid': 'keyword',
'lidvid': 'keyword',
'ops:Harvest_Info/ops:harvest_date_time': 'date',
'ops:Label_File_Info/ops:json_blob': None,
'product_class': 'keyword',
'ref_lid_associate': 'keyword',
'ref_lid_collection': 'keyword',
'ref_lid_collection_secondary': 'keyword',
'ref_lid_data': 'keyword',
'ref_lid_document': 'keyword',
'ref_lid_facility': 'keyword',
'ref_lid_instrument': 'keyword',
'ref_lid_instrument_host': 'keyword',
'ref_lid_investigation': 'keyword',
'ref_lid_target': 'keyword',
'ref_lid_telescope': 'keyword',
'title': 'text',
"@timestamp": None,
"@version": None,
"_package_id": None,
"description": "text",
"lid": "keyword",
"lidvid": "keyword",
"ops:Harvest_Info/ops:harvest_date_time": "date",
"ops:Label_File_Info/ops:json_blob": None,
"product_class": "keyword",
"ref_lid_associate": "keyword",
"ref_lid_collection": "keyword",
"ref_lid_collection_secondary": "keyword",
"ref_lid_data": "keyword",
"ref_lid_document": "keyword",
"ref_lid_facility": "keyword",
"ref_lid_instrument": "keyword",
"ref_lid_instrument_host": "keyword",
"ref_lid_investigation": "keyword",
"ref_lid_target": "keyword",
"ref_lid_telescope": "keyword",
"title": "text",
# 'vid' # TODO: need to determine what this should be, as keyword lexical(?) sorting will be a problem
}

@@ -128,19 +129,25 @@ def accumulate_missing_mappings(

for property_name, value in doc["_source"].items():
# Resolve canonical type from data dictionary or - failing that - from the hardcoded types
canonical_type = dd_field_types_by_name.get(property_name) or special_case_property_types_by_name.get(property_name)
canonical_type = dd_field_types_by_name.get(property_name) or special_case_property_types_by_name.get(
property_name
)
current_mapping_type = mapping_field_types_by_field_name.get(property_name)

mapping_missing = property_name not in mapping_field_types_by_field_name
canonical_type_is_defined = canonical_type is not None
mapping_is_bad = canonical_type != current_mapping_type \
and canonical_type is not None \
and current_mapping_type is not None

if not canonical_type_is_defined \
and property_name not in special_case_property_types_by_name \
and not property_name.startswith('ops:Provenance') \
and property_name not in canonical_type_undefined_property_names:
mapping_is_bad = (
canonical_type != current_mapping_type
and canonical_type is not None
and current_mapping_type is not None
)

if (
not canonical_type_is_defined
and property_name not in special_case_property_types_by_name
and not property_name.startswith("ops:Provenance")
and property_name not in canonical_type_undefined_property_names
):
log.warning(
f"Property {property_name} does not have an entry in the data dictionary index or hardcoded mappings - this may indicate a problem"
)
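
For reference, a minimal sketch of the resolution order applied above, using toy dicts (in the sweeper itself these come from the data dictionary index, the hardcoded special cases, and the live index mapping); the hypothetical "title" entry shows a stale mapping being flagged:

dd_field_types_by_name = {"ops:Harvest_Info/ops:harvest_date_time": "date"}
special_case_property_types_by_name = {"lidvid": "keyword", "title": "text"}
mapping_field_types_by_field_name = {"title": "keyword"}  # hypothetical stale mapping

for property_name in ("ops:Harvest_Info/ops:harvest_date_time", "lidvid", "title"):
    # the data dictionary wins; the hardcoded special cases are the fallback
    canonical_type = dd_field_types_by_name.get(property_name) or special_case_property_types_by_name.get(property_name)
    current_mapping_type = mapping_field_types_by_field_name.get(property_name)
    mapping_missing = property_name not in mapping_field_types_by_field_name
    mapping_is_bad = (
        canonical_type != current_mapping_type
        and canonical_type is not None
        and current_mapping_type is not None
    )
    print(property_name, canonical_type, mapping_missing, mapping_is_bad)
# -> the "date" and "keyword" properties are flagged as missing mappings;
#    "title" is flagged as a bad mapping ("text" expected, "keyword" present)
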
@@ -183,11 +190,15 @@ def accumulate_missing_mappings(
f'Property {property_name} will be updated to type "{canonical_type}" from data dictionary'
)
missing_mapping_updates[property_name] = canonical_type # type: ignore
elif property_name.startswith('ops:Provenance'): # TODO: extract this to a constant, used by all metadata key definitions
elif property_name.startswith(
"ops:Provenance"
): # TODO: extract this to a constant, used by all metadata key definitions
# mappings for registry-sweepers are the responsibility of their respective sweepers and should not
# be touched by the reindexer sweeper
if property_name not in sweepers_missing_property_names:
log.warning(f'Property {property_name} is missing from the index mapping, but is a sweepers metadata attribute and will not be fixed here. Please run the full set of sweepers on this index')
log.warning(
f"Property {property_name} is missing from the index mapping, but is a sweepers metadata attribute and will not be fixed here. Please run the full set of sweepers on this index"
)
sweepers_missing_property_names.add(property_name)
else:
# if there is no canonical type and it is not a provenance metadata key, do nothing, per jpadams
@@ -199,8 +210,7 @@

if problem_docs_count > 0:
log.warning(
f"RESULT: Problems were detected with docs having harvest timestamps between {earliest_problem_doc_harvested_at.isoformat()} and {latest_problem_doc_harvested_at.isoformat()}"
# type: ignore
f"RESULT: Problems were detected with docs having harvest timestamps between {earliest_problem_doc_harvested_at.isoformat()} and {latest_problem_doc_harvested_at.isoformat()}" # type: ignore
)
log.warning(
f"RESULT: Problems were detected with docs having harvest versions {sorted(problematic_harvest_versions)}"
@@ -246,13 +256,13 @@ def format_hits_count(count: int) -> str:
return str(count)
elif count < 1e5:
adjusted_count = count / 1e3
return '{:,.1f}K'.format(adjusted_count)
return "{:,.1f}K".format(adjusted_count)
elif count < 1e6:
adjusted_count = count / 1e3
return '{:,.0f}K'.format(adjusted_count)
return "{:,.0f}K".format(adjusted_count)
else:
adjusted_count = count / 1e6
return '{:,.2f}M'.format(adjusted_count)
return "{:,.2f}M".format(adjusted_count)


def run(
@@ -266,7 +276,6 @@ def run(
products_index_name = resolve_multitenant_index_name("registry")
ensure_index_mapping(client, products_index_name, REINDEXER_FLAG_METADATA_KEY, "date")


dd_field_types_by_field_name = fetch_dd_field_types(client)

def get_updated_hits_count():
@@ -286,12 +295,11 @@

with tqdm(
total=total_outstanding_doc_count,
desc=f"Reindexer sweeper progress",
desc="Reindexer sweeper progress",
) as pbar:
current_batch_size = min(batch_size_limit, total_outstanding_doc_count)
final_batch_is_processed = False
while not final_batch_is_processed:

mapping_field_types_by_field_name = get_mapping_field_types_by_field_name(client, products_index_name)

missing_mappings = accumulate_missing_mappings(
19 changes: 12 additions & 7 deletions src/pds/registrysweepers/utils/db/__init__.py
@@ -171,8 +171,8 @@ def query_registry_db_with_search_after(
# https://discuss.elastic.co/t/problem-with-colon-in-fieldname-where-can-i-find-naming-guidelines/5437/4
# https://discuss.elastic.co/t/revisiting-colons-in-field-names/25005
# TODO: investigate and open ticket with opensearch-py if confirmed
special_characters = {'/', ':'}
query['sort'] = [f for f in sort_fields if any(c in f for c in special_characters)]
special_characters = {"/", ":"}
query["sort"] = [f for f in sort_fields if any(c in f for c in special_characters)]

if search_after_values is not None:
query["search_after"] = search_after_values
@@ -228,8 +228,9 @@ def fetch_func():
if len(value) == 1:
search_after_values[idx] = value[0]
else:
raise ValueError(f'Failed to flatten array-like search-after value {value} into single element')

raise ValueError(
f"Failed to flatten array-like search-after value {value} into single element"
)

# This is a temporary, ad-hoc guard against empty/erroneous responses which do not return non-200 status codes.
# Previously, this has caused infinite loops in production due to served_hits sticking and never reaching the
@@ -341,9 +342,13 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
if response_content.get("errors"):
warn_types = {"document_missing_exception"} # these types represent bad data, not bad sweepers behaviour
items_with_problems = [item for item in response_content["items"] if "error" in item["update"]]
if any(item['update']['status'] == 429 and item['update']['error']['type'] == 'circuit_breaking_exception' for item in items_with_problems):
raise RuntimeWarning(f'Bulk updates response includes item with status HTTP429, circuit_breaking_exception/throttled - chunk will need to be resubmitted')

if any(
item["update"]["status"] == 429 and item["update"]["error"]["type"] == "circuit_breaking_exception"
for item in items_with_problems
):
raise RuntimeWarning(
"Bulk updates response includes item with status HTTP429, circuit_breaking_exception/throttled - chunk will need to be resubmitted"
)
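
A hypothetical caller-side sketch (not the repository's actual retry logic, and write_with_retry is an invented name) showing how a chunk might be resubmitted when _write_bulk_updates_chunk signals throttling via this RuntimeWarning:

from time import sleep

def write_with_retry(client, index_name, bulk_updates, max_attempts=5):
    for attempt in range(1, max_attempts + 1):
        try:
            _write_bulk_updates_chunk(client, index_name, bulk_updates)
            return
        except RuntimeWarning:
            if attempt == max_attempts:
                raise
            sleep(2 ** attempt)  # simple exponential backoff before resubmitting the chunk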

def get_ids_list_str(ids: List[str]) -> str:
max_display_ids = 50
