diff --git a/docker/sweepers_driver.py b/docker/sweepers_driver.py
index b7cb602..16e3e47 100755
--- a/docker/sweepers_driver.py
+++ b/docker/sweepers_driver.py
@@ -55,6 +55,7 @@
 #
 
 import functools
+import inspect
 import json
 import logging
 import os
@@ -94,6 +95,7 @@
 
 log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO'))
 
+
 def run_factory(sweeper_f: Callable) -> Callable:
     return functools.partial(
         sweeper_f,
@@ -106,15 +108,20 @@ def run_factory(sweeper_f: Callable) -> Callable:
     )
 
 
-run_provenance = run_factory(provenance.run)
-run_ancestry = run_factory(ancestry.run)
-run_repairkit = run_factory(repairkit.run)
+# Define sweepers to be run here, in order of execution
+sweepers = [
+    repairkit.run,
+    provenance.run,
+    ancestry.run
+]
+
+sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
+log.info(f'Running sweepers: {sweeper_descriptions}')
 
-log.info('Running sweepers')
 execution_begin = datetime.now()
 
-run_repairkit()
-run_provenance()
-run_ancestry()
+for sweeper in sweepers:
+    run_sweeper_f = run_factory(sweeper)
+    run_sweeper_f()
 
 log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(execution_begin)}')
diff --git a/setup.cfg b/setup.cfg
index 6ce54c9..160e3e9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -42,25 +42,26 @@ python_requires = >= 3.9
 
 [options.extras_require]
 dev =
-    black
-    flake8
-    flake8-bugbear
-    flake8-docstrings
-    pep8-naming
-    mypy
-    pydocstyle
-    coverage
-    pytest
-    pytest-cov
-    pytest-watch
-    pytest-xdist
-    pre-commit
-    sphinx
-    sphinx-rtd-theme
-    tox
+    black~=23.7.0
+    flake8~=6.1.0
+    flake8-bugbear~=23.7.10
+    flake8-docstrings~=1.7.0
+    pep8-naming~=0.13.3
+    mypy~=1.5.1
+    pydocstyle~=6.3.0
+    coverage~=7.3.0
+    pytest~=7.4.0
+    pytest-cov~=4.1.0
+    pytest-watch~=4.2.0
+    pytest-xdist~=3.3.1
+    pre-commit~=3.3.3
+    sphinx~=3.2.1
+    sphinx-rtd-theme~=0.5.0
+    tox~=4.11.0
     types_requests~=2.28
     types-retry~=0.9.9.4
-    types-setuptools
+    types-setuptools~=68.1.0.0
+    Jinja2<3.1
 
 [options.entry_points]
 # Put your entry point scripts here
diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py
index f0883e7..983cdc7 100644
--- a/src/pds/registrysweepers/ancestry/__init__.py
+++ b/src/pds/registrysweepers/ancestry/__init__.py
@@ -1,11 +1,11 @@
 import logging
 from itertools import chain
-from typing import Any
 from typing import Callable
 from typing import Dict
 from typing import Iterable
 from typing import List
 from typing import Optional
+from typing import Set
 from typing import Tuple
 from typing import Union
 
@@ -16,6 +16,7 @@
 from pds.registrysweepers.utils import configure_logging
 from pds.registrysweepers.utils import Host
 from pds.registrysweepers.utils import parse_args
+from pds.registrysweepers.utils import Update
 from pds.registrysweepers.utils import write_updated_docs
 
 log = logging.getLogger(__name__)
@@ -46,9 +47,26 @@ def run(
     nonaggregate_records = get_nonaggregate_ancestry_records(host, collection_records, registry_mock_query_f)
 
     ancestry_records = chain(bundle_records, collection_records, nonaggregate_records)
+    updates = generate_updates(ancestry_records, ancestry_records_accumulator, bulk_updates_sink)
+
+    if bulk_updates_sink is None:
+        log.info("Writing bulk updates to database...")
+        write_updated_docs(host, updates)
+    else:
+        # consume generator to dump bulk updates to sink
+        for _ in updates:
+            pass
+
+    log.info("Ancestry sweeper processing complete!")
+
+
+def generate_updates(
+    ancestry_records: Iterable[AncestryRecord], ancestry_records_accumulator=None, bulk_updates_sink=None
+) -> Iterable[Update]:
+    updates: Set[str] = set()
 
     log.info("Generating document bulk updates for AncestryRecords...")
-    updates: Dict[str, Dict[str, Any]] = {}
+
     for record in ancestry_records:
         # Tee the stream of records into the accumulator, if one was provided (functional testing).
         if ancestry_records_accumulator is not None:
@@ -58,29 +76,23 @@ def run(
             log.warning(f"Collection {record.lidvid} is not referenced by any bundle.")
 
         doc_id = str(record.lidvid)
-        update = {
+        update_content = {
             METADATA_PARENT_BUNDLE_KEY: [str(id) for id in record.parent_bundle_lidvids],
             METADATA_PARENT_COLLECTION_KEY: [str(id) for id in record.parent_collection_lidvids],
         }
 
         # Tee the stream of bulk update KVs into the accumulator, if one was provided (functional testing).
         if bulk_updates_sink is not None:
-            bulk_updates_sink.append((doc_id, update))
+            bulk_updates_sink.append((doc_id, update_content))
 
         if doc_id in updates:
-            existing_update = updates[doc_id]
             log.error(
-                f"Multiple updates detected for doc_id {doc_id} - cannot create update! (got {update}, {existing_update} already exists)"
+                f"Multiple updates detected for doc_id {doc_id} - cannot create update! (new content {update_content} will not be written)"
             )
             continue
 
-        updates[doc_id] = update
-
-    if updates and bulk_updates_sink is None:
-        log.info("Writing bulk updates to database...")
-        write_updated_docs(host, updates)
-
-    log.info("Ancestry sweeper processing complete!")
+        updates.add(doc_id)
+        yield Update(id=doc_id, content=update_content)
 
 
 if __name__ == "__main__":
diff --git a/src/pds/registrysweepers/provenance.py b/src/pds/registrysweepers/provenance.py
index 5c01a3a..ba029fc 100755
--- a/src/pds/registrysweepers/provenance.py
+++ b/src/pds/registrysweepers/provenance.py
@@ -51,6 +51,7 @@
 from pds.registrysweepers.utils import get_extant_lidvids
 from pds.registrysweepers.utils import Host
 from pds.registrysweepers.utils import parse_args
+from pds.registrysweepers.utils import Update
 from pds.registrysweepers.utils import write_updated_docs
 
 log = logging.getLogger(__name__)
@@ -68,18 +69,17 @@ def run(
 ):
     configure_logging(filepath=log_filepath, log_level=log_level)
 
-    log.info("starting CLI processing")
+    log.info("Starting provenance sweeper processing...")
 
     host = Host(password, base_url, username, verify_host_certs)
 
     extant_lidvids = get_extant_lidvids(host)
     successors = get_successors_by_lidvid(extant_lidvids)
-    updates = {id: {METADATA_SUCCESSOR_KEY: successor} for id, successor in successors.items()}
+    updates = generate_updates(successors)
 
-    if updates:
-        write_updated_docs(host, updates)
+    write_updated_docs(host, updates)
 
-    log.info("completed CLI processing")
+    log.info("Completed provenance sweeper processing!")
 
 
 def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]:
@@ -106,6 +106,7 @@ def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]
         lidvids.sort(key=_vid_as_tuple_of_int, reverse=True)
 
         for successor_idx, lidvid in enumerate(lidvids[1:]):
+            # TODO: consider converting this dict accumulation to a tuple/dict generator (yield) for memory optimization
             successors_by_lidvid[lidvid] = lidvids[successor_idx]
 
     log.info(f"Successors will be updated for {len(successors_by_lidvid)} LIDVIDs!")
@@ -117,6 +118,12 @@ def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]
     return successors_by_lidvid
 
 
+def generate_updates(successors_by_id: Mapping[str, str]) -> Iterable[Update]:
+    for id, successor in successors_by_id.items():
+        update_content = {METADATA_SUCCESSOR_KEY: successor}
+        yield Update(id=id, content=update_content)
+
+
 if __name__ == "__main__":
     cli_description = f"""
     Update registry records for non-latest LIDVIDs with up-to-date direct successor metadata ({METADATA_SUCCESSOR_KEY}).
diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py
index 38243bf..e8059ec 100644
--- a/src/pds/registrysweepers/repairkit/__init__.py
+++ b/src/pds/registrysweepers/repairkit/__init__.py
@@ -7,11 +7,14 @@
 """
 import logging
 import re
+from typing import Dict
+from typing import Iterable
 from typing import Union
 
 from pds.registrysweepers.utils import configure_logging
 from pds.registrysweepers.utils import Host
 from pds.registrysweepers.utils import query_registry_db
+from pds.registrysweepers.utils import Update
 from pds.registrysweepers.utils import write_updated_docs
 
 from . import allarrays
@@ -45,28 +48,38 @@ def function_name (document:{}, fieldname:str)->{}
 log = logging.getLogger(__name__)
 
 
-def run(
-    base_url: str,
-    username: str,
-    password: str,
-    verify_host_certs: bool = True,
-    log_filepath: Union[str, None] = None,
-    log_level: int = logging.INFO,
-):
-    configure_logging(filepath=log_filepath, log_level=log_level)
-    log.info("starting CLI processing")
-    host = Host(password, base_url, username, verify_host_certs)
-    for document in query_registry_db(host, {"match_all": {}}, {}):
+def generate_updates(docs: Iterable[Dict]) -> Iterable[Update]:
+    """Lazily generate necessary Update objects for a collection of db documents"""
+    for document in docs:
         id = document["_id"]
         src = document["_source"]
         repairs = {}
-        log.debug(f"working on document: {id}")
+        log.debug(f"applying repairkit sweeper to document: {id}")
         for fieldname, data in src.items():
             for regex, funcs in REPAIR_TOOLS.items():
                 if regex(fieldname):
                     for func in funcs:
                         repairs.update(func(src, fieldname))
+
         if repairs:
             log.info(f"Writing repairs to document: {id}")
-            write_updated_docs(host, {id: repairs})
-    return
+            yield Update(id=id, content=repairs)
+
+
+def run(
+    base_url: str,
+    username: str,
+    password: str,
+    verify_host_certs: bool = True,
+    log_filepath: Union[str, None] = None,
+    log_level: int = logging.INFO,
+):
+    configure_logging(filepath=log_filepath, log_level=log_level)
+    log.info("Starting repairkit sweeper processing...")
+    host = Host(password, base_url, username, verify_host_certs)
+
+    all_docs = query_registry_db(host, {"match_all": {}}, {})
+    updates = generate_updates(all_docs)
+    write_updated_docs(host, updates)
+
+    log.info("Repairkit sweeper processing complete!")
diff --git a/src/pds/registrysweepers/utils/__init__.py b/src/pds/registrysweepers/utils/__init__.py
index d89eec1..1e00cd0 100644
--- a/src/pds/registrysweepers/utils/__init__.py
+++ b/src/pds/registrysweepers/utils/__init__.py
@@ -3,6 +3,7 @@
 import functools
 import json
 import logging
+import random
 import sys
 import urllib.parse
 from argparse import Namespace
@@ -17,6 +18,7 @@
 from typing import Union
 
 import requests
+from pds.registrysweepers.utils.db.update import Update
 from requests.exceptions import HTTPError
 from retry import retry
 from retry.api import retry_call
@@ -111,14 +113,15 @@ def query_registry_db(
         "size": page_size,
     }
 
-    log.info(f"Initiating query: {req_content}")
+    query_id = get_random_hex_id()  # This is just used to differentiate queries during logging
+    log.info(f"Initiating query with id {query_id}: {req_content}")
 
     path = f"{index_name}/_search?scroll={scroll_keepalive_minutes}m"
 
    served_hits = 0
 
     last_info_log_at_percentage = 0
-    log.info("Query progress: 0%")
+    log.info(f"Query {query_id} progress: 0%")
 
     more_data_exists = True
     while more_data_exists:
@@ -138,7 +141,9 @@ def query_registry_db(
         req_content = {"scroll": f"{scroll_keepalive_minutes}m", "scroll_id": data["_scroll_id"]}
 
         total_hits = data["hits"]["total"]["value"]
-        log.debug(f"  paging query ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})")
+        log.debug(
+            f"  paging query {query_id} ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})"
+        )
 
         response_hits = data["hits"]["hits"]
         for hit in response_hits:
@@ -147,7 +152,7 @@ def query_registry_db(
             percentage_of_hits_served = int(served_hits / total_hits * 100)
             if last_info_log_at_percentage is None or percentage_of_hits_served >= (last_info_log_at_percentage + 5):
                 last_info_log_at_percentage = percentage_of_hits_served
-                log.info(f"Query progress: {percentage_of_hits_served}%")
+                log.info(f"Query {query_id} progress: {percentage_of_hits_served}%")
 
             yield hit
 
@@ -158,7 +163,7 @@ def query_registry_db(
         hits_data_present_in_response = len(response_hits) > 0
         if not hits_data_present_in_response:
             log.error(
-                f"Response contained no hits when hits were expected. Returned data is incomplete. Response was: {data}"
+                f"Response for query {query_id} contained no hits when hits were expected. Returned data is incomplete. Response was: {data}"
             )
             break
 
@@ -177,7 +182,7 @@ def query_registry_db(
             logger=log,
         )
 
-    log.info("Query complete!")
+    log.info(f"Query {query_id} complete!")
 
 
 def query_registry_db_or_mock(mock_f: Optional[Callable[[str], Iterable[Dict]]], mock_query_id: str):
@@ -213,16 +218,14 @@ def get_extant_lidvids(host: Host) -> Iterable[str]:
     return map(lambda doc: doc["_source"]["lidvid"], results)
 
 
-def write_updated_docs(host: Host, ids_and_updates: Mapping[str, Dict], index_name: str = "registry"):
-    """
-    Given an OpenSearch host and a mapping of doc ids onto updates to those docs, write bulk updates to documents in db.
-    """
-    log.info(f"Updating documents for {len(ids_and_updates)} products...")
+def write_updated_docs(host: Host, updates: Iterable[Update], index_name: str = "registry"):
+    log.info("Updating a lazily-generated collection of product documents...")
+    updated_doc_count = 0
 
     bulk_buffer_max_size_mb = 30.0
     bulk_buffer_size_mb = 0.0
     bulk_updates_buffer: List[str] = []
-    for lidvid, update_content in ids_and_updates.items():
+    for update in updates:
         if bulk_buffer_size_mb > bulk_buffer_max_size_mb:
             pending_product_count = int(len(bulk_updates_buffer) / 2)
             log.info(
@@ -232,18 +235,29 @@ def write_updated_docs(host: Host, ids_and_updates: Mapping[str, Dict], index_na
             bulk_updates_buffer = []
             bulk_buffer_size_mb = 0.0
 
-        update_objs = [{"update": {"_id": lidvid}}, {"doc": update_content}]
-        updates_strs = [json.dumps(obj) for obj in update_objs]
+        update_statement_strs = update_as_statements(update)
 
-        for s in updates_strs:
+        for s in update_statement_strs:
             bulk_buffer_size_mb += sys.getsizeof(s) / 1024**2
 
-        bulk_updates_buffer.extend(updates_strs)
+        bulk_updates_buffer.extend(update_statement_strs)
+        updated_doc_count += 1
 
     remaining_products_to_write_count = int(len(bulk_updates_buffer) / 2)
+    updated_doc_count += remaining_products_to_write_count
 
     log.info(f"Writing documents updates for {remaining_products_to_write_count} remaining products to db...")
     _write_bulk_updates_chunk(host, index_name, bulk_updates_buffer)
 
+    log.info(f"Updated documents for {updated_doc_count} total products!")
+
+
+def update_as_statements(update: Update) -> Iterable[str]:
+    """Given an Update, convert it to an ElasticSearch-style set of request body content strings"""
+    update_objs = [{"update": {"_id": update.id}}, {"doc": update.content}]
+    updates_strs = [json.dumps(obj) for obj in update_objs]
+    return updates_strs
+
+
 @retry(exceptions=(HTTPError, RuntimeError), tries=6, delay=2, backoff=2, logger=log)
 def _write_bulk_updates_chunk(host: Host, index_name: str, bulk_updates: Iterable[str]):
@@ -332,3 +346,8 @@ def get_human_readable_elapsed_since(begin: datetime) -> str:
     m = int(elapsed_seconds % 3600 / 60)
     s = int(elapsed_seconds % 60)
     return (f"{h}h" if h else "") + (f"{m}m" if m else "") + f"{s}s"
+
+
+def get_random_hex_id(id_len: int = 6) -> str:
+    val = random.randint(0, 16**id_len)
+    return hex(val)[2:]
diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/pds/registrysweepers/utils/db/update.py b/src/pds/registrysweepers/utils/db/update.py
new file mode 100644
index 0000000..1bb3e53
--- /dev/null
+++ b/src/pds/registrysweepers/utils/db/update.py
@@ -0,0 +1,10 @@
+from dataclasses import dataclass
+from typing import Dict
+
+
+@dataclass
+class Update:
+    """Class representing an ES/OpenSearch database update to a single document"""
+
+    id: str
+    content: Dict
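For reference, below is a minimal, self-contained sketch (not part of the patch) of how the generator-based update flow introduced here fits together. The Update dataclass and update_as_statements mirror the definitions added above; generate_example_updates, the sample document, and its field content are hypothetical and exist only for illustration.

# Illustration only: mirrors the Update dataclass and the generator/consumer
# pattern introduced by this patch. The sample document data below is made up.
import json
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass
class Update:
    """Mirror of src/pds/registrysweepers/utils/db/update.py, for illustration."""

    id: str
    content: Dict


def generate_example_updates(docs: Iterable[Dict]) -> Iterable[Update]:
    """Lazily yield one Update per source document (hypothetical content)."""
    for doc in docs:
        yield Update(id=doc["_id"], content={"example_field": "repaired_value"})


def update_as_statements(update: Update) -> Iterable[str]:
    """Convert an Update into the pair of NDJSON lines used for a bulk update request."""
    return [json.dumps({"update": {"_id": update.id}}), json.dumps({"doc": update.content})]


if __name__ == "__main__":
    sample_docs = [{"_id": "urn:nasa:pds:example::1.0"}]  # hypothetical doc id
    for update in generate_example_updates(sample_docs):
        print("\n".join(update_as_statements(update)))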