Updating CCR telemetry device for OpenSearch.
Signed-off-by: Ankit Kala <[email protected]>
ankitkala committed Feb 1, 2022
1 parent 68e2a40 commit 29b1c48
Showing 7 changed files with 673 additions and 383 deletions.
124 changes: 69 additions & 55 deletions osbenchmark/telemetry.py
@@ -27,15 +27,15 @@
import fnmatch
import os
import threading

from collections import deque
import elasticsearch
import tabulate

from osbenchmark import metrics, time, exceptions
from osbenchmark.metrics import MetaInfoScope
from osbenchmark.utils import io, sysstats, console, opts, process
from osbenchmark.utils.versions import components


def list_telemetry():
console.println("Available telemetry devices:\n")
devices = [[device.command, device.human_name, device.help] for device in [JitCompiler, Gc, FlightRecorder,
@@ -318,12 +318,12 @@ class CcrStats(TelemetryDevice):
internal = False
command = "ccr-stats"
human_name = "CCR Stats"
help = "Regularly samples Cross Cluster Replication (CCR) related stats"
help = ("Regularly samples Cross Cluster Replication (CCR) leader and follower(s) checkpoint at index level"
"and calculates replication lag")

"""
Gathers CCR stats on a cluster level
"""

def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
@@ -337,7 +337,6 @@ def __init__(self, telemetry_params, clients, metrics_store):
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()

self.telemetry_params = telemetry_params
self.clients = clients
self.sample_interval = telemetry_params.get("ccr-stats-sample-interval", 1)
@@ -346,6 +345,7 @@ def __init__(self, telemetry_params, clients, metrics_store):
"The telemetry parameter 'ccr-stats-sample-interval' must be greater than zero but was {}.".format(self.sample_interval))
self.specified_cluster_names = self.clients.keys()
self.indices_per_cluster = self.telemetry_params.get("ccr-stats-indices", False)
self.max_replication_lag_seconds = self.telemetry_params.get("ccr-max-replication-lag-seconds", 60*60*60)
if self.indices_per_cluster:
for cluster_name in self.indices_per_cluster.keys():
if cluster_name not in clients:
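
For orientation, this is the shape of the telemetry_params mapping the device reads. The key names come straight from the code above; the cluster names and values below are illustrative assumptions only:

# Hypothetical telemetry parameters for the ccr-stats device. Key names are
# the ones read in __init__ above; the cluster names ("default", "follower")
# and all values are example assumptions, not defaults.
telemetry_params = {
    "ccr-stats-sample-interval": 1,              # seconds between samples
    "ccr-max-replication-lag-seconds": 60 * 60,  # caps the per-index checkpoint queue
    "ccr-stats-indices": {
        "default": ["leader-index"],
        "follower": ["follower-index"],
    },
}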
@@ -362,6 +362,7 @@ def on_benchmark_start(self):
recorder = []
for cluster_name in self.specified_cluster_names:
recorder = CcrStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval,
self.max_replication_lag_seconds,
self.indices_per_cluster[cluster_name] if self.indices_per_cluster else None)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
@@ -380,7 +381,7 @@ class CcrStatsRecorder:
Collects and pushes CCR stats for the specified cluster to the metric store.
"""

def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None):
def __init__(self, cluster_name, client, metrics_store, sample_interval, max_replication_lag_seconds, indices=None):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The OpenSearch client for this cluster.
@@ -395,6 +396,9 @@ def __init__(self, cluster_name, client, metrics_store, sample_interval, indices
self.sample_interval = sample_interval
self.indices = indices or []  # guard: recorders may be created without a tracked-index list
self.logger = logging.getLogger(__name__)
self.leader_checkpoints = dict()
for index in self.indices:
    self.leader_checkpoints[index] = deque(maxlen=int(max_replication_lag_seconds / sample_interval))

def __str__(self):
return "ccr stats"
@@ -403,54 +407,80 @@ def record(self):
"""
Collect CCR stats for indexes (optionally) specified in telemetry parameters and push to metrics store.
"""
if self.cluster_name == "default":
self.log_leader_stats()
else:
self.log_ccr_lag_per_index()
self.log_follower_stats()

# ES returns all stats values in bytes or ms via "human: false"

# pylint: disable=import-outside-toplevel
import elasticsearch
def log_leader_stats(self):
try:
stats = self.client.transport.perform_request("GET", "/_plugins/_replication/leader_stats")
self.record_cluster_level_stats(stats)
except elasticsearch.TransportError:
msg = "A transport error occurred while collecting CCR leader stats"
self.logger.exception(msg)
raise exceptions.BenchmarkError(msg)

def log_follower_stats(self):
try:
ccr_stats_api_endpoint = "/_ccr/stats"
filter_path = "follow_stats"
stats = self.client.transport.perform_request("GET", ccr_stats_api_endpoint, params={"human": "false",
"filter_path": filter_path})
stats = self.client.transport.perform_request("GET", "/_plugins/_replication/follower_stats")
self.record_cluster_level_stats(stats)
except elasticsearch.TransportError:
msg = "A transport error occurred while collecting CCR stats from the endpoint [{}?filter_path={}] on " \
"cluster [{}]".format(ccr_stats_api_endpoint, filter_path, self.cluster_name)
msg = "A transport error occurred while collecting follower stats for remote cluster: {}".format(self.cluster_name)
self.logger.exception(msg)
raise exceptions.BenchmarkError(msg)

if filter_path in stats and "indices" in stats[filter_path]:
for indices in stats[filter_path]["indices"]:
try:
if self.indices and indices["index"] not in self.indices:
# Skip metrics for indices not part of user supplied whitelist (ccr-stats-indices) in telemetry params.
continue
self.record_stats_per_index(indices["index"], indices["shards"])
except KeyError:
self.logger.warning(
"The 'indices' key in %s does not contain an 'index' or 'shards' key "
"Maybe the output format of the %s endpoint has changed. Skipping.", ccr_stats_api_endpoint, ccr_stats_api_endpoint
)

def log_ccr_lag_per_index(self):
for index in self.indices:
try:
stats = self.client.transport.perform_request("GET", "/_plugins/_replication/" + index + "/_status")
if stats["status"] == "SYNCING":
self.record_stats_per_index(index, stats)
else:
self.logger.info("CCR Status is not syncing. Ignoring for now!")
except elasticsearch.TransportError:
msg = "A transport error occurred while collecting CCR stats for remote cluster: {}".format(self.cluster_name)
self.logger.exception(msg)
raise exceptions.BenchmarkError(msg)

def record_stats_per_index(self, name, stats):
"""
:param name: The index name.
:param stats: A dict with returned CCR stats for the index.
"""
doc = {
"name": "ccr-status",
"index": name,
"leader_checkpoint": stats["syncing_details"]["leader_checkpoint"],
"follower_checkpoint": stats["syncing_details"]["follower_checkpoint"],
"replication_lag": self.calculate_lag(name, stats["syncing_details"]["leader_checkpoint"],
stats["syncing_details"]["follower_checkpoint"])
}
index_metadata = {
"cluster": self.cluster_name,
"index": name
}
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=index_metadata)
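
For reference, a minimal sketch of the /_plugins/_replication/<index>/_status payload that log_ccr_lag_per_index and record_stats_per_index rely on. Only the fields the code actually reads are shown; real responses carry additional fields, and the values here are made up:

# Minimal _status response shape assumed by the two methods above.
sample_status = {
    "status": "SYNCING",
    "syncing_details": {
        "leader_checkpoint": 42,    # illustrative value
        "follower_checkpoint": 40,  # illustrative value
    },
}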

def record_cluster_level_stats(self, data):
metadata = {
"cluster": self.cluster_name,
}
self.metrics_store.put_doc(data, level=MetaInfoScope.cluster, meta_data=metadata)

def calculate_lag(self, index, leader_checkpoint, follower_checkpoint):
leader_checkpoint_queue = self.leader_checkpoints[index]

while leader_checkpoint_queue and leader_checkpoint_queue[0] <= follower_checkpoint:
    leader_checkpoint_queue.popleft()

if leader_checkpoint > follower_checkpoint:
    leader_checkpoint_queue.append(leader_checkpoint)
return len(leader_checkpoint_queue) * self.sample_interval
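
The lag estimate is time-based: each sample appends the newest leader checkpoint to a bounded queue and drops entries the follower has already passed, so queue length times sample interval approximates how long the follower has been behind. A standalone sketch of the same logic, with hypothetical checkpoint values and a 1-second interval assumed:

from collections import deque

def estimate_lag(queue, leader_cp, follower_cp, interval):
    # Drop leader checkpoints the follower has already caught up to.
    while queue and queue[0] <= follower_cp:
        queue.popleft()
    # Queue the newest leader checkpoint if the follower is behind.
    if leader_cp > follower_cp:
        queue.append(leader_cp)
    # Each outstanding sample represents one interval's worth of lag.
    return len(queue) * interval

q = deque(maxlen=3600)  # e.g. one hour of 1-second samples
print(estimate_lag(q, leader_cp=10, follower_cp=5, interval=1))   # 1
print(estimate_lag(q, leader_cp=12, follower_cp=5, interval=1))   # 2
print(estimate_lag(q, leader_cp=12, follower_cp=12, interval=1))  # 0 (caught up)

Because the deque is capped at max_replication_lag_seconds / sample_interval entries, the reported lag saturates at the configured maximum instead of growing without bound.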

for shard_stats in stats:
if "shard_id" in shard_stats:
doc = {
"name": "ccr-stats",
"shard": shard_stats
}
shard_metadata = {
"cluster": self.cluster_name,
"index": name
}

self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=shard_metadata)


class RecoveryStats(TelemetryDevice):

@@ -551,9 +581,6 @@ def record(self):
Expand Down Expand Up @@ -551,9 +581,6 @@ def record(self):
"""
Collect recovery stats for indexes (optionally) specified in telemetry parameters and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch

try:
stats = self.client.indices.recovery(index=self.indices, active_only=True, detailed=False)
except elasticsearch.TransportError:
@@ -751,8 +778,6 @@ def indexing_pressure(self, node_name, node_stats):
return self.flatten_stats_fields(prefix="indexing_pressure", stats=node_stats["indexing_pressure"])

def sample(self):
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
stats = self.client.nodes.stats(metric="_all")
except elasticsearch.TransportError:
@@ -862,10 +887,6 @@ def record_final(self):

def _record(self, prefix=""):
# ES returns all stats values in bytes or ms via "human: false"

# pylint: disable=import-outside-toplevel
import elasticsearch

try:
stats = self.client.transform.get_transform_stats("_all")

@@ -1062,9 +1083,6 @@ def record(self):
Collect searchable snapshots stats for indexes (optionally) specified in telemetry parameters
and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch

try:
stats_api_endpoint = "/_searchable_snapshots/stats"
level = "indices" if self.indices else "cluster"
@@ -1410,8 +1428,6 @@ def on_benchmark_stop(self):
def jvm_stats(self):
self.logger.debug("Gathering JVM stats")
jvm_stats = {}
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
stats = self.client.nodes.stats(metric="_all")
except elasticsearch.TransportError:
@@ -1575,8 +1591,6 @@ def __init__(self, client, metrics_store):
self.metrics_store = metrics_store

def on_benchmark_stop(self):
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
results = self.client.search(index=".ml-anomalies-*", body={
"size": 0,
44 changes: 44 additions & 0 deletions samples/ccr/docker-compose-metricstore.yml
@@ -0,0 +1,44 @@
# Creates an OpenSearch cluster to publish the OpenSearch Benchmark metrics.
version: '3'
services:
  metricstore-node:
    image: opensearchproject/opensearch:latest
    container_name: metricstore-node
    environment:
      - cluster.name=opensearch-metricstore-cluster
      - node.name=metricstore-node
      - discovery.seed_hosts=metricstore-node
      - cluster.initial_master_nodes=metricstore-node
      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems
        hard: 65536
    volumes:
      - metricstore-volume:/usr/share/opensearch/data
    ports:
      - 9209:9200
    networks:
      - opensearch-net-metrics
  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:latest
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      OPENSEARCH_HOSTS: '["https://metricstore-node:9200"]'
    networks:
      - opensearch-net-metrics

volumes:
  metricstore-volume:

networks:
  opensearch-net-metrics:
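
To have OpenSearch Benchmark publish its metrics to this store, the datastore is configured in the OSB config file (typically ~/.benchmark/benchmark.ini). A sketch following the Rally-derived config layout; treat the section and key names as assumptions to verify against your installed version, and note the credentials below are the container defaults:

[results_publishing]
datastore.type = opensearch
datastore.host = localhost
datastore.port = 9209
datastore.secure = true
datastore.user = admin
datastore.password = admin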
59 changes: 59 additions & 0 deletions samples/ccr/docker-compose.yml
@@ -0,0 +1,59 @@
# Creates two single-node OpenSearch clusters (one leader and one follower). The leader cluster is used for ingesting
# data, which is then replicated to the follower cluster.
version: '3'
services:
  leader-cluster:
    image: opensearchproject/opensearch:latest
    container_name: leader-cluster
    environment:
      - cluster.name=leader-cluster
      - node.name=leader-node
      - discovery.seed_hosts=leader-node
      - cluster.initial_master_nodes=leader-node
      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems
        hard: 65536
    volumes:
      - leader-node-volume:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9300:9300 # transport port; cross-cluster replication connects to the leader over this port
    networks:
      - opensearch-net
  follower-cluster:
    image: opensearchproject/opensearch:latest
    container_name: follower-cluster
    environment:
      - cluster.name=follower-cluster
      - node.name=follower-node
      - discovery.seed_hosts=follower-node
      - cluster.initial_master_nodes=follower-node
      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems
        hard: 65536
    volumes:
      - follower-node-volume:/usr/share/opensearch/data
    ports:
      - 9201:9200
      - 9301:9300 # transport port
    networks:
      - opensearch-net

volumes:
  leader-node-volume:
  follower-node-volume:

networks:
  opensearch-net:
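
After docker-compose up, the two clusters still have to be wired together before the CCR telemetry device has anything to sample. A hypothetical bootstrap sketch in Python: the endpoint paths match the _plugins/_replication API used by the telemetry code above, while the remote alias, index names, and credentials are assumptions.

# Hypothetical CCR bootstrap for the two compose clusters above.
import requests

FOLLOWER = "https://localhost:9201"
AUTH = ("admin", "admin")  # default demo credentials; assumption

# Register the leader as a named remote on the follower cluster.
# "leader-cluster:9300" resolves over the shared docker network.
requests.put(
    f"{FOLLOWER}/_cluster/settings",
    json={"persistent": {"cluster.remote.leader-alias.seeds": ["leader-cluster:9300"]}},
    auth=AUTH, verify=False,
)

# Start replicating a (hypothetical) index from the leader.
requests.put(
    f"{FOLLOWER}/_plugins/_replication/follower-index/_start",
    json={"leader_alias": "leader-alias", "leader_index": "leader-index"},
    auth=AUTH, verify=False,
)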