Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement multitenancy support #130

Merged
merged 6 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The ancestry sweeper generates membership metadata for each product, i.e. which

#### Environment Variables
```
MULTITENANCY_NODE_ID= // If running in a multitenant environment, the id of the node, used to distinguish registry/registry-refs index instances
PROV_CREDENTIALS={"admin": "admin"} // OpenSearch username/password
PROV_ENDPOINT=https://localhost:9200 // OpenSearch host url and port
LOGLEVEL - an integer log level or any-case string matching a Python log level like `INFO` (optional - defaults to `INFO`)
Expand Down
20 changes: 1 addition & 19 deletions docker/sweepers_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Python driver for provenance
# Python driver for provenance (OUTDATED TODO: Update documentation)
# ============================
#
# This script is provided to support the scheduled execution of PDS Registry
Expand Down Expand Up @@ -78,24 +78,6 @@

urllib3.disable_warnings()

opensearch_endpoint = os.environ.get('PROV_ENDPOINT', '')
if opensearch_endpoint.strip() == '':
raise RuntimeError('Environment variable PROV_ENDPOINT must be provided')
log.info(f'Targeting OpenSearch endpoint "{opensearch_endpoint}"')

try:
provCredentialsStr = os.environ["PROV_CREDENTIALS"]
except KeyError:
raise RuntimeError('Environment variable PROV_CREDENTIALS must be provided')

try:
provCredentials = json.loads(provCredentialsStr)
username = list(provCredentials.keys())[0]
password = provCredentials[username]
except Exception as err:
logging.error(err)
raise ValueError(f'Failed to parse username/password from PROV_CREDENTIALS value "{provCredentialsStr}": {err}')

log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO'))


Expand Down
12 changes: 7 additions & 5 deletions src/pds/registrysweepers/ancestry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_userpass_opensearch_client
from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.db.update import Update
from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid

Expand Down Expand Up @@ -70,18 +71,18 @@ def run(
METADATA_PARENT_COLLECTION_KEY,
SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
]:
ensure_index_mapping(client, "registry", metadata_key, "keyword")
ensure_index_mapping(client, resolve_multitenant_index_name("registry"), metadata_key, "keyword")

for metadata_key in [
SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
]:
ensure_index_mapping(client, "registry-refs", metadata_key, "keyword")
ensure_index_mapping(client, resolve_multitenant_index_name("registry-refs"), metadata_key, "keyword")

log.info("Writing bulk updates to database...")
write_updated_docs(
client,
updates,
index_name="registry",
index_name=resolve_multitenant_index_name("registry"),
)
log.info("Generating updates from deferred records...")
deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f)
Expand All @@ -90,15 +91,16 @@ def run(
write_updated_docs(
client,
deferred_updates,
index_name="registry",
index_name=resolve_multitenant_index_name("registry"),
)
else:
# consume generator to dump bulk updates to sink
for _ in updates:
pass

log.info("Checking indexes for orphaned documents")
for index_name in ["registry", "registry-refs"]:
index_names = [resolve_multitenant_index_name(index_label) for index_label in ["registry", "registry-refs"]]
for index_name in index_names:
if log.isEnabledFor(logging.DEBUG):
orphaned_docs = get_orphaned_documents(client, registry_mock_query_f, index_name)
orphaned_doc_ids = [doc.get("_id") for doc in orphaned_docs]
Expand Down
7 changes: 5 additions & 2 deletions src/pds/registrysweepers/ancestry/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils.db import Update
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.misc import bin_elements
from pds.registrysweepers.utils.misc import coerce_list_type
from pds.registrysweepers.utils.productidentifiers.factory import PdsProductIdentifierFactory
Expand Down Expand Up @@ -424,7 +425,7 @@ def _get_nonaggregate_ancestry_records_with_chunking(
isinstance(err, KeyError)
and most_recent_attempted_collection_lidvid not in bundle_ancestry_by_collection_lidvid
):
probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index "registry" for registry-refs doc with id "{doc.get("_id")}"'
probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index {resolve_multitenant_index_name("registry")} for {resolve_multitenant_index_name("registry-refs")} doc with id "{doc.get("_id")}"'
elif isinstance(err, ValueError):
probable_cause = f'[Probable Cause]: Failed to parse collection and/or product LIDVIDs from document with id "{doc.get("_id")}" in index "{doc.get("_index")}" due to {type(err).__name__}: {err}'
else:
Expand Down Expand Up @@ -488,5 +489,7 @@ def generate_update(doc: RefDocBookkeepingEntry) -> Update:
logging.info(
f"Updating {len(docs)} registry-refs docs with {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY}={SWEEPERS_ANCESTRY_VERSION}"
)
write_updated_docs(client, updates, index_name="registry-refs", bulk_chunk_max_update_count=20000)
write_updated_docs(
client, updates, index_name=resolve_multitenant_index_name("registry-refs"), bulk_chunk_max_update_count=20000
)
logging.info("registry-refs metadata update complete")
15 changes: 8 additions & 7 deletions src/pds/registrysweepers/ancestry/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils.db import get_query_hits_count
from pds.registrysweepers.utils.db import query_registry_db_or_mock
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid
from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid

Expand Down Expand Up @@ -41,7 +42,7 @@ def get_bundle_ancestry_records_query(client: OpenSearch, db_mock: DbMockTypeDef
query = product_class_query_factory(ProductClass.BUNDLE)
_source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]}
query_f = query_registry_db_or_mock(db_mock, "get_bundle_ancestry_records", use_search_after=True)
docs = query_f(client, "registry", query, _source)
docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source)

return docs

Expand All @@ -50,7 +51,7 @@ def get_collection_ancestry_records_bundles_query(client: OpenSearch, db_mock: D
query = product_class_query_factory(ProductClass.BUNDLE)
_source = {"includes": ["lidvid", "ref_lid_collection"]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_bundles", use_search_after=True)
docs = query_f(client, "registry", query, _source)
docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source)

return docs

Expand All @@ -62,7 +63,7 @@ def get_collection_ancestry_records_collections_query(
query = product_class_query_factory(ProductClass.COLLECTION)
_source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections", use_search_after=True)
docs = query_f(client, "registry", query, _source)
docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source)

return docs

Expand All @@ -83,7 +84,7 @@ def get_nonaggregate_ancestry_records_query(client: OpenSearch, registry_db_mock
# each document will have many product lidvids, so a smaller page size is warranted here
docs = query_f(
client,
"registry-refs",
resolve_multitenant_index_name("registry-refs"),
query,
_source,
page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size,
Expand Down Expand Up @@ -117,7 +118,7 @@ def get_nonaggregate_ancestry_records_for_collection_lid_query(
# each document will have many product lidvids, so a smaller page size is warranted here
docs = query_f(
client,
"registry-refs",
resolve_multitenant_index_name("registry-refs"),
query,
_source,
page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size,
Expand All @@ -142,7 +143,7 @@ def get_orphaned_documents(client: OpenSearch, registry_db_mock: DbMockTypeDef,
query_f = query_registry_db_or_mock(registry_db_mock, "get_orphaned_ancestry_docs", use_search_after=True)

sort_fields_override = (
["collection_lidvid", "batch_id"] if index_name == "registry-refs" else None
["collection_lidvid", "batch_id"] if "registry-refs" in index_name else None
) # use default for registry

docs = query_f(client, index_name, _orphaned_docs_query, _source, sort_fields=sort_fields_override)
Expand Down Expand Up @@ -184,7 +185,7 @@ def get_existing_ancestry_for_product(

docs = query_f(
client,
"registry",
resolve_multitenant_index_name("registry"),
query,
_source,
)
Expand Down
5 changes: 3 additions & 2 deletions src/pds/registrysweepers/provenance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from pds.registrysweepers.utils.db import query_registry_db_with_search_after
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_userpass_opensearch_client
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.db.update import Update
from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid

Expand All @@ -72,7 +73,7 @@ def get_records(client: OpenSearch) -> Iterable[ProvenanceRecord]:
}
_source = {"includes": ["lidvid", METADATA_SUCCESSOR_KEY, SWEEPERS_PROVENANCE_VERSION_METADATA_KEY]}

docs = query_registry_db_with_search_after(client, "registry", query, _source)
docs = query_registry_db_with_search_after(client, resolve_multitenant_index_name("registry"), query, _source)

for doc in docs:
try:
Expand Down Expand Up @@ -140,7 +141,7 @@ def run(
write_updated_docs(
client,
updates,
index_name="registry",
index_name=resolve_multitenant_index_name("registry"),
)

log.info("Completed provenance sweeper processing!")
Expand Down
16 changes: 13 additions & 3 deletions src/pds/registrysweepers/repairkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_userpass_opensearch_client
from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.db.update import Update

"""
Expand Down Expand Up @@ -96,11 +97,20 @@ def run(
# page_size and bulk_chunk_max_update_count constraints are necessary to avoid choking nodes with very-large docs
# i.e. ATM and GEO
all_docs = query_registry_db_with_search_after(
client, "registry", unprocessed_docs_query, {}, page_size=500, request_timeout_seconds=180
client,
resolve_multitenant_index_name("registry"),
unprocessed_docs_query,
{},
page_size=500,
request_timeout_seconds=180,
)
updates = generate_updates(all_docs, SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, SWEEPERS_REPAIRKIT_VERSION)
ensure_index_mapping(client, "registry", SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer")
write_updated_docs(client, updates, index_name="registry", bulk_chunk_max_update_count=20000)
ensure_index_mapping(
client, resolve_multitenant_index_name("registry"), SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer"
)
write_updated_docs(
client, updates, index_name=resolve_multitenant_index_name("registry"), bulk_chunk_max_update_count=20000
)

log.info("Repairkit sweeper processing complete!")

Expand Down
37 changes: 30 additions & 7 deletions src/pds/registrysweepers/utils/db/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
import os

import requests
Expand All @@ -13,13 +14,32 @@ def get_opensearch_client_from_environment(verify_certs: bool = True) -> OpenSea
"""Extract necessary details from the existing (at time of development) runtime environment and construct a client"""
# TODO: consider re-working these environment variables at some point

endpoint_url = os.environ["PROV_ENDPOINT"]
creds_str = os.environ["PROV_CREDENTIALS"]
creds_dict = json.loads(creds_str)
endpoint_url_env_var_key = "PROV_ENDPOINT"
userpass_env_var_key = "PROV_CREDENTIALS"
iam_role_env_var_key = "SWEEPERS_IAM_ROLE_NAME"

username, password = creds_dict.popitem()
endpoint_url = os.environ.get(endpoint_url_env_var_key) or None
if endpoint_url is None:
raise EnvironmentError(f'env var "{endpoint_url_env_var_key}" is required')

return get_userpass_opensearch_client(endpoint_url, username, password, verify_certs)
creds_str = os.environ.get("PROV_CREDENTIALS") or None
iam_role_name = os.environ.get(iam_role_env_var_key) or None

if creds_str is not None and iam_role_name is not None:
raise EnvironmentError(f'Only one of env vars ["{userpass_env_var_key}", "{iam_role_env_var_key}"] may be set')
if creds_str is not None:
try:
creds_dict = json.loads(creds_str)
username, password = creds_dict.popitem()
except Exception as err:
logging.error(err)
raise ValueError(f'Failed to parse username/password from PROV_CREDENTIALS value "{creds_str}": {err}')

return get_userpass_opensearch_client(endpoint_url, username, password, verify_certs)
elif iam_role_name is not None:
return get_aws_aoss_client_from_ssm(endpoint_url, iam_role_name)
else:
raise EnvironmentError(f'One of env vars ["{userpass_env_var_key}", "{iam_role_env_var_key}"] must be set')


def get_userpass_opensearch_client(
Expand All @@ -42,7 +62,10 @@ def get_userpass_opensearch_client(


def get_aws_credentials_from_ssm(iam_role_name: str) -> Credentials:
response = requests.get(f"http://169.254.169.254/latest/meta-data/iam/security-credentials/{iam_role_name}")
url = f"http://169.254.169.254/latest/meta-data/iam/security-credentials/{iam_role_name}"
response = requests.get(url)
if response.status_code != 200:
raise RuntimeError(f"Got HTTP{response.status_code} when attempting to retrieve SSM credentials from {url}")
content = response.json()

access_key_id = content["AccessKeyId"]
Expand All @@ -56,7 +79,7 @@ def get_aws_credentials_from_ssm(iam_role_name: str) -> Credentials:
def get_aws_aoss_client_from_ssm(endpoint_url: str, iam_role_name: str) -> OpenSearch:
# https://opensearch.org/blog/aws-sigv4-support-for-clients/
credentials = get_aws_credentials_from_ssm(iam_role_name)
auth = RequestsAWSV4SignerAuth(credentials, "us-west-2")
auth = RequestsAWSV4SignerAuth(credentials, "us-west-2", "aoss")
return get_aws_opensearch_client(endpoint_url, auth)


Expand Down
13 changes: 13 additions & 0 deletions src/pds/registrysweepers/utils/db/multitenancy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import os


def resolve_multitenant_index_name(index_type: str) -> str:
    """Return the index name to use for ``index_type`` in the current environment.

    The node id is read from the ``MULTITENANCY_NODE_ID`` environment variable on
    every call. When it is unset or blank, ``index_type`` is returned unchanged
    (and is not validated). Otherwise the result is ``"<node_id>-<index_type>"``.

    :param index_type: the un-prefixed index label, e.g. ``"registry"`` or ``"registry-refs"``
    :return: the resolved index name
    :raises ValueError: if a node id is configured and ``index_type`` is not one of
        the supported index types
    """
    supported_index_types = {"registry", "registry-refs"}

    # Strip ALL surrounding whitespace, not just spaces: a value sourced from a file
    # or secret often carries a trailing newline or tab, which would otherwise be
    # treated as (part of) a node id and end up embedded in the index name.
    node_id = os.environ.get("MULTITENANCY_NODE_ID", "").strip()

    if not node_id:
        # Not multitenant: the label is used as-is, without validation.
        return index_type

    if index_type not in supported_index_types:
        raise ValueError(f'index_type "{index_type}" not supported (expected one of {supported_index_types})')

    return f"{node_id}-{index_type}"
Loading