Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes to merge PA-RTF #651

Merged
merged 36 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4ec8da2
PA RTF merging init
atharvasharma61 Apr 22, 2024
a68d0fb
working model
atharvasharma61 Apr 22, 2024
2fecd36
working model tip
atharvasharma61 Apr 22, 2024
5ec43af
functional model init
atharvasharma61 Apr 22, 2024
b19e35f
Migrated HeapMetricsCollector
atharvasharma61 Apr 25, 2024
791e99a
Merge branch 'opensearch-project:main' into pa-rtf
atharvasharma61 Apr 25, 2024
d77c71f
Added RTFThreadPoolMetricsCollector
atharvasharma61 May 7, 2024
7072a65
migrated NodeStats and DiskMetricsCollector
atharvasharma61 May 13, 2024
8e43e03
Added gauge data model for Heap_Max metric
atharvasharma61 May 17, 2024
4d5dec4
implemented TelemetryAwarePlugin
atharvasharma61 May 20, 2024
ef28130
Merge branch 'opensearch-project:main' into pa-rtf
atharvasharma61 May 30, 2024
49946a6
Framework changes for PA RTF merging
atharvasharma61 May 30, 2024
82e15e7
refactored
atharvasharma61 May 30, 2024
8922651
spotless applied
atharvasharma61 May 30, 2024
a80873f
Addressed small comments
atharvasharma61 May 31, 2024
e39aada
Added different flag for RCA collectors
atharvasharma61 May 31, 2024
97dd4bc
Addressed more comments
atharvasharma61 May 31, 2024
e9b5d57
Added RTF collectors in config map
atharvasharma61 May 31, 2024
538d682
Added UTs
atharvasharma61 May 31, 2024
f182464
Added further UTs
atharvasharma61 Jun 1, 2024
447e15f
Added dynamic control support to all collectors
atharvasharma61 Jun 3, 2024
369bd95
fixed UT
atharvasharma61 Jun 3, 2024
25d66e8
refactoring
atharvasharma61 Jun 3, 2024
84580dc
Revert "refactoring"
atharvasharma61 Jun 4, 2024
16c592e
Revert "fixed UT"
atharvasharma61 Jun 4, 2024
55a7604
Revert "Added dynamic control support to all collectors"
atharvasharma61 Jun 4, 2024
837b15d
Adding two new collector interfaces
atharvasharma61 Jun 4, 2024
96166aa
simplified interfaces
atharvasharma61 Jun 4, 2024
a9a756f
Added units and javadocs
atharvasharma61 Jun 5, 2024
8df65a5
Changes metrics semantic conventions
atharvasharma61 Jun 5, 2024
7213080
refactored
atharvasharma61 Jun 5, 2024
857214b
fixed UT
atharvasharma61 Jun 5, 2024
da1c4ba
Added stats metrics for rtf collectors
atharvasharma61 Jun 6, 2024
1e211f0
reverted test delete
atharvasharma61 Jun 6, 2024
44f0b7c
Fixes javadoc compilation issue
Jun 10, 2024
870e3a5
Merge branch 'opensearch-project:main' into pa-rtf
atharvasharma61 Jun 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.env.Environment;
import org.opensearch.indices.IndicesService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

public final class OpenSearchResources {
Expand All @@ -20,6 +21,7 @@ public final class OpenSearchResources {
private CircuitBreakerService circuitBreakerService;
private ClusterService clusterService;
private IndicesService indicesService;
private MetricsRegistry metricsRegistry;
private Settings settings;
private Environment environment;
private java.nio.file.Path configPath;
Expand All @@ -32,6 +34,7 @@ private OpenSearchResources() {
clusterService = null;
settings = null;
indicesService = null;
metricsRegistry = null;
environment = null;
configPath = null;
pluginFileLocation = null;
Expand Down Expand Up @@ -101,6 +104,14 @@ public void setIndicesService(IndicesService indicesService) {
this.indicesService = indicesService;
}

/**
 * Returns the telemetry metrics registry currently held by this resource holder.
 *
 * @return the stored {@link MetricsRegistry}; {@code null} if none has been set yet
 */
public MetricsRegistry getMetricsRegistry() {
return metricsRegistry;
}

/**
 * Stores the telemetry metrics registry on this resource holder.
 *
 * @param metricsRegistry the registry to keep for later lookup
 */
public void setMetricsRegistry(MetricsRegistry metricsRegistry) {
this.metricsRegistry = metricsRegistry;
}

/**
 * Stores the client reference on this resource holder.
 *
 * @param client the client to keep for later lookup
 */
public void setClient(final Client client) {
this.client = client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
import org.opensearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.ShardStateCollector;
import org.opensearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory;
import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector;
import org.opensearch.performanceanalyzer.commons.collectors.GCInfoCollector;
Expand Down Expand Up @@ -88,17 +92,19 @@
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.watcher.ResourceWatcherService;

public final class PerformanceAnalyzerPlugin extends Plugin
implements ActionPlugin, NetworkPlugin, SearchPlugin {
implements ActionPlugin, NetworkPlugin, SearchPlugin, TelemetryAwarePlugin {
private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerPlugin.class);
public static final String PLUGIN_NAME = "opensearch-performance-analyzer";
private static final String ADD_FAULT_DETECTION_METHOD = "addFaultDetectionListener";
Expand Down Expand Up @@ -189,27 +195,19 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
clusterSettingsManager.addSubscriberForIntSetting(
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING, nodeStatsSettingHandler);

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ThreadPoolMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
new CacheConfigMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new CircuitBreakerCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new OSMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new HeapMetricsCollector());

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeDetailsCollector(configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeStatsAllShardsMetricsCollector(performanceAnalyzerController));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ClusterManagerServiceMetrics());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ClusterManagerServiceEventMetrics());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NetworkInterfaceCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new GCInfoCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new FaultDetectionMetricsCollector(
Expand All @@ -227,6 +225,29 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
new AdmissionControlMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ElectionTermCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new GCInfoCollector());

// Adding RTF Collectors if flag is enabled in performance-analyzer.properties
if (PluginSettings.instance().isTelemetryCollectorsEnabled()) {
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
LOG.info("Telemetry Collectors are enabled!");
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new RTFDisksCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFHeapMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFThreadPoolMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFNodeStatsAllShardsMetricsCollector());
} else {
LOG.info("Telemetry Collectors are disabled!");
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ThreadPoolMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new HeapMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeStatsAllShardsMetricsCollector(performanceAnalyzerController));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector());
}

try {
Class.forName(ShardIndexingPressureMetricsCollector.SHARD_INDEXING_PRESSURE_CLASS_NAME);
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
Expand Down Expand Up @@ -346,12 +367,14 @@ public Collection<Object> createComponents(
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
Supplier<RepositoriesService> repositoriesServiceSupplier,
Tracer tracer,
MetricsRegistry metricsRegistry) {
OpenSearchResources.INSTANCE.setClusterService(clusterService);
OpenSearchResources.INSTANCE.setThreadPool(threadPool);
OpenSearchResources.INSTANCE.setEnvironment(environment);
OpenSearchResources.INSTANCE.setClient(client);

OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
// ClusterSettingsManager needs ClusterService to have been created before we can
// initialize it. This is the earliest point at which we know ClusterService is created.
// So, call the initialize method here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
import org.opensearch.action.admin.indices.stats.ShardStats;

@FunctionalInterface
interface ValueCalculator {
public interface ValueCalculator {
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
long calculateValue(ShardStats shardStats);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.collectors.telemetry;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory;
import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector;
import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics_generator.DiskMetricsGenerator;
import org.opensearch.performanceanalyzer.commons.metrics_generator.OSMetricsGenerator;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

public class RTFDisksCollector extends PerformanceAnalyzerMetricsCollector {

private Histogram diskWaitTimeMetrics;
private Histogram diskServiceRateMetrics;
private Histogram diskUtilizationMetrics;
private MetricsRegistry metricsRegistry;
private boolean metricsInitialised;
private static final Logger LOG = LogManager.getLogger(RTFDisksCollector.class);

public RTFDisksCollector() {
super(
MetricsConfiguration.CONFIG_MAP.get(DisksCollector.class).samplingInterval,
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
"RTFDisksCollector",
StatMetrics.DISKS_COLLECTOR_EXECUTION_TIME,
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
StatExceptionCode.DISK_METRICS_COLLECTOR_ERROR);
this.metricsInitialised = false;
}

@Override
public void collectMetrics(long startTime) {
OSMetricsGenerator generator = OSMetricsGeneratorFactory.getInstance();
if (generator == null) {
LOG.error("could not get the instance of OSMetricsGeneratorFactory class");
return;
}

metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry == null) {
LOG.error("could not get the instance of MetricsRegistry class");
return;
}

LOG.debug("Executing collect metrics for RTFDisksCollector");

initialiseMetricsIfNeeded();
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
DiskMetricsGenerator diskMetricsGenerator = generator.getDiskMetricsGenerator();
diskMetricsGenerator.addSample();

recordMetrics(diskMetricsGenerator);
}

private void recordMetrics(DiskMetricsGenerator diskMetricsGenerator) {
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
for (String disk : diskMetricsGenerator.getAllDisks()) {
Tags DiskNameTag = Tags.create().addTag("disk_name", disk);
double Disk_WaitTime = diskMetricsGenerator.getAwait(disk);
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
double Disk_ServiceRate = diskMetricsGenerator.getServiceRate(disk);
double Disk_Utilization = diskMetricsGenerator.getDiskUtilization(disk);
diskWaitTimeMetrics.record(Disk_WaitTime, DiskNameTag);
diskUtilizationMetrics.record(Disk_Utilization, DiskNameTag);
diskServiceRateMetrics.record(Disk_ServiceRate, DiskNameTag);
}
}

private void initialiseMetricsIfNeeded() {
if (metricsInitialised == false) {
diskWaitTimeMetrics =
metricsRegistry.createHistogram(
AllMetrics.DiskValue.Constants.WAIT_VALUE, "DiskWaitTimeMetrics", "");
diskServiceRateMetrics =
metricsRegistry.createHistogram(
AllMetrics.DiskValue.Constants.SRATE_VALUE,
"DiskServiceRateMetrics",
"");
diskUtilizationMetrics =
metricsRegistry.createHistogram(
AllMetrics.DiskValue.Constants.UTIL_VALUE,
"DiskUtilizationMetrics",
"");
metricsInitialised = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.collectors.telemetry;

import java.lang.management.MemoryUsage;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.HeapMetricsCollector;
import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector;
import org.opensearch.performanceanalyzer.commons.jvm.GCMetrics;
import org.opensearch.performanceanalyzer.commons.jvm.HeapMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

public class RTFHeapMetricsCollector extends PerformanceAnalyzerMetricsCollector {
private static final Logger LOG = LogManager.getLogger(RTFHeapMetricsCollector.class);
public static final int SAMPLING_TIME_INTERVAL =
MetricsConfiguration.CONFIG_MAP.get(HeapMetricsCollector.class).samplingInterval;

private int count;
private Histogram gcCollectionEventMetrics;
private Histogram gcCollectionTimeMetrics;
private Histogram heapUsedMetrics;
private MetricsRegistry metricsRegistry;
private final String memTypeAttributeKey = "mem_type";
private boolean metricsInitialised;

public RTFHeapMetricsCollector() {
super(
SAMPLING_TIME_INTERVAL,
"RTFHeapMetricsCollector",
StatMetrics.HEAP_METRICS_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.HEAP_METRICS_COLLECTOR_ERROR);
this.count = 0;
this.metricsInitialised = false;
}

@Override
public void collectMetrics(long startTime) {
count += 1;
metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry == null) {
LOG.error("could not get the instance of MetricsRegistry class");
return;
}

initialiseMetricsIfNeeded();
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
GCMetrics.runGCMetrics();
LOG.debug("Executing collect metrics for RTFHeapMetricsCollector");
recordMetrics();
}

private void initialiseMetricsIfNeeded() {
if (metricsInitialised == false) {
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
gcCollectionEventMetrics =
metricsRegistry.createHistogram(
AllMetrics.HeapValue.Constants.COLLECTION_COUNT_VALUE,
"GC Collection Event PA Metrics",
"");

gcCollectionTimeMetrics =
metricsRegistry.createHistogram(
AllMetrics.HeapValue.Constants.COLLECTION_TIME_VALUE,
"GC Collection Time PA Metrics",
"");
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved

heapUsedMetrics =
metricsRegistry.createHistogram(
AllMetrics.HeapValue.Constants.USED_VALUE,
"GC Heap Used PA Metrics",
"");
metricsInitialised = true;
}
}

private void recordMetrics() {
Tags TotYoungGCTag =
Tags.create()
.addTag(memTypeAttributeKey, AllMetrics.GCType.TOT_YOUNG_GC.toString());

Tags TotFullGCTag =
Tags.create().addTag(memTypeAttributeKey, AllMetrics.GCType.TOT_FULL_GC.toString());

gcCollectionEventMetrics.record(GCMetrics.getTotYoungGCCollectionCount(), TotYoungGCTag);

gcCollectionEventMetrics.record(GCMetrics.getTotFullGCCollectionCount(), TotFullGCTag);

gcCollectionTimeMetrics.record(GCMetrics.getTotYoungGCCollectionTime(), TotYoungGCTag);

gcCollectionTimeMetrics.record(GCMetrics.getTotFullGCCollectionTime(), TotFullGCTag);

for (Map.Entry<String, Supplier<MemoryUsage>> entry :
HeapMetrics.getMemoryUsageSuppliers().entrySet()) {
MemoryUsage memoryUsage = entry.getValue().get();
heapUsedMetrics.record(
memoryUsage.getUsed(),
Tags.create().addTag(memTypeAttributeKey, entry.getKey()));
}

if (count == 12) {
count = 0;
for (Map.Entry<String, Supplier<MemoryUsage>> entry :
HeapMetrics.getMemoryUsageSuppliers().entrySet()) {
MemoryUsage memoryUsage = entry.getValue().get();
metricsRegistry.createGauge(
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
AllMetrics.HeapValue.Constants.MAX_VALUE,
"Heap Max PA metrics",
"",
() -> (double) memoryUsage.getMax(),
Tags.create().addTag(memTypeAttributeKey, entry.getKey()));
}
}
}
}
Loading
Loading