34 43 remove broken cross-cluster remotes support #44

Merged 7 commits on Jul 27, 2023
2 changes: 0 additions & 2 deletions README.md
@@ -38,8 +38,6 @@ When run against the production OpenSearch instance with ~1.1M products, no cros

The overwhelming bottleneck ops are the O(docs_count) db writes in ancestry.

When run on an 8vCPU, 56GB RAM ECS node, the runtime is significantly longer due to the presence of CCR nodes. Performance has not been benchmarked properly in this context due to the presence of errors preventing actual writes, but in that test, execution time was 1h15m. CPU utilization stayed at/below 1vCPU and memory utilization peaked at ~14GB.


## Code of Conduct

37 changes: 5 additions & 32 deletions docker/sweepers_driver.py
@@ -59,7 +59,7 @@
import logging
import os
from datetime import datetime
from typing import Callable, Iterable
from typing import Callable

from pds.registrysweepers import provenance, ancestry
from pds.registrysweepers.utils import configure_logging, get_human_readable_elapsed_since, parse_log_level
@@ -77,7 +77,7 @@
opensearch_endpoint = os.environ.get('PROV_ENDPOINT', '')
if opensearch_endpoint.strip() == '':
raise RuntimeError('Environment variable PROV_ENDPOINT must be provided')
log.info(f'Targeting base OpenSearch endpoint "{opensearch_endpoint}"')
log.info(f'Targeting OpenSearch endpoint "{opensearch_endpoint}"')

try:
provCredentialsStr = os.environ["PROV_CREDENTIALS"]
@@ -106,40 +106,13 @@ def run_factory(sweeper_f: Callable) -> Callable:
)


def parse_cross_cluster_remotes(env_var_value: str | None) -> Iterable[Iterable[str]] | None:
"""
Given the env var value specifying the CCS remote node-sets, return the value as a list of batches, where each batch
is a list of remotes to be processed at the same time. Returns None if the value is not set, empty, or specifies an
empty list of remotes.
"""

if not env_var_value:
return None

content = json.loads(env_var_value)
if len(content) < 1:
return None

return [batch.split() for batch in content]


run_provenance = run_factory(provenance.run)
run_ancestry = run_factory(ancestry.run)

cross_cluster_remote_node_batches = parse_cross_cluster_remotes(os.environ.get("PROV_REMOTES"))
log.info('Running sweepers')
execution_begin = datetime.now()
if cross_cluster_remote_node_batches is None:
log.info('No CCS remotes specified - running sweepers against base OpenSearch endpoint only')
run_provenance()
run_ancestry()
else:
log.info(f'CCS remotes specified: {json.dumps(cross_cluster_remote_node_batches)}')
for cross_cluster_remotes in cross_cluster_remote_node_batches:
targets_msg_str = f'base OpenSearch and the following remotes: {json.dumps(cross_cluster_remotes)}'
log.info(f'Running sweepers against {targets_msg_str}')
run_provenance(cross_cluster_remotes=cross_cluster_remotes)
run_ancestry(cross_cluster_remotes=cross_cluster_remotes)
log.info(f'Successfully ran sweepers against base OpenSearch and {targets_msg_str}')

run_provenance()
run_ancestry()

log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(execution_begin)}')
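
For reference, a minimal sketch of how the deleted PROV_REMOTES handling behaved, reconstructed from the removed parse_cross_cluster_remotes function above; the example env var value is illustrative, not taken from any deployment.

```python
# Sketch of the removed PROV_REMOTES parsing, reconstructed from the deleted
# driver code above. The example value below is illustrative only.
import json
from typing import Iterable, Optional


def parse_cross_cluster_remotes(env_var_value: Optional[str]) -> Optional[Iterable[Iterable[str]]]:
    # An unset/empty env var, or an empty JSON list, means "no CCS remotes"
    if not env_var_value:
        return None
    content = json.loads(env_var_value)
    if len(content) < 1:
        return None
    # each JSON string is a whitespace-separated batch of remotes to process together
    return [batch.split() for batch in content]


# e.g. PROV_REMOTES='["remote1 remote2", "remote3"]' yields two batches
assert parse_cross_cluster_remotes('["remote1 remote2", "remote3"]') == [
    ["remote1", "remote2"],
    ["remote3"],
]
assert parse_cross_cluster_remotes(None) is None
assert parse_cross_cluster_remotes("[]") is None
```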
4 changes: 1 addition & 3 deletions src/pds/registrysweepers/ancestry/__init__.py
@@ -28,7 +28,6 @@ def run(
base_url: str,
username: str,
password: str,
cross_cluster_remotes=None,
verify_host_certs: bool = True,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO,
@@ -40,7 +39,7 @@

log.info("Starting ancestry sweeper")

host = Host(cross_cluster_remotes or [], password, base_url, username, verify_host_certs)
host = Host(password, base_url, username, verify_host_certs)

bundle_records = get_bundle_ancestry_records(host, registry_mock_query_f)
collection_records = list(get_collection_ancestry_records(host, registry_mock_query_f))
@@ -97,7 +96,6 @@ def run(
base_url=args.base_URL,
username=args.username,
password=args.password,
cross_cluster_remotes=args.cluster_nodes,
verify_host_certs=not args.insecure,
log_level=args.log_level,
log_filepath=args.log_file,
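
A minimal caller sketch after this change: run() no longer accepts cross_cluster_remotes, so the ancestry sweeper targets the base OpenSearch endpoint alone. The endpoint and credentials below are placeholders, not real configuration.

```python
# Illustrative invocation of the updated signature; endpoint and credentials
# are placeholders only.
from pds.registrysweepers import ancestry

ancestry.run(
    base_url="https://localhost:9200",  # placeholder OpenSearch endpoint
    username="admin",                   # placeholder credentials
    password="admin",
    verify_host_certs=False,
)
```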
10 changes: 1 addition & 9 deletions src/pds/registrysweepers/provenance.py
@@ -39,16 +39,13 @@
# It is important to note that the document is updated, not any dependent
# index.
#
import json
import logging
import urllib.parse
from typing import Dict
from typing import Iterable
from typing import List
from typing import Mapping
from typing import Union

import requests
from pds.registrysweepers.utils import _vid_as_tuple_of_int
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import get_extant_lidvids
@@ -65,7 +62,6 @@ def run(
base_url: str,
username: str,
password: str,
cross_cluster_remotes=None,
verify_host_certs: bool = True,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO,
Expand All @@ -74,7 +70,7 @@ def run(

log.info("starting CLI processing")

host = Host(cross_cluster_remotes or [], password, base_url, username, verify_host_certs)
host = Host(password, base_url, username, verify_host_certs)

extant_lidvids = get_extant_lidvids(host)
successors = get_successors_by_lidvid(extant_lidvids)
@@ -138,9 +134,6 @@ def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]

registrysweepers.py --help

- command for opensearch running in a cluster

registrysweepers.py -b https://search.us-west-2.es.amazonaws.com -c remote1 remote2 remote3 remote4 -u admin -p admin
"""

args = parse_args(description=cli_description, epilog=cli_epilog)
@@ -149,7 +142,6 @@ def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]
base_url=args.base_URL,
username=args.username,
password=args.password,
cross_cluster_remotes=args.cluster_nodes,
verify_host_certs=not args.insecure,
log_level=args.log_level,
log_filepath=args.log_file,
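
With the -c/--ccs-remotes option gone (see the utils change below), the deleted cluster example in the CLI epilog reduces to a plain invocation against the base endpoint; a sketch reusing the endpoint and credentials from the removed line, shown in the epilog's own style:

    registrysweepers.py -b https://search.us-west-2.es.amazonaws.com -u admin -p admin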
16 changes: 4 additions & 12 deletions src/pds/registrysweepers/utils/__init__.py
@@ -21,7 +21,7 @@
from retry import retry
from retry.api import retry_call

Host = collections.namedtuple("Host", ["cross_cluster_remotes", "password", "url", "username", "verify"])
Host = collections.namedtuple("Host", ["password", "url", "username", "verify"])

log = logging.getLogger(__name__)

@@ -37,13 +37,6 @@ def parse_args(description: str = "", epilog: str = "") -> Namespace:
formatter_class=argparse.RawDescriptionHelpFormatter,
)
ap.add_argument("-b", "--base-URL", required=True, type=str)
ap.add_argument(
"-c",
"--ccs-remotes",
default=[],
nargs="*",
help="names of additional opensearch cross-cluster remotes, space-separated",
)
ap.add_argument("-l", "--log-file", default=None, required=False, help="file to write the log messages")
ap.add_argument(
"-L",
@@ -120,8 +113,8 @@ def query_registry_db(

log.info(f"Initiating query: {req_content}")

cross_cluster_indexes = [f"{node}:{index_name}" for node in host.cross_cluster_remotes]
path = ",".join([index_name] + cross_cluster_indexes) + f"/_search?scroll={scroll_keepalive_minutes}m"
path = f"{index_name}/_search?scroll={scroll_keepalive_minutes}m"

served_hits = 0

last_info_log_at_percentage = 0
@@ -242,9 +235,8 @@ def write_updated_docs(host: Host, ids_and_updates: Mapping[str, Dict], index_na

@retry(exceptions=(HTTPError, RuntimeError), tries=4, delay=2, backoff=2, logger=log)
def _write_bulk_updates_chunk(host: Host, index_name: str, bulk_updates: Iterable[str]):
cross_cluster_indexes = [f"{node}:{index_name}" for node in host.cross_cluster_remotes]
headers = {"Content-Type": "application/x-ndjson"}
path = ",".join([index_name] + cross_cluster_indexes) + "/_bulk"
path = f"{index_name}/_bulk"

bulk_data = "\n".join(bulk_updates) + "\n"

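
To make the query-path change concrete, here is a sketch of the request paths built by query_registry_db() before and after this PR, reconstructed from the diff above; the index name and remote names are placeholders, not values from the real registry.

```python
# Illustrative comparison of the OpenSearch request paths before and after
# this change. Index and remote names are placeholders.
index_name = "registry"
scroll_keepalive_minutes = 10
cross_cluster_remotes = ["remote1", "remote2"]

# Before: the local index plus one "<remote>:<index>" pattern per CCS remote
cross_cluster_indexes = [f"{node}:{index_name}" for node in cross_cluster_remotes]
old_path = ",".join([index_name] + cross_cluster_indexes) + f"/_search?scroll={scroll_keepalive_minutes}m"
assert old_path == "registry,remote1:registry,remote2:registry/_search?scroll=10m"

# After: the base endpoint's index only
new_path = f"{index_name}/_search?scroll={scroll_keepalive_minutes}m"
assert new_path == "registry/_search?scroll=10m"
```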
2 changes: 1 addition & 1 deletion tests/pds/registrysweepers/test_ancestry.py
@@ -288,7 +288,7 @@ class AncestryLegacyTypesTestCase(unittest.TestCase):
registry_query_mock = RegistryQueryMock(input_file_path)

def test_collection_refs_parsing(self):
host_stub = Host(None, None, None, None, None)
host_stub = Host(None, None, None, None)
query_mock_f = self.registry_query_mock.get_mocked_query
collection_ancestry_records = list(get_collection_ancestry_records(host_stub, query_mock_f))
