Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sweeper data versioning for repairkit #73

Merged
merged 7 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/pds/registrysweepers/ancestry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.host import Host
from pds.registrysweepers.utils.db.update import Update

log = logging.getLogger(__name__)
Expand Down
1 change: 0 additions & 1 deletion src/pds/registrysweepers/ancestry/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_bundles_query
from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_collections_query
from pds.registrysweepers.ancestry.queries import get_nonaggregate_ancestry_records_query
from pds.registrysweepers.utils.db.host import Host
from pds.registrysweepers.utils.misc import coerce_list_type
from pds.registrysweepers.utils.productidentifiers.factory import PdsProductIdentifierFactory
from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid
Expand Down
1 change: 0 additions & 1 deletion src/pds/registrysweepers/ancestry/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from opensearchpy import OpenSearch
from pds.registrysweepers.utils.db import query_registry_db_or_mock
from pds.registrysweepers.utils.db.host import Host

log = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
from pds.registrysweepers.utils.db import get_extant_lidvids
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.host import Host
from pds.registrysweepers.utils.db.update import Update

log = logging.getLogger(__name__)
Expand Down
27 changes: 20 additions & 7 deletions src/pds/registrysweepers/repairkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
from typing import Union

from opensearchpy import OpenSearch
from pds.registrysweepers.repairkit.versioning import SWEEPERS_REPAIRKIT_VERSION
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils import query_registry_db
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.host import Host
from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
from pds.registrysweepers.utils.db.update import Update
from pds.registrysweepers.utils.misc import get_sweeper_version_metadata_key

from . import allarrays
from ..utils.db import write_updated_docs
Expand Down Expand Up @@ -51,12 +53,14 @@ def function_name (document:{}, fieldname:str)->{}
log = logging.getLogger(__name__)


def generate_updates(docs: Iterable[Dict]) -> Iterable[Update]:
def generate_updates(
docs: Iterable[Dict], repairkit_version_metadata_key: str, repairkit_version: int
) -> Iterable[Update]:
"""Lazily generate necessary Update objects for a collection of db documents"""
for document in docs:
id = document["_id"]
src = document["_source"]
repairs = {}
repairs = {repairkit_version_metadata_key: int(repairkit_version)}
log.debug(f"applying repairkit sweeper to document: {id}")
for fieldname, data in src.items():
for regex, funcs in REPAIR_TOOLS.items():
Expand All @@ -65,7 +69,7 @@ def generate_updates(docs: Iterable[Dict]) -> Iterable[Update]:
repairs.update(func(src, fieldname))

if repairs:
log.info(f"Writing repairs to document: {id}")
log.debug(f"Writing repairs to document: {id}")
yield Update(id=id, content=repairs)


Expand All @@ -75,10 +79,19 @@ def run(
log_level: int = logging.INFO,
):
configure_logging(filepath=log_filepath, log_level=log_level)
log.info("Starting repairkit sweeper processing...")
log.info(f"Starting repairkit v{SWEEPERS_REPAIRKIT_VERSION} sweeper processing...")

all_docs = query_registry_db(client, {"query": {"match_all": {}}}, {})
updates = generate_updates(all_docs)
repairkit_version_metadata_key = get_sweeper_version_metadata_key("repairkit")

unprocessed_docs_query = {
"query": {
"bool": {"must_not": [{"range": {repairkit_version_metadata_key: {"gte": SWEEPERS_REPAIRKIT_VERSION}}}]}
}
}

all_docs = query_registry_db(client, unprocessed_docs_query, {})
updates = generate_updates(all_docs, repairkit_version_metadata_key, SWEEPERS_REPAIRKIT_VERSION)
ensure_index_mapping(client, "registry", repairkit_version_metadata_key, "integer")
write_updated_docs(client, updates)

log.info("Repairkit sweeper processing complete!")
Expand Down
5 changes: 5 additions & 0 deletions src/pds/registrysweepers/repairkit/versioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Defines constants used for versioning updated documents with the in-use version of the repairkit sweeper.
# SWEEPERS_REPAIRKIT_VERSION must be incremented any time the repairkit sweeper is changed in a way which requires
# reprocessing of previously-processed data
SWEEPERS_REPAIRKIT_VERSION = 1
# NOTE(review): this literal has the same value that utils.misc.get_sweeper_version_metadata_key("repairkit")
# produces - confirm which of the two is meant to be authoritative and keep them in sync.
SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY = "ops:Provenance/ops:registry_sweepers_repairkit_version"
1 change: 0 additions & 1 deletion src/pds/registrysweepers/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import Union

from pds.registrysweepers.utils.db import query_registry_db
from pds.registrysweepers.utils.db.host import Host

log = logging.getLogger(__name__)

Expand Down
15 changes: 6 additions & 9 deletions src/pds/registrysweepers/utils/db/__init__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
import functools
import json
import logging
import sys
import urllib.parse
from datetime import timedelta
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Mapping
from typing import Optional

import requests
from opensearchpy import OpenSearch
from pds.registrysweepers.utils.db.host import Host
from pds.registrysweepers.utils.db.update import Update
from pds.registrysweepers.utils.misc import get_random_hex_id
from requests import HTTPError
from retry import retry
from retry.api import retry_call

Expand All @@ -39,6 +33,7 @@ def query_registry_db(
"""

scroll_keepalive = f"{scroll_keepalive_minutes}m"
request_timeout = 20
query_id = get_random_hex_id() # This is just used to differentiate queries during logging
log.info(f"Initiating query with id {query_id}: {query}")

Expand All @@ -57,6 +52,7 @@ def fetch_func():
index=index_name,
body=query,
scroll=scroll_keepalive,
request_timeout=request_timeout,
size=page_size,
_source_includes=_source.get("includes", []), # TODO: Break out from the enclosing _source object
_source_excludes=_source.get("excludes", []), # TODO: Break out from the enclosing _source object
Expand All @@ -65,7 +61,7 @@ def fetch_func():
else:

def fetch_func(_scroll_id: str = scroll_id):
return client.scroll(scroll_id=_scroll_id, scroll=scroll_keepalive)
return client.scroll(scroll_id=_scroll_id, scroll=scroll_keepalive, request_timeout=request_timeout)

results = retry_call(
fetch_func,
Expand Down Expand Up @@ -97,7 +93,7 @@ def fetch_func(_scroll_id: str = scroll_id):
# expected total hits value.
# TODO: Remove this upon implementation of https://github.com/NASA-PDS/registry-sweepers/issues/42
hits_data_present_in_response = len(response_hits) > 0
if not hits_data_present_in_response:
if not hits_data_present_in_response and served_hits < total_hits:
log.error(
f"Response for query {query_id} contained no hits when hits were expected. Returned data is incomplete (got {served_hits} of {total_hits} total hits). Response was: {results}"
)
Expand Down Expand Up @@ -180,7 +176,8 @@ def update_as_statements(update: Update) -> Iterable[str]:
def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates: Iterable[str]):
bulk_data = "\n".join(bulk_updates) + "\n"

response_content = client.bulk(index=index_name, body=bulk_data)
request_timeout = 60
response_content = client.bulk(index=index_name, body=bulk_data, request_timeout=request_timeout)

if response_content.get("errors"):
warn_types = {"document_missing_exception"} # these types represent bad data, not bad sweepers behaviour
Expand Down
3 changes: 0 additions & 3 deletions src/pds/registrysweepers/utils/db/host.py

This file was deleted.

48 changes: 48 additions & 0 deletions src/pds/registrysweepers/utils/majorminorversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from __future__ import annotations

import functools


@functools.total_ordering
class MajorMinorVersion:
    """A two-component ``<major>.<minor>`` version supporting equality, hashing and ordering.

    Instances order by major version first and by minor version second.
    Subclasses may raise the lowest accepted value of either component by
    overriding ``major_version_minimum`` / ``minor_version_minimum``.
    """

    # Lowest values accepted by __init__ (inclusive); overridable per subclass.
    major_version_minimum = 0
    minor_version_minimum = 0

    def __init__(self, major_version: int, minor_version: int):
        """Validate and store the two version components.

        :param major_version: major component, must be >= ``major_version_minimum``
        :param minor_version: minor component, must be >= ``minor_version_minimum``
        :raises ValueError: if either component is below its minimum
        """
        # The messages report the effective minimum so they stay accurate when a subclass overrides it.
        if major_version < self.major_version_minimum:
            raise ValueError(
                f"major_version must be {self.major_version_minimum} or higher (got {major_version})"
            )

        if minor_version < self.minor_version_minimum:
            raise ValueError(
                f"minor_version must be {self.minor_version_minimum} or higher (got {minor_version})"
            )

        self.major_version = major_version
        self.minor_version = minor_version

    @classmethod
    def from_string(cls, version_str: str):
        """Build an instance of ``cls`` from a string of the form ``"<major>.<minor>"``.

        :raises ValueError: if the string does not consist of exactly two
            dot-separated integer chunks, or a chunk is below its minimum
        """
        chunks = version_str.split(".")
        if len(chunks) != 2:
            raise ValueError(f"version string must have the form '<major>.<minor>' (got {version_str!r})")

        major_version_chunk, minor_version_chunk = chunks
        return cls(int(major_version_chunk), int(minor_version_chunk))

    def __str__(self):
        return f"{self.major_version}.{self.minor_version}"

    def __hash__(self):
        # Derived from the same two components that __eq__ compares, so equal versions hash equally.
        return hash(str(self))

    def __repr__(self):
        return f"{self.__class__.__name__}({str(self)})"

    def __eq__(self, other):
        # Operands lacking version attributes are "not comparable" rather than an error, so that
        # e.g. `version == None` evaluates to False instead of raising AttributeError.
        try:
            theirs = (other.major_version, other.minor_version)
        except AttributeError:
            return NotImplemented
        return (self.major_version, self.minor_version) == theirs

    def __lt__(self, other):
        # Returning NotImplemented lets Python raise the conventional TypeError for unorderable operands.
        try:
            theirs = (other.major_version, other.minor_version)
        except AttributeError:
            return NotImplemented
        # Tuple comparison orders by major version first, then by minor version.
        return (self.major_version, self.minor_version) < theirs
4 changes: 4 additions & 0 deletions src/pds/registrysweepers/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,7 @@ def wrapped_f(*args, **kwargs):
return resp

return wrapped_f


def get_sweeper_version_metadata_key(sweeper_name: str) -> str:
    """Return the document metadata key used for the named sweeper's version.

    :param sweeper_name: short sweeper identifier, e.g. ``"repairkit"``
    :return: a key of the form ``ops:Provenance/ops:registry_sweepers_<sweeper_name>_version``
    """
    return f"ops:Provenance/ops:registry_sweepers_{sweeper_name}_version"
48 changes: 4 additions & 44 deletions src/pds/registrysweepers/utils/productidentifiers/pdsvid.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,6 @@
from __future__ import annotations
from pds.registrysweepers.utils.majorminorversion import MajorMinorVersion

import functools
import logging


@functools.total_ordering
class PdsVid:
def __init__(self, major_version: int, minor_version: int):
if major_version < 0:
raise ValueError(f"major_version must be 0 or higher (got {major_version})")

if minor_version < 0:
raise ValueError(f"minor_version must be 0 or higher (got {minor_version})")

self.major_version = major_version
self.minor_version = minor_version

@staticmethod
def from_string(vid_string: str) -> PdsVid:
major_version_chunk, minor_version_chunk = vid_string.split(".")

major_version = int(major_version_chunk)
minor_version = int(minor_version_chunk)

return PdsVid(major_version, minor_version)

def __str__(self):
return f"{self.major_version}.{self.minor_version}"

def __hash__(self):
return hash(str(self))

def __repr__(self):
return f"PdsVid({str(self)})"

def __eq__(self, other):
return self.major_version == other.major_version and self.minor_version == other.minor_version

def __lt__(self, other: PdsVid):
if self.major_version != other.major_version:
return self.major_version < other.major_version
elif self.minor_version != other.minor_version:
return self.minor_version < other.minor_version
else:
return False
class PdsVid(MajorMinorVersion):
    """Version identifier expressed as a MajorMinorVersion; adds no behaviour of its own."""

    # Lowest accepted value for each component, stated explicitly on this class.
    minor_version_minimum = 0
    major_version_minimum = 0
4 changes: 1 addition & 3 deletions tests/pds/registrysweepers/test_ancestry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from pds.registrysweepers import ancestry
from pds.registrysweepers.ancestry import AncestryRecord
from pds.registrysweepers.ancestry import get_collection_ancestry_records
from pds.registrysweepers.utils.db.host import Host
from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid

from tests.mocks.registryquerymock import RegistryQueryMock
Expand Down Expand Up @@ -282,9 +281,8 @@ class AncestryLegacyTypesTestCase(unittest.TestCase):
registry_query_mock = RegistryQueryMock(input_file_path)

def test_collection_refs_parsing(self):
host_stub = Host(None, None, None, None)
query_mock_f = self.registry_query_mock.get_mocked_query
collection_ancestry_records = list(get_collection_ancestry_records(host_stub, query_mock_f))
collection_ancestry_records = list(get_collection_ancestry_records(None, query_mock_f))

self.assertEqual(1, len(collection_ancestry_records))

Expand Down
Loading