From 94c54d2f269a4e72605a0a68f28365cb4d20d779 Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Thu, 7 Sep 2023 18:19:48 -0700 Subject: [PATCH 1/7] extract MajorMinorVersion --- .../utils/majorminorversion.py | 48 +++++++++++++++++++ .../utils/productidentifiers/pdsvid.py | 48 ++----------------- 2 files changed, 52 insertions(+), 44 deletions(-) create mode 100644 src/pds/registrysweepers/utils/majorminorversion.py diff --git a/src/pds/registrysweepers/utils/majorminorversion.py b/src/pds/registrysweepers/utils/majorminorversion.py new file mode 100644 index 0000000..c750711 --- /dev/null +++ b/src/pds/registrysweepers/utils/majorminorversion.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import functools + + +@functools.total_ordering +class MajorMinorVersion: + major_version_minimum = 0 + minor_version_minimum = 0 + + def __init__(self, major_version: int, minor_version: int): + if major_version < self.major_version_minimum: + raise ValueError(f"major_version must be 0 or higher (got {major_version})") + + if minor_version < self.minor_version_minimum: + raise ValueError(f"minor_version must be 0 or higher (got {minor_version})") + + self.major_version = major_version + self.minor_version = minor_version + + @classmethod + def from_string(cls, version_str: str): + major_version_chunk, minor_version_chunk = version_str.split(".") + + major_version = int(major_version_chunk) + minor_version = int(minor_version_chunk) + + return cls(major_version, minor_version) + + def __str__(self): + return f"{self.major_version}.{self.minor_version}" + + def __hash__(self): + return hash(str(self)) + + def __repr__(self): + return f"{self.__class__.__name__}({str(self)})" + + def __eq__(self, other): + return self.major_version == other.major_version and self.minor_version == other.minor_version + + def __lt__(self, other): + if self.major_version != other.major_version: + return self.major_version < other.major_version + elif self.minor_version != other.minor_version: 
+ return self.minor_version < other.minor_version + else: + return False diff --git a/src/pds/registrysweepers/utils/productidentifiers/pdsvid.py b/src/pds/registrysweepers/utils/productidentifiers/pdsvid.py index 3971e98..cf2639b 100644 --- a/src/pds/registrysweepers/utils/productidentifiers/pdsvid.py +++ b/src/pds/registrysweepers/utils/productidentifiers/pdsvid.py @@ -1,46 +1,6 @@ -from __future__ import annotations +from pds.registrysweepers.utils.majorminorversion import MajorMinorVersion -import functools -import logging - -@functools.total_ordering -class PdsVid: - def __init__(self, major_version: int, minor_version: int): - if major_version < 0: - raise ValueError(f"major_version must be 0 or higher (got {major_version})") - - if minor_version < 0: - raise ValueError(f"minor_version must be 0 or higher (got {minor_version})") - - self.major_version = major_version - self.minor_version = minor_version - - @staticmethod - def from_string(vid_string: str) -> PdsVid: - major_version_chunk, minor_version_chunk = vid_string.split(".") - - major_version = int(major_version_chunk) - minor_version = int(minor_version_chunk) - - return PdsVid(major_version, minor_version) - - def __str__(self): - return f"{self.major_version}.{self.minor_version}" - - def __hash__(self): - return hash(str(self)) - - def __repr__(self): - return f"PdsVid({str(self)})" - - def __eq__(self, other): - return self.major_version == other.major_version and self.minor_version == other.minor_version - - def __lt__(self, other: PdsVid): - if self.major_version != other.major_version: - return self.major_version < other.major_version - elif self.minor_version != other.minor_version: - return self.minor_version < other.minor_version - else: - return False +class PdsVid(MajorMinorVersion): + major_version_minimum = 0 + minor_version_minimum = 0 From 5f79de42e792b5a83622280e23c01e428250b8c1 Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Thu, 7 Sep 2023 18:32:44 -0700 Subject: [PATCH 2/7] move 
provenance to its own sub-package --- .../registrysweepers/{provenance.py => provenance/__init__.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/pds/registrysweepers/{provenance.py => provenance/__init__.py} (100%) mode change 100755 => 100644 diff --git a/src/pds/registrysweepers/provenance.py b/src/pds/registrysweepers/provenance/__init__.py old mode 100755 new mode 100644 similarity index 100% rename from src/pds/registrysweepers/provenance.py rename to src/pds/registrysweepers/provenance/__init__.py From 9910afb46fd3caaf9524217080a7cd9a863447ec Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Thu, 7 Sep 2023 18:35:57 -0700 Subject: [PATCH 3/7] remove cruft --- src/pds/registrysweepers/ancestry/__init__.py | 1 - src/pds/registrysweepers/ancestry/generation.py | 1 - src/pds/registrysweepers/ancestry/queries.py | 1 - src/pds/registrysweepers/provenance/__init__.py | 1 - src/pds/registrysweepers/repairkit/__init__.py | 1 - src/pds/registrysweepers/utils/__init__.py | 1 - src/pds/registrysweepers/utils/db/__init__.py | 6 ------ src/pds/registrysweepers/utils/db/host.py | 3 --- tests/pds/registrysweepers/test_ancestry.py | 4 +--- 9 files changed, 1 insertion(+), 18 deletions(-) delete mode 100644 src/pds/registrysweepers/utils/db/host.py diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py index 0401e67..894fa1e 100644 --- a/src/pds/registrysweepers/ancestry/__init__.py +++ b/src/pds/registrysweepers/ancestry/__init__.py @@ -18,7 +18,6 @@ from pds.registrysweepers.utils import parse_args from pds.registrysweepers.utils.db import write_updated_docs from pds.registrysweepers.utils.db.client import get_opensearch_client -from pds.registrysweepers.utils.db.host import Host from pds.registrysweepers.utils.db.update import Update log = logging.getLogger(__name__) diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py index 2a60979..81b16f0 100644 
--- a/src/pds/registrysweepers/ancestry/generation.py +++ b/src/pds/registrysweepers/ancestry/generation.py @@ -12,7 +12,6 @@ from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_bundles_query from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_collections_query from pds.registrysweepers.ancestry.queries import get_nonaggregate_ancestry_records_query -from pds.registrysweepers.utils.db.host import Host from pds.registrysweepers.utils.misc import coerce_list_type from pds.registrysweepers.utils.productidentifiers.factory import PdsProductIdentifierFactory from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid diff --git a/src/pds/registrysweepers/ancestry/queries.py b/src/pds/registrysweepers/ancestry/queries.py index 926d58c..18cc68c 100644 --- a/src/pds/registrysweepers/ancestry/queries.py +++ b/src/pds/registrysweepers/ancestry/queries.py @@ -8,7 +8,6 @@ from opensearchpy import OpenSearch from pds.registrysweepers.utils.db import query_registry_db_or_mock -from pds.registrysweepers.utils.db.host import Host log = logging.getLogger(__name__) diff --git a/src/pds/registrysweepers/provenance/__init__.py b/src/pds/registrysweepers/provenance/__init__.py index 74b025f..7e25166 100644 --- a/src/pds/registrysweepers/provenance/__init__.py +++ b/src/pds/registrysweepers/provenance/__init__.py @@ -53,7 +53,6 @@ from pds.registrysweepers.utils.db import get_extant_lidvids from pds.registrysweepers.utils.db import write_updated_docs from pds.registrysweepers.utils.db.client import get_opensearch_client -from pds.registrysweepers.utils.db.host import Host from pds.registrysweepers.utils.db.update import Update log = logging.getLogger(__name__) diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py index 2835bdd..ca26787 100644 --- a/src/pds/registrysweepers/repairkit/__init__.py +++ b/src/pds/registrysweepers/repairkit/__init__.py @@ -16,7 +16,6 
@@ from pds.registrysweepers.utils import parse_args from pds.registrysweepers.utils import query_registry_db from pds.registrysweepers.utils.db.client import get_opensearch_client -from pds.registrysweepers.utils.db.host import Host from pds.registrysweepers.utils.db.update import Update from . import allarrays diff --git a/src/pds/registrysweepers/utils/__init__.py b/src/pds/registrysweepers/utils/__init__.py index 17f0b5f..b8c8043 100644 --- a/src/pds/registrysweepers/utils/__init__.py +++ b/src/pds/registrysweepers/utils/__init__.py @@ -5,7 +5,6 @@ from typing import Union from pds.registrysweepers.utils.db import query_registry_db -from pds.registrysweepers.utils.db.host import Host log = logging.getLogger(__name__) diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py index 77dedb4..548067a 100644 --- a/src/pds/registrysweepers/utils/db/__init__.py +++ b/src/pds/registrysweepers/utils/db/__init__.py @@ -1,9 +1,6 @@ -import functools import json import logging import sys -import urllib.parse -from datetime import timedelta from typing import Callable from typing import Dict from typing import Iterable @@ -11,12 +8,9 @@ from typing import Mapping from typing import Optional -import requests from opensearchpy import OpenSearch -from pds.registrysweepers.utils.db.host import Host from pds.registrysweepers.utils.db.update import Update from pds.registrysweepers.utils.misc import get_random_hex_id -from requests import HTTPError from retry import retry from retry.api import retry_call diff --git a/src/pds/registrysweepers/utils/db/host.py b/src/pds/registrysweepers/utils/db/host.py deleted file mode 100644 index 5bd25f1..0000000 --- a/src/pds/registrysweepers/utils/db/host.py +++ /dev/null @@ -1,3 +0,0 @@ -import collections - -Host = collections.namedtuple("Host", ["password", "url", "username", "verify"]) diff --git a/tests/pds/registrysweepers/test_ancestry.py b/tests/pds/registrysweepers/test_ancestry.py 
index 1f460b4..5bd44fe 100644 --- a/tests/pds/registrysweepers/test_ancestry.py +++ b/tests/pds/registrysweepers/test_ancestry.py @@ -8,7 +8,6 @@ from pds.registrysweepers import ancestry from pds.registrysweepers.ancestry import AncestryRecord from pds.registrysweepers.ancestry import get_collection_ancestry_records -from pds.registrysweepers.utils.db.host import Host from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid from tests.mocks.registryquerymock import RegistryQueryMock @@ -282,9 +281,8 @@ class AncestryLegacyTypesTestCase(unittest.TestCase): registry_query_mock = RegistryQueryMock(input_file_path) def test_collection_refs_parsing(self): - host_stub = Host(None, None, None, None) query_mock_f = self.registry_query_mock.get_mocked_query - collection_ancestry_records = list(get_collection_ancestry_records(host_stub, query_mock_f)) + collection_ancestry_records = list(get_collection_ancestry_records(None, query_mock_f)) self.assertEqual(1, len(collection_ancestry_records)) From 81cb14af7e1882483a980a18086592376a873e9c Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 11 Sep 2023 14:14:11 -0700 Subject: [PATCH 4/7] fix log error erroneously appearing when query has zero total hits --- src/pds/registrysweepers/utils/db/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py index 548067a..7a004ad 100644 --- a/src/pds/registrysweepers/utils/db/__init__.py +++ b/src/pds/registrysweepers/utils/db/__init__.py @@ -91,7 +91,7 @@ def fetch_func(_scroll_id: str = scroll_id): # expected total hits value. 
# TODO: Remove this upon implementation of https://github.com/NASA-PDS/registry-sweepers/issues/42 hits_data_present_in_response = len(response_hits) > 0 - if not hits_data_present_in_response: + if not hits_data_present_in_response and served_hits < total_hits: log.error( f"Response for query {query_id} contained no hits when hits were expected. Returned data is incomplete (got {served_hits} of {total_hits} total hits). Response was: {results}" ) From ae87e7c8cd0c1c63047c5317f8878a8039944d99 Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 11 Sep 2023 14:55:00 -0700 Subject: [PATCH 5/7] implement data versioning optimization for repairkit sweeper this avoids redundant processing and should make the sweeper "free" if it doesn't have any updates to make --- .../registrysweepers/repairkit/__init__.py | 24 +++++++++++++++---- .../registrysweepers/repairkit/versioning.py | 5 ++++ src/pds/registrysweepers/utils/misc.py | 4 ++++ 3 files changed, 28 insertions(+), 5 deletions(-) create mode 100644 src/pds/registrysweepers/repairkit/versioning.py diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py index ca26787..333853c 100644 --- a/src/pds/registrysweepers/repairkit/__init__.py +++ b/src/pds/registrysweepers/repairkit/__init__.py @@ -12,11 +12,14 @@ from typing import Union from opensearchpy import OpenSearch +from pds.registrysweepers.repairkit.versioning import SWEEPERS_REPAIRKIT_VERSION from pds.registrysweepers.utils import configure_logging from pds.registrysweepers.utils import parse_args from pds.registrysweepers.utils import query_registry_db from pds.registrysweepers.utils.db.client import get_opensearch_client +from pds.registrysweepers.utils.db.indexing import ensure_index_mapping from pds.registrysweepers.utils.db.update import Update +from pds.registrysweepers.utils.misc import get_sweeper_version_metadata_key from . 
import allarrays from ..utils.db import write_updated_docs @@ -50,12 +53,14 @@ def function_name (document:{}, fieldname:str)->{} log = logging.getLogger(__name__) -def generate_updates(docs: Iterable[Dict]) -> Iterable[Update]: +def generate_updates( + docs: Iterable[Dict], repairkit_version_metadata_key: str, repairkit_version: int +) -> Iterable[Update]: """Lazily generate necessary Update objects for a collection of db documents""" for document in docs: id = document["_id"] src = document["_source"] - repairs = {} + repairs = {repairkit_version_metadata_key: int(repairkit_version)} log.debug(f"applying repairkit sweeper to document: {id}") for fieldname, data in src.items(): for regex, funcs in REPAIR_TOOLS.items(): @@ -74,10 +79,19 @@ def run( log_level: int = logging.INFO, ): configure_logging(filepath=log_filepath, log_level=log_level) - log.info("Starting repairkit sweeper processing...") + log.info(f"Starting repairkit v{SWEEPERS_REPAIRKIT_VERSION} sweeper processing...") - all_docs = query_registry_db(client, {"query": {"match_all": {}}}, {}) - updates = generate_updates(all_docs) + repairkit_version_metadata_key = get_sweeper_version_metadata_key("repairkit") + + unprocessed_docs_query = { + "query": { + "bool": {"must_not": [{"range": {repairkit_version_metadata_key: {"gte": SWEEPERS_REPAIRKIT_VERSION}}}]} + } + } + + all_docs = query_registry_db(client, unprocessed_docs_query, {}) + updates = generate_updates(all_docs, repairkit_version_metadata_key, SWEEPERS_REPAIRKIT_VERSION) + ensure_index_mapping(client, "registry", repairkit_version_metadata_key, "integer") write_updated_docs(client, updates) log.info("Repairkit sweeper processing complete!") diff --git a/src/pds/registrysweepers/repairkit/versioning.py b/src/pds/registrysweepers/repairkit/versioning.py new file mode 100644 index 0000000..06cde8f --- /dev/null +++ b/src/pds/registrysweepers/repairkit/versioning.py @@ -0,0 +1,5 @@ +# Defines constants used for versioning updated documents with the 
in-use version of sweepers +# SWEEPERS_REPAIRKIT_VERSION must be incremented any time sweepers is changed in a way which requires reprocessing of +# previously-processed data +SWEEPERS_REPAIRKIT_VERSION = 1 +SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY = "ops:Provenance/ops:registry_sweepers_repairkit_version" diff --git a/src/pds/registrysweepers/utils/misc.py b/src/pds/registrysweepers/utils/misc.py index 0656bd9..b5118b0 100644 --- a/src/pds/registrysweepers/utils/misc.py +++ b/src/pds/registrysweepers/utils/misc.py @@ -47,3 +47,7 @@ def wrapped_f(*args, **kwargs): return resp return wrapped_f + + +def get_sweeper_version_metadata_key(sweeper_name: str) -> str: + return f"ops:Provenance/ops:registry_sweepers_{sweeper_name}_version" From a0d0b2a3f05cbb6a63951f47457c2b9d35d99d6e Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 11 Sep 2023 21:35:28 -0700 Subject: [PATCH 6/7] demote noisy log --- src/pds/registrysweepers/repairkit/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py index 333853c..25a65d4 100644 --- a/src/pds/registrysweepers/repairkit/__init__.py +++ b/src/pds/registrysweepers/repairkit/__init__.py @@ -69,7 +69,7 @@ def generate_updates( repairs.update(func(src, fieldname)) if repairs: - log.info(f"Writing repairs to document: {id}") + log.debug(f"Writing repairs to document: {id}") yield Update(id=id, content=repairs) From ecb261f519d54607a1cbb784e48d3ed4dd3c75f0 Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 11 Sep 2023 21:37:16 -0700 Subject: [PATCH 7/7] bump up request timeout values if necessary, these can be extracted to runtime arguments --- src/pds/registrysweepers/utils/db/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py index 7a004ad..a47bdd3 100644 --- 
a/src/pds/registrysweepers/utils/db/__init__.py +++ b/src/pds/registrysweepers/utils/db/__init__.py @@ -33,6 +33,7 @@ def query_registry_db( """ scroll_keepalive = f"{scroll_keepalive_minutes}m" + request_timeout = 20 query_id = get_random_hex_id() # This is just used to differentiate queries during logging log.info(f"Initiating query with id {query_id}: {query}") @@ -51,6 +52,7 @@ def fetch_func(): index=index_name, body=query, scroll=scroll_keepalive, + request_timeout=request_timeout, size=page_size, _source_includes=_source.get("includes", []), # TODO: Break out from the enclosing _source object _source_excludes=_source.get("excludes", []), # TODO: Break out from the enclosing _source object @@ -59,7 +61,7 @@ def fetch_func(): else: def fetch_func(_scroll_id: str = scroll_id): - return client.scroll(scroll_id=_scroll_id, scroll=scroll_keepalive) + return client.scroll(scroll_id=_scroll_id, scroll=scroll_keepalive, request_timeout=request_timeout) results = retry_call( fetch_func, @@ -174,7 +176,8 @@ def update_as_statements(update: Update) -> Iterable[str]: def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates: Iterable[str]): bulk_data = "\n".join(bulk_updates) + "\n" - response_content = client.bulk(index=index_name, body=bulk_data) + request_timeout = 60 + response_content = client.bulk(index=index_name, body=bulk_data, request_timeout=request_timeout) if response_content.get("errors"): warn_types = {"document_missing_exception"} # these types represent bad data, not bad sweepers behaviour