Implement reindexer sweeper (#149)
* [skeleton] implement reindexer sweeper

* update metadata flag to use date instead of boolean

* [revert me] add test init code

* implement reindexer sweeper, with types resolved from the *-dd index, or as "keyword" if not present there (see the sketch after the test list below)

Manual tests against docker registry:
 - logged counts are correct
 - missing mappings are added
 - types of missing mappings change according to the resolved typename
 - metadata updates are written to db
 - metadata updates are sufficient to trigger re-index, causing previously-unsearchable properties to become searchable.
 - presence of metadata attribute excludes document from document set on subsequent runs
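
A minimal sketch of that type-resolution rule (illustrative only; the helper name and the dd-index query/field names are assumptions, not the commit's actual code):

```python
def resolve_mapping_type(client, property_name: str, dd_index_name: str) -> str:
    """Resolve a property's mapping type from the data-dictionary (*-dd) index,
    falling back to "keyword" when no entry exists there."""
    resp = client.search(
        index=dd_index_name,
        body={"query": {"term": {"es_field_name": property_name}}, "size": 1},
    )
    hits = resp["hits"]["hits"]
    return hits[0]["_source"]["es_data_type"] if hits else "keyword"
```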

* implement harvest-time filter to ensure that products harvested mid-sweep do not erroneously get flagged as processed
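
The race being closed: a product harvested after the sweep begins could otherwise be stamped as processed without its new properties ever being examined. A sketch of such a filter, reusing the harvest timestamp field that appears later in this diff (the query shape is an assumption):

```python
from datetime import datetime, timezone

sweeper_start_timestamp = datetime.now(timezone.utc).isoformat()

# Restrict the sweep to documents harvested strictly before this execution
# began, so products harvested mid-sweep are deferred to the next run rather
# than being flagged as processed without examination.
harvest_time_filter = {
    "range": {"ops:Harvest_Info/ops:harvest_date_time": {"lt": sweeper_start_timestamp}}
}
```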

* implement logging of problematic harvest timestamp span and harvest software versions

* clean up code

* remove test code

* improve comment

* add mypy ignores - None-guard is provided by conditionals

* add registry-dd to allowed index types for resolve_multitenant_index_name()

* ensure reindexer sweeper captures all relevant documents with a single sweep

* improve logging

* implement batching approach in reindexer sweeper

* squash! implement batching approach in reindexer sweeper

* [weeeird bugfix] Patch apparent issues when paginating. See comments

* map special-case properties onto their types and incorporate them into the resolution logic

* implement stall while update indexing queue is backed up

* disable noisy log

* tweak stall time/log

* make reindexer hits count more human-friendly in logs

* clean up logging

* exclude ops:Provenance* properties from canonical_type_undefined_property_names

* bump hits_stall_tolerance from 5% to 10% of batch_size_limit
this should prevent unintended continuous looping

* fix stall logic

* fix format_hits_count()

* change type hint to indicate that consumable iterators are not appropriate
this is because the retry would pass the consumed iterator to subsequent calls
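
The hazard in miniature: a generator argument is consumed by the failed first attempt, so every retry of the same call sees an empty iterator. A two-line demonstration:

```python
updates = (f"update {i}" for i in range(3))  # a generator is single-use

print(list(updates))  # first attempt consumes it: ['update 0', 'update 1', 'update 2']
print(list(updates))  # a retry would see nothing: []
```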

* Incorporate detection/log/retry of HTTP429 (circuit-breaking throttle)

* remove manual stall logic

* disable default typing, per jpadams

* re-enable generation of updates for docs having properties not in mappings
protection against the race condition is provided by constraining harvest time to be less than (LT) the sweeper execution timestamp

* support all ISO-formatted harvest timestamp strings
dateutil is the de facto standard third-party library for this parsing
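
For reference, a short example of the dateutil call suited to this (the commit's exact call site is not shown in this diff):

```python
from dateutil import parser as dateutil_parser

# isoparse handles ISO-8601 variants that datetime.fromisoformat() rejected
# before Python 3.11, e.g. the trailing "Z" UTC designator.
for ts in ("2024-10-23T12:34:56Z", "2024-10-23T12:34:56.789+00:00"):
    print(dateutil_parser.isoparse(ts))
```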

* correct erroneous log message

* bugfix edge-cases

* demote noisy log

* deduplicate missing sweepers property logs

* remove cruft

* flesh out static type mappings

* fix infinite loop when there are fewer hits than a full batch

* comment out log_filepath

* lint

* add explanation
alexdunnjpl authored Nov 26, 2024
1 parent 4c3812d commit bd99a08
Showing 9 changed files with 432 additions and 6 deletions.
7 changes: 5 additions & 2 deletions docker/sweepers_driver.py
@@ -64,6 +64,7 @@
from typing import Callable

from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync
from pds.registrysweepers.reindexer import main as reindexer
from pds.registrysweepers.utils import configure_logging, parse_log_level
from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since
@@ -85,7 +86,8 @@ def run_factory(sweeper_f: Callable) -> Callable:
return functools.partial(
sweeper_f,
client=get_opensearch_client_from_environment(verify_certs=True if not dev_mode else False),
log_filepath='registry-sweepers.log',
# enable for development if required - not necessary in production
# log_filepath='registry-sweepers.log',
log_level=log_level
)

@@ -107,7 +109,8 @@ def run_factory(sweeper_f: Callable) -> Callable:
sweepers = [
repairkit.run,
provenance.run,
ancestry.run
ancestry.run,
reindexer.run
]

for option, sweeper in optional_sweepers.items():
1 change: 1 addition & 0 deletions setup.cfg
@@ -38,6 +38,7 @@ install_requires =
botocore~=1.34.91
botocore-stubs~=1.34.94
requests-aws4auth~=1.2.3
python-dateutil~=2.9.0

# Change this to False if you use things like __file__ or __path__—which you
# shouldn't use anyway, because that's what ``pkg_resources`` is for 🙂
2 changes: 1 addition & 1 deletion src/pds/registrysweepers/ancestry/generation.py
@@ -187,7 +187,7 @@ def generate_nonaggregate_and_collection_records_iteratively(

for lid, collections_records_for_lid in collection_records_by_lid.items():
if all([record.skip_write for record in collections_records_for_lid]):
log.info(f"Skipping updates for up-to-date collection family: {str(lid)}")
log.debug(f"Skipping updates for up-to-date collection family: {str(lid)}")
continue
else:
log.info(
0 changes: 0 additions & 0 deletions src/pds/registrysweepers/reindexer/__init__.py
Empty file.
1 change: 1 addition & 0 deletions src/pds/registrysweepers/reindexer/constants.py
@@ -0,0 +1 @@
REINDEXER_FLAG_METADATA_KEY = "ops:Provenance/ops:reindexed_at"
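
One way this flag can exclude already-processed documents on subsequent runs, per the manual-test notes above (a sketch of the query shape, not the commit's actual query):

```python
from pds.registrysweepers.reindexer.constants import REINDEXER_FLAG_METADATA_KEY

# Documents lacking the reindexed-at stamp are candidates for this sweep.
unprocessed_docs_query = {
    "query": {
        "bool": {
            "must_not": [{"exists": {"field": REINDEXER_FLAG_METADATA_KEY}}]
        }
    }
}
```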
383 changes: 383 additions & 0 deletions src/pds/registrysweepers/reindexer/main.py

Large diffs are not rendered by default.

36 changes: 35 additions & 1 deletion src/pds/registrysweepers/utils/db/__init__.py
@@ -162,6 +162,18 @@ def query_registry_db_with_search_after(

with tqdm(total=expected_hits, desc=f"Query {query_id}") as pbar:
while more_data_exists:
# Manually set sort - this is required for subsequent calls, despite being passed in fetch_func's call to
# client.search as sort kwarg.
# It is unclear why this issue is only presenting now - edunn 20241023
# It appears that OpenSearch.search() sort kwarg behaves inconsistently if the values contain certain
# characters. It is unclear which of /: is the issue but it is suggested that :-+^ may be problematic - edunn 20241105
# Related: https://discuss.elastic.co/t/query-a-field-that-has-a-colon/323966
# https://discuss.elastic.co/t/problem-with-colon-in-fieldname-where-can-i-find-naming-guidelines/5437/4
# https://discuss.elastic.co/t/revisiting-colons-in-field-names/25005
# TODO: investigate and open ticket with opensearch-py if confirmed
special_characters = {"/", ":"}
query["sort"] = [f for f in sort_fields if any(c in f for c in special_characters)]

if search_after_values is not None:
query["search_after"] = search_after_values
log.debug(
Expand Down Expand Up @@ -205,6 +217,21 @@ def fetch_func():
# simpler to set the value after every hit than worry about OBO errors detecting the last hit in the page
search_after_values = [hit["_source"].get(field) for field in sort_fields]

# Flatten single-element search-after-values. Attempting to sort/search-after on MCP AOSS by
# ops:Harvest_Info/ops:harvest_date_time is throwing
# RequestError(400, 'parsing_exception', 'Expected [VALUE_STRING] or [VALUE_NUMBER] or
# [VALUE_BOOLEAN] or [VALUE_NULL] but found [START_ARRAY] inside search_after.')
# It is unclear why this issue is only presenting now - edunn 20241023
if search_after_values is not None:
for idx, value in enumerate(search_after_values):
if isinstance(value, list):
if len(value) == 1:
search_after_values[idx] = value[0]
else:
raise ValueError(
f"Failed to flatten array-like search-after value {value} into single element"
)

# This is a temporary, ad-hoc guard against empty/erroneous responses which do not return non-200 status codes.
# Previously, this has caused infinite loops in production due to served_hits sticking and never reaching the
# expected total hits value.
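
For orientation, the bare search-after pagination pattern this function implements (a simplified sketch; the real code adds retries, progress reporting, and the guards discussed above):

```python
def paginate_with_search_after(client, index_name, query, sort_fields, page_size=500):
    """Yield every hit for `query`, page by page, resuming each page from the
    sort values of the previous page's last hit."""
    search_after = None
    while True:
        body = dict(query, size=page_size, sort=sort_fields)
        if search_after is not None:
            body["search_after"] = search_after
        hits = client.search(index=index_name, body=body)["hits"]["hits"]
        if not hits:
            break  # an empty page means the result set is exhausted
        yield from hits
        search_after = hits[-1]["sort"]  # sort values provided in the response
```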
Expand Down Expand Up @@ -306,7 +333,7 @@ def update_as_statements(update: Update) -> Iterable[str]:


@retry(tries=6, delay=15, backoff=2, logger=log)
def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates: Iterable[str]):
def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates: List[str]):
bulk_data = "\n".join(bulk_updates) + "\n"

request_timeout = 180
@@ -315,6 +342,13 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
if response_content.get("errors"):
warn_types = {"document_missing_exception"} # these types represent bad data, not bad sweepers behaviour
items_with_problems = [item for item in response_content["items"] if "error" in item["update"]]
if any(
item["update"]["status"] == 429 and item["update"]["error"]["type"] == "circuit_breaking_exception"
for item in items_with_problems
):
raise RuntimeWarning(
"Bulk updates response includes item with status HTTP429, circuit_breaking_exception/throttled - chunk will need to be resubmitted"
)

def get_ids_list_str(ids: List[str]) -> str:
max_display_ids = 50
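
The interplay with the @retry decorator above: raising from inside the decorated function is what triggers resubmission of the chunk with backoff. In miniature, assuming the same retry package the diff decorates with:

```python
from retry import retry  # assumed to be the retry package used above

attempts = []

@retry(tries=3, delay=0)
def submit_chunk(chunk: list) -> int:
    attempts.append(len(chunk))
    if len(attempts) < 3:
        # Simulated throttle: raising hands control back to @retry, which
        # re-invokes this function with the same (list, not iterator) chunk.
        raise RuntimeWarning("throttled - chunk will be resubmitted")
    return len(chunk)

print(submit_chunk(["u1", "u2"]), attempts)  # 2 [2, 2, 2]
```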
6 changes: 5 additions & 1 deletion src/pds/registrysweepers/utils/db/indexing.py
@@ -2,5 +2,9 @@


def ensure_index_mapping(client: OpenSearch, index_name: str, property_name: str, property_type: str):
"""Provides an easy-to-use wrapper for ensuring the presence of a given property name/type in a given index"""
"""
Provides an easy-to-use wrapper for ensuring the presence of a given property name/type in a given index.
N.B. This cannot change the type of a mapping, as modification/deletion is impossible in ES/OS. If the mapping
already exists, matching type or not, the function will gracefully fail and log an HTTP400 error.
"""
client.indices.put_mapping(index=index_name, body={"properties": {property_name: {"type": property_type}}})
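
Usage is one call per missing property (connection details below are assumptions for illustration):

```python
from opensearchpy import OpenSearch

from pds.registrysweepers.utils.db.indexing import ensure_index_mapping

client = OpenSearch(hosts=["https://localhost:9200"])  # assumed connection details

# Idempotent if the property already exists with this type; if it exists with
# a different type, OpenSearch rejects the put_mapping call with HTTP 400,
# which per the docstring is logged rather than mutating the mapping.
ensure_index_mapping(client, "registry", "ops:Provenance/ops:reindexed_at", "date")
```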
2 changes: 1 addition & 1 deletion src/pds/registrysweepers/utils/db/multitenancy.py
@@ -2,7 +2,7 @@


def resolve_multitenant_index_name(index_type: str):
supported_index_types = {"registry", "registry-refs"}
supported_index_types = {"registry", "registry-refs", "registry-dd"}
node_id = os.environ.get("MULTITENANCY_NODE_ID", "").strip(" ")

if node_id == "":
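
The visible portion of the diff ends at the empty-node-id check. A plausible completion under the usual node-id-prefix multitenancy convention (assumed, not taken from the diff):

```python
import os

def resolve_multitenant_index_name(index_type: str):
    supported_index_types = {"registry", "registry-refs", "registry-dd"}
    if index_type not in supported_index_types:
        raise ValueError(f'index_type "{index_type}" not in {supported_index_types}')

    node_id = os.environ.get("MULTITENANCY_NODE_ID", "").strip(" ")
    # single-tenant deployments use the bare index name; multitenant
    # deployments prefix it with the node id (assumed convention)
    return index_type if node_id == "" else f"{node_id}-{index_type}"
```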
