diff --git a/README.md b/README.md index 2febcae..bc22ef7 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ The ancestry sweeper generates membership metadata for each product, i.e. which #### Environment Variables ``` +MULTITENANCY_NODE_ID= // If running in a multitenant environment, the id of the node, used to distinguish registry/registry-refs index instances PROV_CREDENTIALS={"admin": "admin"} // OpenSearch username/password PROV_ENDPOINT=https://localhost:9200 // OpenSearch host url and port LOGLEVEL - an integer log level or anycase string matching a python log level like `INFO` (optional - defaults to `INFO`)) diff --git a/docker/sweepers_driver.py b/docker/sweepers_driver.py index 4836b74..c15e165 100755 --- a/docker/sweepers_driver.py +++ b/docker/sweepers_driver.py @@ -30,7 +30,7 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # -# Python driver for provenance +# Python driver for provenance (OUTDATED TODO: Update documentation) # ============================ # # This script is provided to support the scheduled execution of PDS Registry @@ -78,24 +78,6 @@ urllib3.disable_warnings() -opensearch_endpoint = os.environ.get('PROV_ENDPOINT', '') -if opensearch_endpoint.strip() == '': - raise RuntimeError('Environment variable PROV_ENDPOINT must be provided') -log.info(f'Targeting OpenSearch endpoint "{opensearch_endpoint}"') - -try: - provCredentialsStr = os.environ["PROV_CREDENTIALS"] -except KeyError: - raise RuntimeError('Environment variable PROV_CREDENTIALS must be provided') - -try: - provCredentials = json.loads(provCredentialsStr) - username = list(provCredentials.keys())[0] - password = provCredentials[username] -except Exception as err: - logging.error(err) - raise ValueError(f'Failed to parse username/password from PROV_CREDENTIALS value "{provCredentialsStr}": {err}') - log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO')) diff --git 
a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py index e10a137..7d85c0c 100644 --- a/src/pds/registrysweepers/ancestry/__init__.py +++ b/src/pds/registrysweepers/ancestry/__init__.py @@ -33,6 +33,7 @@ from pds.registrysweepers.utils.db import write_updated_docs from pds.registrysweepers.utils.db.client import get_userpass_opensearch_client from pds.registrysweepers.utils.db.indexing import ensure_index_mapping +from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name from pds.registrysweepers.utils.db.update import Update from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid @@ -70,18 +71,18 @@ def run( METADATA_PARENT_COLLECTION_KEY, SWEEPERS_ANCESTRY_VERSION_METADATA_KEY, ]: - ensure_index_mapping(client, "registry", metadata_key, "keyword") + ensure_index_mapping(client, resolve_multitenant_index_name("registry"), metadata_key, "keyword") for metadata_key in [ SWEEPERS_ANCESTRY_VERSION_METADATA_KEY, ]: - ensure_index_mapping(client, "registry-refs", metadata_key, "keyword") + ensure_index_mapping(client, resolve_multitenant_index_name("registry-refs"), metadata_key, "keyword") log.info("Writing bulk updates to database...") write_updated_docs( client, updates, - index_name="registry", + index_name=resolve_multitenant_index_name("registry"), ) log.info("Generating updates from deferred records...") deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f) @@ -90,7 +91,7 @@ def run( write_updated_docs( client, deferred_updates, - index_name="registry", + index_name=resolve_multitenant_index_name("registry"), ) else: # consume generator to dump bulk updates to sink @@ -98,7 +99,8 @@ def run( pass log.info("Checking indexes for orphaned documents") - for index_name in ["registry", "registry-refs"]: + index_names = [resolve_multitenant_index_name(index_label) for index_label in ["registry", "registry-refs"]] + for 
index_name in index_names: if log.isEnabledFor(logging.DEBUG): orphaned_docs = get_orphaned_documents(client, registry_mock_query_f, index_name) orphaned_doc_ids = [doc.get("_id") for doc in orphaned_docs] diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py index f50951e..3a49722 100644 --- a/src/pds/registrysweepers/ancestry/generation.py +++ b/src/pds/registrysweepers/ancestry/generation.py @@ -33,6 +33,7 @@ from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY from pds.registrysweepers.utils.db import Update from pds.registrysweepers.utils.db import write_updated_docs +from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name from pds.registrysweepers.utils.misc import bin_elements from pds.registrysweepers.utils.misc import coerce_list_type from pds.registrysweepers.utils.productidentifiers.factory import PdsProductIdentifierFactory @@ -424,7 +425,7 @@ def _get_nonaggregate_ancestry_records_with_chunking( isinstance(err, KeyError) and most_recent_attempted_collection_lidvid not in bundle_ancestry_by_collection_lidvid ): - probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index "registry" for registry-refs doc with id "{doc.get("_id")}"' + probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index {resolve_multitenant_index_name("registry")} for {resolve_multitenant_index_name("registry-refs")} doc with id "{doc.get("_id")}"' elif isinstance(err, ValueError): probable_cause = f'[Probable Cause]: Failed to parse collection and/or product LIDVIDs from document with id "{doc.get("_id")}" in index "{doc.get("_index")}" due to {type(err).__name__}: {err}' else: @@ -488,5 +489,7 @@ def generate_update(doc: RefDocBookkeepingEntry) -> Update: logging.info( f"Updating {len(docs)} registry-refs 
docs with {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY}={SWEEPERS_ANCESTRY_VERSION}" ) - write_updated_docs(client, updates, index_name="registry-refs", bulk_chunk_max_update_count=20000) + write_updated_docs( + client, updates, index_name=resolve_multitenant_index_name("registry-refs"), bulk_chunk_max_update_count=20000 + ) logging.info("registry-refs metadata update complete") diff --git a/src/pds/registrysweepers/ancestry/queries.py b/src/pds/registrysweepers/ancestry/queries.py index 612fb03..288b4a6 100644 --- a/src/pds/registrysweepers/ancestry/queries.py +++ b/src/pds/registrysweepers/ancestry/queries.py @@ -13,6 +13,7 @@ from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY from pds.registrysweepers.utils.db import get_query_hits_count from pds.registrysweepers.utils.db import query_registry_db_or_mock +from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid @@ -41,7 +42,7 @@ def get_bundle_ancestry_records_query(client: OpenSearch, db_mock: DbMockTypeDef query = product_class_query_factory(ProductClass.BUNDLE) _source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]} query_f = query_registry_db_or_mock(db_mock, "get_bundle_ancestry_records", use_search_after=True) - docs = query_f(client, "registry", query, _source) + docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source) return docs @@ -50,7 +51,7 @@ def get_collection_ancestry_records_bundles_query(client: OpenSearch, db_mock: D query = product_class_query_factory(ProductClass.BUNDLE) _source = {"includes": ["lidvid", "ref_lid_collection"]} query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_bundles", use_search_after=True) - docs = query_f(client, "registry", query, _source) + docs = query_f(client, 
resolve_multitenant_index_name("registry"), query, _source) return docs @@ -62,7 +63,7 @@ def get_collection_ancestry_records_collections_query( query = product_class_query_factory(ProductClass.COLLECTION) _source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]} query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections", use_search_after=True) - docs = query_f(client, "registry", query, _source) + docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source) return docs @@ -83,7 +84,7 @@ def get_nonaggregate_ancestry_records_query(client: OpenSearch, registry_db_mock # each document will have many product lidvids, so a smaller page size is warranted here docs = query_f( client, - "registry-refs", + resolve_multitenant_index_name("registry-refs"), query, _source, page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size, @@ -117,7 +118,7 @@ def get_nonaggregate_ancestry_records_for_collection_lid_query( # each document will have many product lidvids, so a smaller page size is warranted here docs = query_f( client, - "registry-refs", + resolve_multitenant_index_name("registry-refs"), query, _source, page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size, @@ -142,7 +143,7 @@ def get_orphaned_documents(client: OpenSearch, registry_db_mock: DbMockTypeDef, query_f = query_registry_db_or_mock(registry_db_mock, "get_orphaned_ancestry_docs", use_search_after=True) sort_fields_override = ( - ["collection_lidvid", "batch_id"] if index_name == "registry-refs" else None + ["collection_lidvid", "batch_id"] if "registry-refs" in index_name else None ) # use default for registry docs = query_f(client, index_name, _orphaned_docs_query, _source, sort_fields=sort_fields_override) @@ -184,7 +185,7 @@ def get_existing_ancestry_for_product( docs = query_f( client, - "registry", + resolve_multitenant_index_name("registry"), query, _source, ) diff --git 
a/src/pds/registrysweepers/provenance/__init__.py b/src/pds/registrysweepers/provenance/__init__.py index 77f0d65..3f99b50 100644 --- a/src/pds/registrysweepers/provenance/__init__.py +++ b/src/pds/registrysweepers/provenance/__init__.py @@ -58,6 +58,7 @@ from pds.registrysweepers.utils.db import query_registry_db_with_search_after from pds.registrysweepers.utils.db import write_updated_docs from pds.registrysweepers.utils.db.client import get_userpass_opensearch_client +from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name from pds.registrysweepers.utils.db.update import Update from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid @@ -72,7 +73,7 @@ def get_records(client: OpenSearch) -> Iterable[ProvenanceRecord]: } _source = {"includes": ["lidvid", METADATA_SUCCESSOR_KEY, SWEEPERS_PROVENANCE_VERSION_METADATA_KEY]} - docs = query_registry_db_with_search_after(client, "registry", query, _source) + docs = query_registry_db_with_search_after(client, resolve_multitenant_index_name("registry"), query, _source) for doc in docs: try: @@ -140,7 +141,7 @@ def run( write_updated_docs( client, updates, - index_name="registry", + index_name=resolve_multitenant_index_name("registry"), ) log.info("Completed provenance sweeper processing!") diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py index f4b5e7f..e59ce7a 100644 --- a/src/pds/registrysweepers/repairkit/__init__.py +++ b/src/pds/registrysweepers/repairkit/__init__.py @@ -22,6 +22,7 @@ from pds.registrysweepers.utils.db import write_updated_docs from pds.registrysweepers.utils.db.client import get_userpass_opensearch_client from pds.registrysweepers.utils.db.indexing import ensure_index_mapping +from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name from pds.registrysweepers.utils.db.update import Update """ @@ -96,11 +97,20 @@ def run( # page_size and bulk_chunk_max_update_count 
constraints are necessary to avoid choking nodes with very-large docs # i.e. ATM and GEO all_docs = query_registry_db_with_search_after( - client, "registry", unprocessed_docs_query, {}, page_size=500, request_timeout_seconds=180 + client, + resolve_multitenant_index_name("registry"), + unprocessed_docs_query, + {}, + page_size=500, + request_timeout_seconds=180, ) updates = generate_updates(all_docs, SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, SWEEPERS_REPAIRKIT_VERSION) - ensure_index_mapping(client, "registry", SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer") - write_updated_docs(client, updates, index_name="registry", bulk_chunk_max_update_count=20000) + ensure_index_mapping( + client, resolve_multitenant_index_name("registry"), SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer" + ) + write_updated_docs( + client, updates, index_name=resolve_multitenant_index_name("registry"), bulk_chunk_max_update_count=20000 + ) log.info("Repairkit sweeper processing complete!") diff --git a/src/pds/registrysweepers/utils/db/client.py b/src/pds/registrysweepers/utils/db/client.py index f9c8490..e2a80f7 100644 --- a/src/pds/registrysweepers/utils/db/client.py +++ b/src/pds/registrysweepers/utils/db/client.py @@ -1,4 +1,5 @@ import json +import logging import os import requests @@ -13,13 +14,32 @@ def get_opensearch_client_from_environment(verify_certs: bool = True) -> OpenSea """Extract necessary details from the existing (at time of development) runtime environment and construct a client""" # TODO: consider re-working these environment variables at some point - endpoint_url = os.environ["PROV_ENDPOINT"] - creds_str = os.environ["PROV_CREDENTIALS"] - creds_dict = json.loads(creds_str) + endpoint_url_env_var_key = "PROV_ENDPOINT" + userpass_env_var_key = "PROV_CREDENTIALS" + iam_role_env_var_key = "SWEEPERS_IAM_ROLE_NAME" - username, password = creds_dict.popitem() + endpoint_url = os.environ.get(endpoint_url_env_var_key) or None + if endpoint_url is None: + raise 
EnvironmentError(f'env var "{endpoint_url_env_var_key}" is required')
 
-    return get_userpass_opensearch_client(endpoint_url, username, password, verify_certs)
+    creds_str = os.environ.get(userpass_env_var_key) or None
+    iam_role_name = os.environ.get(iam_role_env_var_key) or None
+
+    if creds_str is not None and iam_role_name is not None:
+        raise EnvironmentError(f'Only one of env vars ["{userpass_env_var_key}", "{iam_role_env_var_key}"] may be set')
+    if creds_str is not None:
+        try:
+            creds_dict = json.loads(creds_str)
+            username, password = creds_dict.popitem()
+        except Exception as err:
+            logging.error(err)
+            raise ValueError(f'Failed to parse username/password from "{userpass_env_var_key}" value: {err}')
+
+        return get_userpass_opensearch_client(endpoint_url, username, password, verify_certs)
+    elif iam_role_name is not None:
+        return get_aws_aoss_client_from_ssm(endpoint_url, iam_role_name)
+    else:
+        raise EnvironmentError(f'One of env vars ["{userpass_env_var_key}", "{iam_role_env_var_key}"] must be set')
 
 
 def get_userpass_opensearch_client(
@@ -42,7 +62,10 @@
 
 
 def get_aws_credentials_from_ssm(iam_role_name: str) -> Credentials:
-    response = requests.get(f"http://169.254.169.254/latest/meta-data/iam/security-credentials/{iam_role_name}")
+    url = f"http://169.254.169.254/latest/meta-data/iam/security-credentials/{iam_role_name}"
+    response = requests.get(url)
+    if response.status_code != 200:
+        raise RuntimeError(f"Got HTTP{response.status_code} when attempting to retrieve SSM credentials from {url}")
 
     content = response.json()
     access_key_id = content["AccessKeyId"]
@@ -56,7 +79,7 @@
 def get_aws_aoss_client_from_ssm(endpoint_url: str, iam_role_name: str) -> OpenSearch:
     # https://opensearch.org/blog/aws-sigv4-support-for-clients/
     credentials = get_aws_credentials_from_ssm(iam_role_name)
-    auth = RequestsAWSV4SignerAuth(credentials, "us-west-2")
+    auth = 
RequestsAWSV4SignerAuth(credentials, "us-west-2", "aoss")
 
     return get_aws_opensearch_client(endpoint_url, auth)
diff --git a/src/pds/registrysweepers/utils/db/multitenancy.py b/src/pds/registrysweepers/utils/db/multitenancy.py
new file mode 100644
index 0000000..f343b0b
--- /dev/null
+++ b/src/pds/registrysweepers/utils/db/multitenancy.py
@@ -0,0 +1,13 @@
+import os
+
+
+def resolve_multitenant_index_name(index_type: str) -> str:
+    supported_index_types = {"registry", "registry-refs"}
+    node_id = os.environ.get("MULTITENANCY_NODE_ID", "").strip()
+
+    if node_id == "":
+        return index_type
+    elif index_type not in supported_index_types:
+        raise ValueError(f'index_type "{index_type}" not supported (expected one of {supported_index_types})')
+    else:
+        return f"{node_id}-{index_type}"