diff --git a/osbenchmark/telemetry.py b/osbenchmark/telemetry.py
index d2f4e550b..69d74be45 100644
--- a/osbenchmark/telemetry.py
+++ b/osbenchmark/telemetry.py
@@ -27,7 +27,8 @@
 import fnmatch
 import os
 import threading
-
+from collections import deque
+import elasticsearch
 import tabulate
 
 from osbenchmark import metrics, time, exceptions
@@ -35,7 +36,6 @@
 from osbenchmark.utils import io, sysstats, console, opts, process
 from osbenchmark.utils.versions import components
 
-
 def list_telemetry():
     console.println("Available telemetry devices:\n")
     devices = [[device.command, device.human_name, device.help] for device in [JitCompiler, Gc, FlightRecorder,
@@ -318,12 +318,12 @@ class CcrStats(TelemetryDevice):
     internal = False
     command = "ccr-stats"
     human_name = "CCR Stats"
-    help = "Regularly samples Cross Cluster Replication (CCR) related stats"
+    help = ("Regularly samples Cross Cluster Replication (CCR) leader and follower checkpoints at the index level "
+            "and calculates the replication lag")
 
     """
     Gathers CCR stats on a cluster level
     """
-
     def __init__(self, telemetry_params, clients, metrics_store):
         """
         :param telemetry_params: The configuration object for telemetry_params.
@@ -337,7 +337,6 @@ def __init__(self, telemetry_params, clients, metrics_store):
         :param metrics_store: The configured metrics store we write to.
         """
         super().__init__()
-
         self.telemetry_params = telemetry_params
         self.clients = clients
         self.sample_interval = telemetry_params.get("ccr-stats-sample-interval", 1)
@@ -346,6 +345,7 @@ def __init__(self, telemetry_params, clients, metrics_store):
                 "The telemetry parameter 'ccr-stats-sample-interval' must be greater than zero but was {}.".format(self.sample_interval))
         self.specified_cluster_names = self.clients.keys()
         self.indices_per_cluster = self.telemetry_params.get("ccr-stats-indices", False)
+        self.max_replication_lag_seconds = self.telemetry_params.get("ccr-max-replication-lag-seconds", 60*60*60)  # defaults to 216,000s (60 hours)
         if self.indices_per_cluster:
             for cluster_name in self.indices_per_cluster.keys():
                 if cluster_name not in clients:
@@ -362,6 +362,7 @@ def on_benchmark_start(self):
         recorder = []
         for cluster_name in self.specified_cluster_names:
             recorder = CcrStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval,
+                                        self.max_replication_lag_seconds,
                                         self.indices_per_cluster[cluster_name] if self.indices_per_cluster else None)
             sampler = SamplerThread(recorder)
             self.samplers.append(sampler)
@@ -380,7 +381,7 @@ class CcrStatsRecorder:
     Collects and pushes CCR stats for the specified cluster to the metric store.
     """
 
-    def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None):
+    def __init__(self, cluster_name, client, metrics_store, sample_interval, max_replication_lag_seconds, indices=None):
         """
         :param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
         :param client: The OpenSearch client for this cluster.
@@ -395,6 +396,9 @@ def __init__(self, cluster_name, client, metrics_store, sample_interval, indices
         self.sample_interval = sample_interval
         self.indices = indices
         self.logger = logging.getLogger(__name__)
+        self.leader_checkpoints = dict()
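+        # A bounded deque per index keeps memory usage constant: at most
+        # max_replication_lag_seconds / sample_interval leader checkpoints are
+        # retained, so the reported lag saturates at the configured maximum.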
""" + if self.cluster_name == "default": + self.log_leader_stats() + else: + self.log_ccr_lag_per_index() + self.log_follower_stats() - # ES returns all stats values in bytes or ms via "human: false" - - # pylint: disable=import-outside-toplevel - import elasticsearch + def log_leader_stats(self): + try: + stats = self.client.transport.perform_request("GET", "/_plugins/_replication/leader_stats") + self.record_cluster_level_stats(stats) + except elasticsearch.TransportError: + msg = "A transport error occurred while collecting CCR leader stats" + self.logger.exception(msg) + raise exceptions.BenchmarkError(msg) + def log_follower_stats(self): try: - ccr_stats_api_endpoint = "/_ccr/stats" - filter_path = "follow_stats" - stats = self.client.transport.perform_request("GET", ccr_stats_api_endpoint, params={"human": "false", - "filter_path": filter_path}) + stats = self.client.transport.perform_request("GET", "/_plugins/_replication/follower_stats") + self.record_cluster_level_stats(stats) except elasticsearch.TransportError: - msg = "A transport error occurred while collecting CCR stats from the endpoint [{}?filter_path={}] on " \ - "cluster [{}]".format(ccr_stats_api_endpoint, filter_path, self.cluster_name) + msg = "A transport error occurred while collecting follower stats for remote cluster: {}".format(self.cluster_name) self.logger.exception(msg) raise exceptions.BenchmarkError(msg) - if filter_path in stats and "indices" in stats[filter_path]: - for indices in stats[filter_path]["indices"]: - try: - if self.indices and indices["index"] not in self.indices: - # Skip metrics for indices not part of user supplied whitelist (ccr-stats-indices) in telemetry params. - continue - self.record_stats_per_index(indices["index"], indices["shards"]) - except KeyError: - self.logger.warning( - "The 'indices' key in %s does not contain an 'index' or 'shards' key " - "Maybe the output format of the %s endpoint has changed. Skipping.", ccr_stats_api_endpoint, ccr_stats_api_endpoint - ) + + def log_ccr_lag_per_index(self): + for index in self.indices: + try: + stats = self.client.transport.perform_request("GET", "/_plugins/_replication/" + index + "/_status") + if stats["status"] == "SYNCING": + self.record_stats_per_index(index, stats) + else: + self.logger.info("CCR Status is not syncing. Ignoring for now!") + except elasticsearch.TransportError: + msg = "A transport error occurred while collecting CCR stats for remote cluster: {}".format(self.cluster_name) + self.logger.exception(msg) + raise exceptions.BenchmarkError(msg) def record_stats_per_index(self, name, stats): """ :param name: The index name. :param stats: A dict with returned CCR stats for the index. 
""" + doc = { + "name": "ccr-status", + "index": name, + "leader_checkpoint": stats["syncing_details"]["leader_checkpoint"], + "follower_checkpoint": stats["syncing_details"]["follower_checkpoint"], + "replication_lag": self.calculate_lag(name, stats["syncing_details"]["leader_checkpoint"], + stats["syncing_details"]["follower_checkpoint"]) + } + index_metadata = { + "cluster": self.cluster_name, + "index": name + } + self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=index_metadata) + + def record_cluster_level_stats(self, data): + metadata = { + "cluster": self.cluster_name, + } + self.metrics_store.put_doc(data, level=MetaInfoScope.cluster, meta_data=metadata) + + def calculate_lag(self, index, leader_checkpoint, follower_checkpoint): + leader_checkpoint_queue = self.leader_checkpoints[index] + + while(leader_checkpoint_queue and leader_checkpoint_queue[0] <= follower_checkpoint): + leader_checkpoint_queue.popleft() + + if leader_checkpoint > follower_checkpoint: + leader_checkpoint_queue.append(leader_checkpoint) + return len(leader_checkpoint_queue) * self.sample_interval - for shard_stats in stats: - if "shard_id" in shard_stats: - doc = { - "name": "ccr-stats", - "shard": shard_stats - } - shard_metadata = { - "cluster": self.cluster_name, - "index": name - } - self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=shard_metadata) class RecoveryStats(TelemetryDevice): @@ -551,9 +581,6 @@ def record(self): """ Collect recovery stats for indexes (optionally) specified in telemetry parameters and push to metrics store. """ - # pylint: disable=import-outside-toplevel - import elasticsearch - try: stats = self.client.indices.recovery(index=self.indices, active_only=True, detailed=False) except elasticsearch.TransportError: @@ -751,8 +778,6 @@ def indexing_pressure(self, node_name, node_stats): return self.flatten_stats_fields(prefix="indexing_pressure", stats=node_stats["indexing_pressure"]) def sample(self): - # pylint: disable=import-outside-toplevel - import elasticsearch try: stats = self.client.nodes.stats(metric="_all") except elasticsearch.TransportError: @@ -862,10 +887,6 @@ def record_final(self): def _record(self, prefix=""): # ES returns all stats values in bytes or ms via "human: false" - - # pylint: disable=import-outside-toplevel - import elasticsearch - try: stats = self.client.transform.get_transform_stats("_all") @@ -1062,9 +1083,6 @@ def record(self): Collect searchable snapshots stats for indexes (optionally) specified in telemetry parameters and push to metrics store. 
""" - # pylint: disable=import-outside-toplevel - import elasticsearch - try: stats_api_endpoint = "/_searchable_snapshots/stats" level = "indices" if self.indices else "cluster" @@ -1410,8 +1428,6 @@ def on_benchmark_stop(self): def jvm_stats(self): self.logger.debug("Gathering JVM stats") jvm_stats = {} - # pylint: disable=import-outside-toplevel - import elasticsearch try: stats = self.client.nodes.stats(metric="_all") except elasticsearch.TransportError: @@ -1575,8 +1591,6 @@ def __init__(self, client, metrics_store): self.metrics_store = metrics_store def on_benchmark_stop(self): - # pylint: disable=import-outside-toplevel - import elasticsearch try: results = self.client.search(index=".ml-anomalies-*", body={ "size": 0, diff --git a/samples/ccr/docker-compose-metricstore.yml b/samples/ccr/docker-compose-metricstore.yml new file mode 100644 index 000000000..58989754f --- /dev/null +++ b/samples/ccr/docker-compose-metricstore.yml @@ -0,0 +1,44 @@ +# Creates an Opensearch cluster to publish the Opensearch Benchmark metrics. +version: '3' +services: + metricstore-node: + image: opensearchproject/opensearch:latest + container_name: metricstore-node + environment: + - cluster.name=opensearch-metricstore-cluster + - node.name=metricstore-node + - discovery.seed_hosts=metricstore-node + - cluster.initial_master_nodes=metricstore-node + - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping + - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems + hard: 65536 + volumes: + - metricstore-volume:/usr/share/opensearch/data + + ports: + - 9209:9200 + networks: + - opensearch-net-metrics + opensearch-dashboards: + image: opensearchproject/opensearch-dashboards:latest + container_name: opensearch-dashboards + ports: + - 5601:5601 + expose: + - "5601" + environment: + OPENSEARCH_HOSTS: '["https://metricstore-node:9200"]' + networks: + - opensearch-net-metrics + +volumes: + metricstore-volume: + +networks: + opensearch-net-metrics: \ No newline at end of file diff --git a/samples/ccr/docker-compose.yml b/samples/ccr/docker-compose.yml new file mode 100644 index 000000000..ab241b74f --- /dev/null +++ b/samples/ccr/docker-compose.yml @@ -0,0 +1,59 @@ +# Creates 2 single node Opensearch cluster(one leader and one follower). Leader cluster is used for ingesting the data which is then +# replicated to the follower cluster. 
+version: '3'
+services:
+  leader-cluster:
+    image: opensearchproject/opensearch:latest
+    container_name: leader-cluster
+    environment:
+      - cluster.name=leader-cluster
+      - node.name=leader-node
+      - discovery.seed_hosts=leader-node
+      - cluster.initial_master_nodes=leader-node
+      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
+      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+      nofile:
+        soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems
+        hard: 65536
+    volumes:
+      - leader-node-volume:/usr/share/opensearch/data
+    ports:
+      - 9200:9200
+      - 9300:9300 # required for Performance Analyzer
+    networks:
+      - opensearch-net
+  follower-cluster:
+    image: opensearchproject/opensearch:latest
+    container_name: follower-cluster
+    environment:
+      - cluster.name=follower-cluster
+      - node.name=follower-node
+      - discovery.seed_hosts=follower-node
+      - cluster.initial_master_nodes=follower-node
+      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
+      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+      nofile:
+        soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems
+        hard: 65536
+    volumes:
+      - follower-node-volume:/usr/share/opensearch/data
+    ports:
+      - 9201:9200
+      - 9301:9300 # required for Performance Analyzer
+    networks:
+      - opensearch-net
+
+volumes:
+  leader-node-volume:
+  follower-node-volume:
+
+networks:
+  opensearch-net:
\ No newline at end of file
diff --git a/samples/ccr/start.sh b/samples/ccr/start.sh
new file mode 100755
index 000000000..6cb8501b2
--- /dev/null
+++ b/samples/ccr/start.sh
@@ -0,0 +1,150 @@
+#!/usr/bin/env bash
+
+# This script creates a cross cluster replication setup between two clusters and executes a benchmark run.
+# Simply execute it as ./start.sh from within the Python virtual environment (see DEVELOPER_GUIDE.md).
+#
+# Prerequisite: Docker installed locally.
+# Steps:
+# 1. Sets up two single-node clusters (leader and follower).
+# 2. Starts a single-node cluster as the metrics store. OpenSearch Dashboards attached to it can be used to view the metrics.
+# 3. Configures the seed nodes on the follower cluster and starts replication using an auto-follow pattern.
+# 4. Runs the eventdata benchmark against the replication setup. Benchmark metrics can be viewed in OpenSearch Dashboards.
+# 5. To tear down everything, execute ./stop.sh.
+set -e
+
+
+# Start the OpenSearch clusters
+docker-compose up -d --remove-orphans
+
+# Start the metrics store
+docker-compose -f ./docker-compose-metricstore.yml up -d
+
+printf "Waiting for clusters to get ready "
+
+# Wait until OpenSearch is up
+ALL_CLUSTERS_READY=false
+
+while ! $ALL_CLUSTERS_READY; do
+    (curl -ks -u admin:admin https://localhost:9200 -o /dev/null && curl -ks -u admin:admin https://localhost:9201 -o /dev/null && ALL_CLUSTERS_READY=true) || (printf "." && sleep 5)
+done
+
+echo
+
+# Configure the seed nodes on the follower cluster
+# TODO: Update the seed node to a private IP.
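+# NOTE: 127.0.0.1:9300 assumes the leader's transport port is reachable from the
+# follower container via the host network; on the shared Docker network, the
+# container name (leader-cluster:9300) is usually the more portable seed address.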
+echo "Configure remotes on follower" +curl -o /dev/null -H 'Content-Type: application/json' -k -u admin:admin -X PUT https://localhost:9201/_cluster/settings -d @- <<-EOF + { + "persistent" : { + "cluster" : { + "remote" : { + "source" : { + "seeds" : [ + "127.0.0.1:9300" + ] + } + } + } + } + } +EOF + +echo "Set auto-follow pattern on follower for every index on leader" +curl -H 'Content-Type: application/json' -k -u admin:admin https://localhost:9201/_plugins/_replication/_autofollow -d @- <<-EOF +{ + "leader_alias": "source", + "name": "all", + "pattern": "eventdata*", + "use_roles": { + "leader_cluster_role": "all_access", + "follower_cluster_role": "all_access" + } +} +EOF + + +# Create target-hosts file for OSB. +cat >ccr-target-hosts.json <<'EOF' +{ + "default": [ + "https://127.0.0.1:9200" + ], + "follower": [ + "https://127.0.0.1:9201" + ] +} +EOF + +cat >ccr-telemetry-param.json <<'EOF' +{ + "ccr-stats-sample-interval": 1, + "ccr-stats-indices": { + "follower": ["eventdata"] + }, + "ccr-max-replication-lag-seconds": 36000 +} +EOF + +cat >ccr-client-options.json <<'EOF' +{ + "default": { + "use_ssl":"true", + "basic_auth_user":"admin", + "basic_auth_password":"admin", + "verify_certs":"false" + }, + "follower": { + "use_ssl":"true", + "basic_auth_user":"admin", + "basic_auth_password":"admin", + "verify_certs":"false" + } +} +EOF + + +# Create metricstore ini file +cat >${HOME}/.benchmark/benchmark.ini <