improve repairkit sweeper performance #62

Merged: 11 commits, Aug 30, 2023
21 changes: 14 additions & 7 deletions docker/sweepers_driver.py
@@ -55,6 +55,7 @@
#

import functools
import inspect
import json
import logging
import os
@@ -94,6 +95,7 @@

log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO'))


def run_factory(sweeper_f: Callable) -> Callable:
return functools.partial(
sweeper_f,
@@ -106,15 +108,20 @@ def run_factory(sweeper_f: Callable) -> Callable:
)


run_provenance = run_factory(provenance.run)
run_ancestry = run_factory(ancestry.run)
run_repairkit = run_factory(repairkit.run)
# Define sweepers to be run here, in order of execution
sweepers = [
repairkit.run,
provenance.run,
ancestry.run
]

sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
log.info(f'Running sweepers: {sweeper_descriptions}')

log.info('Running sweepers')
execution_begin = datetime.now()

run_repairkit()
run_provenance()
run_ancestry()
for sweeper in sweepers:
run_sweeper_f = run_factory(sweeper)
run_sweeper_f()

log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(execution_begin)}')
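For context, a minimal runnable sketch of the partial-application pattern the driver now uses (the bound argument names below are hypothetical; the real driver binds connection and logging settings):

import functools
from typing import Callable

def run_factory(sweeper_f: Callable) -> Callable:
    # Bind shared configuration once; each sweeper is later invoked with no arguments.
    return functools.partial(sweeper_f, base_url="https://registry.example", log_level="INFO")

def demo_sweeper(base_url: str, log_level: str) -> None:
    print(f"sweeping {base_url} (log_level={log_level})")

for sweeper in [demo_sweeper]:
    run_factory(sweeper)()  # prints: sweeping https://registry.example (log_level=INFO)

Building the partial inside the loop, rather than as one module-level name per sweeper, is what lets the sweeper list stay a single declarative list.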
38 changes: 25 additions & 13 deletions src/pds/registrysweepers/ancestry/__init__.py
@@ -1,11 +1,11 @@
import logging
from itertools import chain
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union

@@ -16,6 +16,7 @@
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import Host
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils import Update
from pds.registrysweepers.utils import write_updated_docs

log = logging.getLogger(__name__)
@@ -46,9 +47,26 @@ def run(
nonaggregate_records = get_nonaggregate_ancestry_records(host, collection_records, registry_mock_query_f)

ancestry_records = chain(bundle_records, collection_records, nonaggregate_records)
updates = generate_updates(ancestry_records, ancestry_records_accumulator, bulk_updates_sink)

if bulk_updates_sink is None:
log.info("Writing bulk updates to database...")
write_updated_docs(host, updates)
else:
# consume generator to dump bulk updates to sink
for _ in updates:
pass

log.info("Ancestry sweeper processing complete!")


def generate_updates(
ancestry_records: Iterable[AncestryRecord], ancestry_records_accumulator=None, bulk_updates_sink=None
) -> Iterable[Update]:
updates: Set[str] = set()

log.info("Generating document bulk updates for AncestryRecords...")
updates: Dict[str, Dict[str, Any]] = {}

for record in ancestry_records:
# Tee the stream of records into the accumulator, if one was provided (functional testing).
if ancestry_records_accumulator is not None:
@@ -58,29 +76,23 @@ def run(
log.warning(f"Collection {record.lidvid} is not referenced by any bundle.")

doc_id = str(record.lidvid)
update = {
update_content = {
METADATA_PARENT_BUNDLE_KEY: [str(id) for id in record.parent_bundle_lidvids],
METADATA_PARENT_COLLECTION_KEY: [str(id) for id in record.parent_collection_lidvids],
}

# Tee the stream of bulk update KVs into the accumulator, if one was provided (functional testing).
if bulk_updates_sink is not None:
bulk_updates_sink.append((doc_id, update))
bulk_updates_sink.append((doc_id, update_content))

if doc_id in updates:
existing_update = updates[doc_id]
log.error(
f"Multiple updates detected for doc_id {doc_id} - cannot create update! (got {update}, {existing_update} already exists)"
f"Multiple updates detected for doc_id {doc_id} - cannot create update! (new content {update_content} will not be written)"
)
continue

updates[doc_id] = update

if updates and bulk_updates_sink is None:
log.info("Writing bulk updates to database...")
write_updated_docs(host, updates)

log.info("Ancestry sweeper processing complete!")
updates.add(doc_id)
yield Update(id=doc_id, content=update_content)


if __name__ == "__main__":
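A note on the change above: generate_updates is now lazy, so when a bulk_updates_sink is supplied the generator must still be drained, otherwise nothing is teed into the sink. A self-contained sketch of that behavior, with simplified stand-in types rather than the project's AncestryRecord/Update:

from typing import Dict, Iterable, List, Optional, Tuple

def generate_pairs(records: Iterable[Tuple[str, Dict]],
                   sink: Optional[List] = None) -> Iterable[Tuple[str, Dict]]:
    seen = set()
    for doc_id, content in records:
        if sink is not None:
            sink.append((doc_id, content))  # tee for functional testing
        if doc_id in seen:
            continue  # duplicate doc_id: skip rather than overwrite, as above
        seen.add(doc_id)
        yield doc_id, content

sink: List = []
updates = generate_pairs([("a", {"x": 1}), ("a", {"x": 2})], sink)
assert sink == []  # nothing happens until the generator is consumed
for _ in updates:  # drain, mirroring the "for _ in updates: pass" above
    pass
assert sink == [("a", {"x": 1}), ("a", {"x": 2})]

This also shows why a Set[str] of seen ids now suffices: duplicate detection no longer needs the full previous update content, only the doc_id.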
17 changes: 12 additions & 5 deletions src/pds/registrysweepers/provenance.py
@@ -51,6 +51,7 @@
from pds.registrysweepers.utils import get_extant_lidvids
from pds.registrysweepers.utils import Host
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils import Update
from pds.registrysweepers.utils import write_updated_docs

log = logging.getLogger(__name__)
@@ -68,18 +69,17 @@ def run(
):
configure_logging(filepath=log_filepath, log_level=log_level)

log.info("starting CLI processing")
log.info("Starting provenance sweeper processing...")

host = Host(password, base_url, username, verify_host_certs)

extant_lidvids = get_extant_lidvids(host)
successors = get_successors_by_lidvid(extant_lidvids)
updates = {id: {METADATA_SUCCESSOR_KEY: successor} for id, successor in successors.items()}
updates = generate_updates(successors)

if updates:
write_updated_docs(host, updates)
write_updated_docs(host, updates)

log.info("completed CLI processing")
log.info("Completed provenance sweeper processing!")


def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]:
@@ -106,6 +106,7 @@ def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]
lidvids.sort(key=_vid_as_tuple_of_int, reverse=True)

for successor_idx, lidvid in enumerate(lidvids[1:]):
# TODO: consider converting this dict accumulation to a tuple/dict generator (yield) for memory optimization
successors_by_lidvid[lidvid] = lidvids[successor_idx]

log.info(f"Successors will be updated for {len(successors_by_lidvid)} LIDVIDs!")
@@ -117,6 +118,12 @@ def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]
return successors_by_lidvid


def generate_updates(successors_by_id: Mapping[str, str]) -> Iterable[Update]:
for id, successor in successors_by_id.items():
update_content = {METADATA_SUCCESSOR_KEY: successor}
yield Update(id=id, content=update_content)


if __name__ == "__main__":
cli_description = f"""
Update registry records for non-latest LIDVIDs with up-to-date direct successor metadata ({METADATA_SUCCESSOR_KEY}).
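The TODO in the hunk above could plausibly be addressed with a generator over the already-sorted lidvid list. A hedged sketch, reusing the same pairing logic as get_successors_by_lidvid but not taken from the project:

from typing import Iterable, List, Tuple

def successor_pairs(lidvids_newest_first: List[str]) -> Iterable[Tuple[str, str]]:
    # lidvids are sorted newest-first, so each older lidvid's successor is
    # the element that precedes it in the list.
    for successor_idx, lidvid in enumerate(lidvids_newest_first[1:]):
        yield lidvid, lidvids_newest_first[successor_idx]

assert list(successor_pairs(["x::3.0", "x::2.0", "x::1.0"])) == [
    ("x::2.0", "x::3.0"),
    ("x::1.0", "x::2.0"),
]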
43 changes: 28 additions & 15 deletions src/pds/registrysweepers/repairkit/__init__.py
@@ -7,11 +7,14 @@
"""
import logging
import re
from typing import Dict
from typing import Iterable
from typing import Union

from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import Host
from pds.registrysweepers.utils import query_registry_db
from pds.registrysweepers.utils import Update
from pds.registrysweepers.utils import write_updated_docs

from . import allarrays
@@ -45,28 +48,38 @@ def function_name (document:{}, fieldname:str)->{}
log = logging.getLogger(__name__)


def run(
base_url: str,
username: str,
password: str,
verify_host_certs: bool = True,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO,
):
configure_logging(filepath=log_filepath, log_level=log_level)
log.info("starting CLI processing")
host = Host(password, base_url, username, verify_host_certs)
for document in query_registry_db(host, {"match_all": {}}, {}):
def generate_updates(docs: Iterable[Dict]) -> Iterable[Update]:
"""Lazily generate necessary Update objects for a collection of db documents"""
for document in docs:
id = document["_id"]
src = document["_source"]
repairs = {}
log.debug(f"working on document: {id}")
log.debug(f"applying repairkit sweeper to document: {id}")
for fieldname, data in src.items():
for regex, funcs in REPAIR_TOOLS.items():
if regex(fieldname):
for func in funcs:
repairs.update(func(src, fieldname))

if repairs:
log.info(f"Writing repairs to document: {id}")
write_updated_docs(host, {id: repairs})
return
yield Update(id=id, content=repairs)


def run(
base_url: str,
username: str,
password: str,
verify_host_certs: bool = True,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO,
):
configure_logging(filepath=log_filepath, log_level=log_level)
log.info("Starting repairkit sweeper processing...")
host = Host(password, base_url, username, verify_host_certs)

all_docs = query_registry_db(host, {"match_all": {}}, {})
updates = generate_updates(all_docs)
write_updated_docs(host, updates)

log.info("Repairkit sweeper processing complete!")
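For illustration, a self-contained sketch of the REPAIR_TOOLS dispatch that generate_updates performs per document. The repair function and field name here are hypothetical; the real tools live in modules such as allarrays:

import re
from typing import Callable, Dict, List

def uppercase_title(src: Dict, fieldname: str) -> Dict:
    # Hypothetical repair: return the corrected field, or {} if already valid.
    value = src[fieldname]
    return {} if value == value.upper() else {fieldname: value.upper()}

REPAIR_TOOLS: Dict[Callable, List[Callable]] = {
    re.compile("^title$").fullmatch: [uppercase_title],
}

doc = {"_id": "doc-1", "_source": {"title": "pds", "other": "untouched"}}
repairs: Dict = {}
for fieldname in doc["_source"]:
    for predicate, funcs in REPAIR_TOOLS.items():
        if predicate(fieldname):
            for func in funcs:
                repairs.update(func(doc["_source"], fieldname))
assert repairs == {"title": "PDS"}  # an empty dict would yield no Update

Replacing one write_updated_docs call per repaired document with a single buffered _bulk stream over all yielded Updates is where the performance gain comes from.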
51 changes: 35 additions & 16 deletions src/pds/registrysweepers/utils/__init__.py
@@ -3,6 +3,7 @@
import functools
import json
import logging
import random
import sys
import urllib.parse
from argparse import Namespace
@@ -17,6 +18,7 @@
from typing import Union

import requests
from pds.registrysweepers.utils.db.update import Update
from requests.exceptions import HTTPError
from retry import retry
from retry.api import retry_call
@@ -111,14 +113,15 @@ def query_registry_db(
"size": page_size,
}

log.info(f"Initiating query: {req_content}")
query_id = get_random_hex_id() # This is just used to differentiate queries during logging
log.info(f"Initiating query with id {query_id}: {req_content}")

path = f"{index_name}/_search?scroll={scroll_keepalive_minutes}m"

served_hits = 0

last_info_log_at_percentage = 0
log.info("Query progress: 0%")
log.info(f"Query {query_id} progress: 0%")

more_data_exists = True
while more_data_exists:
@@ -138,7 +141,9 @@
req_content = {"scroll": f"{scroll_keepalive_minutes}m", "scroll_id": data["_scroll_id"]}

total_hits = data["hits"]["total"]["value"]
log.debug(f" paging query ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})")
log.debug(
f" paging query {query_id} ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})"
)

response_hits = data["hits"]["hits"]
for hit in response_hits:
@@ -147,7 +152,7 @@
percentage_of_hits_served = int(served_hits / total_hits * 100)
if last_info_log_at_percentage is None or percentage_of_hits_served >= (last_info_log_at_percentage + 5):
last_info_log_at_percentage = percentage_of_hits_served
log.info(f"Query progress: {percentage_of_hits_served}%")
log.info(f"Query {query_id} progress: {percentage_of_hits_served}%")

yield hit

@@ -158,7 +163,7 @@
hits_data_present_in_response = len(response_hits) > 0
if not hits_data_present_in_response:
log.error(
f"Response contained no hits when hits were expected. Returned data is incomplete. Response was: {data}"
f"Response for query {query_id} contained no hits when hits were expected. Returned data is incomplete. Response was: {data}"
)
break

@@ -177,7 +182,7 @@
logger=log,
)

log.info("Query complete!")
log.info(f"Query {query_id} complete!")


def query_registry_db_or_mock(mock_f: Optional[Callable[[str], Iterable[Dict]]], mock_query_id: str):
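For reference, a minimal standalone sketch of the scroll pagination loop that query_registry_db implements, minus retries, progress logging, and error handling (request/response shapes assumed per the OpenSearch scroll API):

import requests

def scroll_all(base_url: str, index_name: str, query: dict,
               page_size: int = 500, keepalive: str = "10m"):
    # The initial search opens a scroll context; follow-up requests page
    # through it until a response comes back with no hits.
    resp = requests.post(f"{base_url}/{index_name}/_search?scroll={keepalive}",
                         json={"query": query, "size": page_size}).json()
    while resp["hits"]["hits"]:
        yield from resp["hits"]["hits"]
        resp = requests.post(f"{base_url}/_search/scroll",
                             json={"scroll": keepalive, "scroll_id": resp["_scroll_id"]}).json()

The random hex query_id added in this PR exists only to tell apart log lines from different queries, per the comment in the diff.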
@@ -213,16 +218,14 @@ def get_extant_lidvids(host: Host) -> Iterable[str]:
return map(lambda doc: doc["_source"]["lidvid"], results)


def write_updated_docs(host: Host, ids_and_updates: Mapping[str, Dict], index_name: str = "registry"):
"""
Given an OpenSearch host and a mapping of doc ids onto updates to those docs, write bulk updates to documents in db.
"""
log.info(f"Updating documents for {len(ids_and_updates)} products...")
def write_updated_docs(host: Host, updates: Iterable[Update], index_name: str = "registry"):
log.info("Updating a lazily-generated collection of product documents...")
updated_doc_count = 0

bulk_buffer_max_size_mb = 30.0
bulk_buffer_size_mb = 0.0
bulk_updates_buffer: List[str] = []
for lidvid, update_content in ids_and_updates.items():
for update in updates:
if bulk_buffer_size_mb > bulk_buffer_max_size_mb:
pending_product_count = int(len(bulk_updates_buffer) / 2)
log.info(
@@ -232,18 +235,29 @@ def write_updated_docs(host: Host, ids_and_updates: Mapping[str, Dict], index_na
bulk_updates_buffer = []
bulk_buffer_size_mb = 0.0

update_objs = [{"update": {"_id": lidvid}}, {"doc": update_content}]
updates_strs = [json.dumps(obj) for obj in update_objs]
update_statement_strs = update_as_statements(update)

for s in updates_strs:
for s in update_statement_strs:
bulk_buffer_size_mb += sys.getsizeof(s) / 1024**2

bulk_updates_buffer.extend(updates_strs)
bulk_updates_buffer.extend(update_statement_strs)
updated_doc_count += 1

remaining_products_to_write_count = int(len(bulk_updates_buffer) / 2)
updated_doc_count += remaining_products_to_write_count

log.info(f"Writing documents updates for {remaining_products_to_write_count} remaining products to db...")
_write_bulk_updates_chunk(host, index_name, bulk_updates_buffer)

log.info(f"Updated documents for {updated_doc_count} total products!")


def update_as_statements(update: Update) -> Iterable[str]:
"""Given an Update, convert it to an ElasticSearch-style set of request body content strings"""
update_objs = [{"update": {"_id": update.id}}, {"doc": update.content}]
updates_strs = [json.dumps(obj) for obj in update_objs]
return updates_strs


@retry(exceptions=(HTTPError, RuntimeError), tries=6, delay=2, backoff=2, logger=log)
def _write_bulk_updates_chunk(host: Host, index_name: str, bulk_updates: Iterable[str]):
@@ -332,3 +346,8 @@ def get_human_readable_elapsed_since(begin: datetime) -> str:
m = int(elapsed_seconds % 3600 / 60)
s = int(elapsed_seconds % 60)
return (f"{h}h" if h else "") + (f"{m}m" if m else "") + f"{s}s"


def get_random_hex_id(id_len: int = 6) -> str:
val = random.randint(0, 16**id_len)
return hex(val)[2:]
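As a usage sketch of the new helper, here are the two NDJSON statements emitted per document by update_as_statements, which _write_bulk_updates_chunk then sends to the _bulk endpoint (the field content below is illustrative only; import paths are those introduced in this PR):

from pds.registrysweepers.utils import update_as_statements
from pds.registrysweepers.utils.db.update import Update

update = Update(id="urn:nasa:pds:example::1.0", content={"some_field": "some_value"})
for statement in update_as_statements(update):
    print(statement)
# {"update": {"_id": "urn:nasa:pds:example::1.0"}}
# {"doc": {"some_field": "some_value"}}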
Empty file: src/pds/registrysweepers/utils/db/__init__.py
10 changes: 10 additions & 0 deletions src/pds/registrysweepers/utils/db/update.py
@@ -0,0 +1,10 @@
from dataclasses import dataclass
from typing import Dict


@dataclass
class Update:
"""Class representing an ES/OpenSearch database update to a single document"""

id: str
content: Dict