Skip to content

Commit

Permalink
Merge pull request #62 from NASA-PDS/repairkit-repair-kit
Browse files Browse the repository at this point in the history
Improve repairkit sweeper performance
  • Loading branch information
tloubrieu-jpl authored Aug 30, 2023
2 parents 0d3a4e0 + e3ce550 commit 3f8b796
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 73 deletions.
21 changes: 14 additions & 7 deletions docker/sweepers_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#

import functools
import inspect
import json
import logging
import os
Expand Down Expand Up @@ -94,6 +95,7 @@

log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO'))


def run_factory(sweeper_f: Callable) -> Callable:
return functools.partial(
sweeper_f,
Expand All @@ -106,15 +108,20 @@ def run_factory(sweeper_f: Callable) -> Callable:
)


run_provenance = run_factory(provenance.run)
run_ancestry = run_factory(ancestry.run)
run_repairkit = run_factory(repairkit.run)
# Define sweepers to be run here, in order of execution
sweepers = [
repairkit.run,
provenance.run,
ancestry.run
]

sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
log.info(f'Running sweepers: {sweeper_descriptions}')

log.info('Running sweepers')
execution_begin = datetime.now()

run_repairkit()
run_provenance()
run_ancestry()
for sweeper in sweepers:
run_sweeper_f = run_factory(sweeper)
run_sweeper_f()

log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(execution_begin)}')
35 changes: 18 additions & 17 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,26 @@ python_requires = >= 3.9

[options.extras_require]
dev =
black
flake8
flake8-bugbear
flake8-docstrings
pep8-naming
mypy
pydocstyle
coverage
pytest
pytest-cov
pytest-watch
pytest-xdist
pre-commit
sphinx
sphinx-rtd-theme
tox
black~=23.7.0
flake8~=6.1.0
flake8-bugbear~=23.7.10
flake8-docstrings~=1.7.0
pep8-naming~=0.13.3
mypy~=1.5.1
pydocstyle~=6.3.0
coverage~=7.3.0
pytest~=7.4.0
pytest-cov~=4.1.0
pytest-watch~=4.2.0
pytest-xdist~=3.3.1
pre-commit~=3.3.3
sphinx~=3.2.1
sphinx-rtd-theme~=0.5.0
tox~=4.11.0
types_requests~=2.28
types-retry~=0.9.9.4
types-setuptools
types-setuptools~=68.1.0.0
Jinja2<3.1

[options.entry_points]
# Put your entry point scripts here
Expand Down
38 changes: 25 additions & 13 deletions src/pds/registrysweepers/ancestry/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging
from itertools import chain
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union

Expand All @@ -16,6 +16,7 @@
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import Host
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils import Update
from pds.registrysweepers.utils import write_updated_docs

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -46,9 +47,26 @@ def run(
nonaggregate_records = get_nonaggregate_ancestry_records(host, collection_records, registry_mock_query_f)

ancestry_records = chain(bundle_records, collection_records, nonaggregate_records)
updates = generate_updates(ancestry_records, ancestry_records_accumulator, bulk_updates_sink)

if bulk_updates_sink is None:
log.info("Writing bulk updates to database...")
write_updated_docs(host, updates)
else:
# consume generator to dump bulk updates to sink
for _ in updates:
pass

log.info("Ancestry sweeper processing complete!")


def generate_updates(
ancestry_records: Iterable[AncestryRecord], ancestry_records_accumulator=None, bulk_updates_sink=None
) -> Iterable[Update]:
updates: Set[str] = set()

log.info("Generating document bulk updates for AncestryRecords...")
updates: Dict[str, Dict[str, Any]] = {}

for record in ancestry_records:
# Tee the stream of records into the accumulator, if one was provided (functional testing).
if ancestry_records_accumulator is not None:
Expand All @@ -58,29 +76,23 @@ def run(
log.warning(f"Collection {record.lidvid} is not referenced by any bundle.")

doc_id = str(record.lidvid)
update = {
update_content = {
METADATA_PARENT_BUNDLE_KEY: [str(id) for id in record.parent_bundle_lidvids],
METADATA_PARENT_COLLECTION_KEY: [str(id) for id in record.parent_collection_lidvids],
}

# Tee the stream of bulk update KVs into the accumulator, if one was provided (functional testing).
if bulk_updates_sink is not None:
bulk_updates_sink.append((doc_id, update))
bulk_updates_sink.append((doc_id, update_content))

if doc_id in updates:
existing_update = updates[doc_id]
log.error(
f"Multiple updates detected for doc_id {doc_id} - cannot create update! (got {update}, {existing_update} already exists)"
f"Multiple updates detected for doc_id {doc_id} - cannot create update! (new content {update_content} will not be written)"
)
continue

updates[doc_id] = update

if updates and bulk_updates_sink is None:
log.info("Writing bulk updates to database...")
write_updated_docs(host, updates)

log.info("Ancestry sweeper processing complete!")
updates.add(doc_id)
yield Update(id=doc_id, content=update_content)


if __name__ == "__main__":
Expand Down
17 changes: 12 additions & 5 deletions src/pds/registrysweepers/provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from pds.registrysweepers.utils import get_extant_lidvids
from pds.registrysweepers.utils import Host
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils import Update
from pds.registrysweepers.utils import write_updated_docs

log = logging.getLogger(__name__)
Expand All @@ -68,18 +69,17 @@ def run(
):
configure_logging(filepath=log_filepath, log_level=log_level)

log.info("starting CLI processing")
log.info("Starting provenance sweeper processing...")

host = Host(password, base_url, username, verify_host_certs)

extant_lidvids = get_extant_lidvids(host)
successors = get_successors_by_lidvid(extant_lidvids)
updates = {id: {METADATA_SUCCESSOR_KEY: successor} for id, successor in successors.items()}
updates = generate_updates(successors)

if updates:
write_updated_docs(host, updates)
write_updated_docs(host, updates)

log.info("completed CLI processing")
log.info("Completed provenance sweeper processing!")


def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]:
Expand All @@ -106,6 +106,7 @@ def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]
lidvids.sort(key=_vid_as_tuple_of_int, reverse=True)

for successor_idx, lidvid in enumerate(lidvids[1:]):
# TODO: consider converting this dict accumulation to a tuple/dict generator (yield) for memory optimization
successors_by_lidvid[lidvid] = lidvids[successor_idx]

log.info(f"Successors will be updated for {len(successors_by_lidvid)} LIDVIDs!")
Expand All @@ -117,6 +118,12 @@ def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]
return successors_by_lidvid


def generate_updates(successors_by_id: Mapping[str, str]) -> Iterable[Update]:
    """Lazily yield one Update per lidvid, recording its direct successor under the successor metadata key.

    Args:
        successors_by_id: mapping of each non-latest lidvid to its immediate successor lidvid.

    Yields:
        Update objects whose content carries the successor under METADATA_SUCCESSOR_KEY.
    """
    for lidvid, successor_lidvid in successors_by_id.items():
        yield Update(id=lidvid, content={METADATA_SUCCESSOR_KEY: successor_lidvid})


if __name__ == "__main__":
cli_description = f"""
Update registry records for non-latest LIDVIDs with up-to-date direct successor metadata ({METADATA_SUCCESSOR_KEY}).
Expand Down
43 changes: 28 additions & 15 deletions src/pds/registrysweepers/repairkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
"""
import logging
import re
from typing import Dict
from typing import Iterable
from typing import Union

from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import Host
from pds.registrysweepers.utils import query_registry_db
from pds.registrysweepers.utils import Update
from pds.registrysweepers.utils import write_updated_docs

from . import allarrays
Expand Down Expand Up @@ -45,28 +48,38 @@ def function_name (document:{}, fieldname:str)->{}
log = logging.getLogger(__name__)


def run(
base_url: str,
username: str,
password: str,
verify_host_certs: bool = True,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO,
):
configure_logging(filepath=log_filepath, log_level=log_level)
log.info("starting CLI processing")
host = Host(password, base_url, username, verify_host_certs)
for document in query_registry_db(host, {"match_all": {}}, {}):
def generate_updates(docs: Iterable[Dict]) -> Iterable[Update]:
"""Lazily generate necessary Update objects for a collection of db documents"""
for document in docs:
id = document["_id"]
src = document["_source"]
repairs = {}
log.debug(f"working on document: {id}")
log.debug(f"applying repairkit sweeper to document: {id}")
for fieldname, data in src.items():
for regex, funcs in REPAIR_TOOLS.items():
if regex(fieldname):
for func in funcs:
repairs.update(func(src, fieldname))

if repairs:
log.info(f"Writing repairs to document: {id}")
write_updated_docs(host, {id: repairs})
return
yield Update(id=id, content=repairs)


def run(
    base_url: str,
    username: str,
    password: str,
    verify_host_certs: bool = True,
    log_filepath: Union[str, None] = None,
    log_level: int = logging.INFO,
):
    """Execute the repairkit sweeper: stream every registry document and bulk-write any repairs.

    Args:
        base_url: registry OpenSearch base URL.
        username: registry username.
        password: registry password.
        verify_host_certs: whether to verify the host's TLS certificates.
        log_filepath: optional file path for log output; None logs to default handlers.
        log_level: logging verbosity threshold.
    """
    configure_logging(filepath=log_filepath, log_level=log_level)
    log.info("Starting repairkit sweeper processing...")

    host = Host(password, base_url, username, verify_host_certs)

    # match_all query streams every document; updates are generated lazily so
    # documents and their repairs are never all held in memory at once.
    docs = query_registry_db(host, {"match_all": {}}, {})
    write_updated_docs(host, generate_updates(docs))

    log.info("Repairkit sweeper processing complete!")
Loading

0 comments on commit 3f8b796

Please sign in to comment.