Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use opensearchPy instead of direct opensearch API requests #72

Merged
merged 8 commits into from
Sep 8, 2023
8 changes: 3 additions & 5 deletions docker/sweepers_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

from pds.registrysweepers import provenance, ancestry, repairkit
from pds.registrysweepers.utils import configure_logging, parse_log_level
from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since

configure_logging(filepath=None, log_level=logging.INFO)
Expand Down Expand Up @@ -100,12 +101,9 @@
def run_factory(sweeper_f: Callable) -> Callable:
return functools.partial(
sweeper_f,
base_url=opensearch_endpoint,
username=username,
password=password,
client=get_opensearch_client_from_environment(verify_certs=True if not dev_mode else False),
log_filepath='provenance.log',
log_level=log_level,
verify_host_certs=True if not dev_mode else False
log_level=log_level
)


Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ classifiers =

[options]
install_requires =
# opensearch-py 2.X is compatible with OpenSearch 1.X provided no deprecated OpenSearch features are used
opensearch-py~=2.3.1
requests~=2.28
retry~=0.9.2

Expand Down
25 changes: 11 additions & 14 deletions src/pds/registrysweepers/ancestry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
from typing import Tuple
from typing import Union

from opensearchpy import OpenSearch
from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord
from pds.registrysweepers.ancestry.generation import get_bundle_ancestry_records
from pds.registrysweepers.ancestry.generation import get_collection_ancestry_records
from pds.registrysweepers.ancestry.generation import get_nonaggregate_ancestry_records
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.host import Host
from pds.registrysweepers.utils.db.update import Update

Expand All @@ -26,10 +28,7 @@


def run(
base_url: str,
username: str,
password: str,
verify_host_certs: bool = True,
client: OpenSearch,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO,
registry_mock_query_f: Optional[Callable[[str], Iterable[Dict]]] = None,
Expand All @@ -40,18 +39,16 @@ def run(

log.info("Starting ancestry sweeper")

host = Host(password, base_url, username, verify_host_certs)

bundle_records = get_bundle_ancestry_records(host, registry_mock_query_f)
collection_records = list(get_collection_ancestry_records(host, registry_mock_query_f))
nonaggregate_records = get_nonaggregate_ancestry_records(host, collection_records, registry_mock_query_f)
bundle_records = get_bundle_ancestry_records(client, registry_mock_query_f)
collection_records = list(get_collection_ancestry_records(client, registry_mock_query_f))
nonaggregate_records = get_nonaggregate_ancestry_records(client, collection_records, registry_mock_query_f)

ancestry_records = chain(bundle_records, collection_records, nonaggregate_records)
updates = generate_updates(ancestry_records, ancestry_records_accumulator, bulk_updates_sink)

if bulk_updates_sink is None:
log.info("Writing bulk updates to database...")
write_updated_docs(host, updates)
write_updated_docs(client, updates)
else:
# consume generator to dump bulk updates to sink
for _ in updates:
Expand Down Expand Up @@ -103,12 +100,12 @@ def generate_updates(
"""

args = parse_args(description=cli_description)
client = get_opensearch_client(
endpoint_url=args.base_URL, username=args.username, password=args.password, verify_certs=not args.insecure
)

run(
base_url=args.base_URL,
username=args.username,
password=args.password,
verify_host_certs=not args.insecure,
client=client,
log_level=args.log_level,
log_filepath=args.log_file,
)
17 changes: 10 additions & 7 deletions src/pds/registrysweepers/ancestry/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Mapping
from typing import Set

from opensearchpy import OpenSearch
from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord
from pds.registrysweepers.ancestry.queries import DbMockTypeDef
from pds.registrysweepers.ancestry.queries import get_bundle_ancestry_records_query
Expand All @@ -20,9 +21,9 @@
log = logging.getLogger(__name__)


def get_bundle_ancestry_records(host: Host, db_mock: DbMockTypeDef = None) -> Iterable[AncestryRecord]:
def get_bundle_ancestry_records(client: OpenSearch, db_mock: DbMockTypeDef = None) -> Iterable[AncestryRecord]:
log.info("Generating AncestryRecords for bundles...")
docs = get_bundle_ancestry_records_query(host, db_mock)
docs = get_bundle_ancestry_records_query(client, db_mock)
for doc in docs:
try:
yield AncestryRecord(lidvid=PdsLidVid.from_string(doc["_source"]["lidvid"]))
Expand Down Expand Up @@ -85,10 +86,12 @@ def get_ancestry_by_collection_lid(
return ancestry_by_collection_lid


def get_collection_ancestry_records(host: Host, registry_db_mock: DbMockTypeDef = None) -> Iterable[AncestryRecord]:
def get_collection_ancestry_records(
client: OpenSearch, registry_db_mock: DbMockTypeDef = None
) -> Iterable[AncestryRecord]:
log.info("Generating AncestryRecords for collections...")
bundles_docs = get_collection_ancestry_records_bundles_query(host, registry_db_mock)
collections_docs = list(get_collection_ancestry_records_collections_query(host, registry_db_mock))
bundles_docs = get_collection_ancestry_records_bundles_query(client, registry_db_mock)
collections_docs = list(get_collection_ancestry_records_collections_query(client, registry_db_mock))

# Prepare LID alias sets for every LID
collection_aliases_by_lid: Dict[PdsLid, Set[PdsLid]] = get_collection_aliases_by_lid(collections_docs)
Expand Down Expand Up @@ -149,7 +152,7 @@ def get_collection_ancestry_records(host: Host, registry_db_mock: DbMockTypeDef


def get_nonaggregate_ancestry_records(
host: Host,
client: OpenSearch,
collection_ancestry_records: Iterable[AncestryRecord],
registry_db_mock: DbMockTypeDef = None,
) -> Iterable[AncestryRecord]:
Expand All @@ -160,7 +163,7 @@ def get_nonaggregate_ancestry_records(
record.lidvid: record.parent_bundle_lidvids for record in collection_ancestry_records
}

collection_refs_query_docs = get_nonaggregate_ancestry_records_query(host, registry_db_mock)
collection_refs_query_docs = get_nonaggregate_ancestry_records_query(client, registry_db_mock)

nonaggregate_ancestry_records_by_lidvid = {}
# For each collection, add the collection and its bundle ancestry to all products the collection contains
Expand Down
24 changes: 13 additions & 11 deletions src/pds/registrysweepers/ancestry/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from typing import Iterable
from typing import Optional

from opensearchpy import OpenSearch
from pds.registrysweepers.utils.db import query_registry_db_or_mock
from pds.registrysweepers.utils.db.host import Host

log = logging.getLogger(__name__)


DbMockTypeDef = Optional[Callable[[str], Iterable[Dict]]]


Expand All @@ -30,42 +30,44 @@ def product_class_query_factory(cls: ProductClass) -> Dict:
},
}

return queries[cls]
return {"query": queries[cls]}


def get_bundle_ancestry_records_query(host: Host, db_mock: DbMockTypeDef = None) -> Iterable[Dict]:
def get_bundle_ancestry_records_query(client: OpenSearch, db_mock: DbMockTypeDef = None) -> Iterable[Dict]:
query = product_class_query_factory(ProductClass.BUNDLE)
_source = {"includes": ["lidvid"]}
query_f = query_registry_db_or_mock(db_mock, "get_bundle_ancestry_records")
docs = query_f(host, query, _source)
docs = query_f(client, query, _source)

return docs


def get_collection_ancestry_records_bundles_query(host: Host, db_mock: DbMockTypeDef = None) -> Iterable[Dict]:
def get_collection_ancestry_records_bundles_query(client: OpenSearch, db_mock: DbMockTypeDef = None) -> Iterable[Dict]:
query = product_class_query_factory(ProductClass.BUNDLE)
_source = {"includes": ["lidvid", "ref_lid_collection"]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_bundles")
docs = query_f(host, query, _source)
docs = query_f(client, query, _source)

return docs


def get_collection_ancestry_records_collections_query(host: Host, db_mock: DbMockTypeDef = None) -> Iterable[Dict]:
def get_collection_ancestry_records_collections_query(
client: OpenSearch, db_mock: DbMockTypeDef = None
) -> Iterable[Dict]:
# Query the registry for all collection identifiers
query = product_class_query_factory(ProductClass.COLLECTION)
_source = {"includes": ["lidvid", "alternate_ids"]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections")
docs = query_f(host, query, _source)
docs = query_f(client, query, _source)

return docs


def get_nonaggregate_ancestry_records_query(host: Host, registry_db_mock: DbMockTypeDef) -> Iterable[Dict]:
def get_nonaggregate_ancestry_records_query(client: OpenSearch, registry_db_mock: DbMockTypeDef) -> Iterable[Dict]:
# Query the registry-refs index for the contents of all collections
query: Dict = {"match_all": {}}
query: Dict = {"query": {"match_all": {}}}
_source = {"includes": ["collection_lidvid", "product_lidvid"]}
query_f = query_registry_db_or_mock(registry_db_mock, "get_nonaggregate_ancestry_records")
docs = query_f(host, query, _source, index_name="registry-refs")
docs = query_f(client, query, _source, index_name="registry-refs")

return docs
21 changes: 10 additions & 11 deletions src/pds/registrysweepers/provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@
from typing import Mapping
from typing import Union

from opensearchpy import OpenSearch
from pds.registrysweepers.utils import _vid_as_tuple_of_int
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils.db import get_extant_lidvids
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.host import Host
from pds.registrysweepers.utils.db.update import Update

Expand All @@ -60,24 +62,21 @@


def run(
base_url: str,
username: str,
password: str,
verify_host_certs: bool = True,
client: OpenSearch,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO,
):
configure_logging(filepath=log_filepath, log_level=log_level)

log.info("Starting provenance sweeper processing...")

host = Host(password, base_url, username, verify_host_certs)
# host = Host(password, base_url, username, verify_host_certs)

extant_lidvids = get_extant_lidvids(host)
extant_lidvids = get_extant_lidvids(client)
successors = get_successors_by_lidvid(extant_lidvids)
updates = generate_updates(successors)

write_updated_docs(host, updates)
write_updated_docs(client, updates)

log.info("Completed provenance sweeper processing!")

Expand Down Expand Up @@ -144,12 +143,12 @@ def generate_updates(successors_by_id: Mapping[str, str]) -> Iterable[Update]:
"""

args = parse_args(description=cli_description, epilog=cli_epilog)
client = get_opensearch_client(
endpoint_url=args.base_URL, username=args.username, password=args.password, verify_certs=not args.insecure
)

run(
base_url=args.base_URL,
username=args.username,
password=args.password,
verify_host_certs=not args.insecure,
client=client,
log_level=args.log_level,
log_filepath=args.log_file,
)
26 changes: 19 additions & 7 deletions src/pds/registrysweepers/repairkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
from typing import Iterable
from typing import Union

from opensearchpy import OpenSearch
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils import query_registry_db
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.host import Host
from pds.registrysweepers.utils.db.update import Update

Expand Down Expand Up @@ -67,19 +70,28 @@ def generate_updates(docs: Iterable[Dict]) -> Iterable[Update]:


def run(
base_url: str,
username: str,
password: str,
verify_host_certs: bool = True,
client: OpenSearch,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO,
):
configure_logging(filepath=log_filepath, log_level=log_level)
log.info("Starting repairkit sweeper processing...")
host = Host(password, base_url, username, verify_host_certs)

all_docs = query_registry_db(host, {"match_all": {}}, {})
all_docs = query_registry_db(client, {"query": {"match_all": {}}}, {})
updates = generate_updates(all_docs)
write_updated_docs(host, updates)
write_updated_docs(client, updates)

log.info("Repairkit sweeper processing complete!")


if __name__ == "__main__":
args = parse_args(description="sweep through the registry documents and fix common errors")
client = get_opensearch_client(
endpoint_url=args.base_URL, username=args.username, password=args.password, verify_certs=not args.insecure
)

run(
client=client,
log_level=args.log_level,
log_filepath=args.log_file,
)
13 changes: 0 additions & 13 deletions src/pds/registrysweepers/repairkit/__main__.py

This file was deleted.

Loading