Skip to content

Commit

Permalink
[Backport 2.x] changes to merge PA-RTF (#662)
Browse files Browse the repository at this point in the history
* changes to merge PA-RTF (#651)

* PA RTF merging init

Signed-off-by: Atharva Sharma <[email protected]>

* working model

Signed-off-by: Atharva Sharma <[email protected]>

* working model tip

Signed-off-by: Atharva Sharma <[email protected]>

* functional model init

Signed-off-by: Atharva Sharma <[email protected]>

* Migrated HeapMetricsCollector

Signed-off-by: Atharva Sharma <[email protected]>

* Added RTFThreadPoolMetricsCollector

Signed-off-by: Atharva Sharma <[email protected]>

* migrated NodeStats and DiskMetricsCollector

Signed-off-by: Atharva Sharma <[email protected]>

* Added gauge data model for Heap_Max metric

Signed-off-by: Atharva Sharma <[email protected]>

* implemented TelemetryAwarePlugin

Signed-off-by: Atharva Sharma <[email protected]>

* Framework changes for PA RTF merging

Signed-off-by: Atharva Sharma <[email protected]>

* refactored

Signed-off-by: Atharva Sharma <[email protected]>

* spotless applied

Signed-off-by: Atharva Sharma <[email protected]>

* Addressed small comments

Signed-off-by: Atharva Sharma <[email protected]>

* Added different flag for RCA collectors

Signed-off-by: Atharva Sharma <[email protected]>

* Addressed more comments

Signed-off-by: Atharva Sharma <[email protected]>

* Added RTF collectors in config map

Signed-off-by: Atharva Sharma <[email protected]>

* Added UTs

Signed-off-by: Atharva Sharma <[email protected]>

* Added further UTs

Signed-off-by: Atharva Sharma <[email protected]>

* Added dynamic control support to all collectors

Signed-off-by: Atharva Sharma <[email protected]>

* fixed UT

Signed-off-by: Atharva Sharma <[email protected]>

* refactoring

Signed-off-by: Atharva Sharma <[email protected]>

* Revert "refactoring"

This reverts commit 25d66e8.

Signed-off-by: Atharva Sharma <[email protected]>

* Revert "fixed UT"

This reverts commit 369bd95.

Signed-off-by: Atharva Sharma <[email protected]>

* Revert "Added dynamic control support to all collectors"

This reverts commit 447e15f.

Signed-off-by: Atharva Sharma <[email protected]>

* Adding two new collector interfaces

Signed-off-by: Atharva Sharma <[email protected]>

* simplified interfaces

Signed-off-by: Atharva Sharma <[email protected]>

* Added units and javadocs

Signed-off-by: Atharva Sharma <[email protected]>

* Changes metrics semantic conventions

Signed-off-by: Atharva Sharma <[email protected]>

* refactored

Signed-off-by: Atharva Sharma <[email protected]>

* fixed UT

Signed-off-by: Atharva Sharma <[email protected]>

* Added stats metrics for rtf collectors

Signed-off-by: Atharva Sharma <[email protected]>

* reverted test delete

Signed-off-by: Atharva Sharma <[email protected]>

* Fixes javadoc compilation issue

Signed-off-by: Gagan Juneja <[email protected]>

---------

Signed-off-by: Atharva Sharma <[email protected]>
Signed-off-by: Gagan Juneja <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Dev Agarwal <[email protected]>

---------

Signed-off-by: Atharva Sharma <[email protected]>
Signed-off-by: Gagan Juneja <[email protected]>
Signed-off-by: Dev Agarwal <[email protected]>
Co-authored-by: Atharva Sharma <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>
  • Loading branch information
3 people authored Jun 11, 2024
1 parent df8947e commit 84379d5
Show file tree
Hide file tree
Showing 20 changed files with 1,512 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.env.Environment;
import org.opensearch.indices.IndicesService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

public final class OpenSearchResources {
Expand All @@ -20,6 +21,7 @@ public final class OpenSearchResources {
private CircuitBreakerService circuitBreakerService;
private ClusterService clusterService;
private IndicesService indicesService;
private MetricsRegistry metricsRegistry;
private Settings settings;
private Environment environment;
private java.nio.file.Path configPath;
Expand All @@ -32,6 +34,7 @@ private OpenSearchResources() {
clusterService = null;
settings = null;
indicesService = null;
metricsRegistry = null;
environment = null;
configPath = null;
pluginFileLocation = null;
Expand Down Expand Up @@ -101,6 +104,14 @@ public void setIndicesService(IndicesService indicesService) {
this.indicesService = indicesService;
}

public MetricsRegistry getMetricsRegistry() {
return metricsRegistry;
}

public void setMetricsRegistry(MetricsRegistry metricsRegistry) {
this.metricsRegistry = metricsRegistry;
}

public void setClient(final Client client) {
this.client = client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
import org.opensearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.ShardStateCollector;
import org.opensearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory;
import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector;
import org.opensearch.performanceanalyzer.commons.collectors.GCInfoCollector;
Expand All @@ -73,6 +77,7 @@
import org.opensearch.performanceanalyzer.config.setting.handler.ConfigOverridesClusterSettingHandler;
import org.opensearch.performanceanalyzer.config.setting.handler.NodeStatsSettingHandler;
import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerClusterSettingHandler;
import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerCollectorsSettingHandler;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerClusterConfigAction;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerOverridesClusterConfigAction;
Expand All @@ -87,17 +92,19 @@
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.watcher.ResourceWatcherService;

public final class PerformanceAnalyzerPlugin extends Plugin
implements ActionPlugin, NetworkPlugin, SearchPlugin {
implements ActionPlugin, NetworkPlugin, SearchPlugin, TelemetryAwarePlugin {
private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerPlugin.class);
public static final String PLUGIN_NAME = "opensearch-performance-analyzer";
private static final String ADD_FAULT_DETECTION_METHOD = "addFaultDetectionListener";
Expand All @@ -107,6 +114,8 @@ public final class PerformanceAnalyzerPlugin extends Plugin
private static SecurityManager sm = null;
private final PerformanceAnalyzerClusterSettingHandler perfAnalyzerClusterSettingHandler;
private final NodeStatsSettingHandler nodeStatsSettingHandler;
private final PerformanceAnalyzerCollectorsSettingHandler
performanceAnalyzerCollectorsSettingHandler;
private final ConfigOverridesClusterSettingHandler configOverridesClusterSettingHandler;
private final ConfigOverridesWrapper configOverridesWrapper;
private final PerformanceAnalyzerController performanceAnalyzerController;
Expand Down Expand Up @@ -165,7 +174,8 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
new ClusterSettingsManager(
Arrays.asList(
PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING),
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING),
Collections.singletonList(
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING));
configOverridesClusterSettingHandler =
Expand All @@ -188,27 +198,62 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
clusterSettingsManager.addSubscriberForIntSetting(
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING, nodeStatsSettingHandler);

performanceAnalyzerCollectorsSettingHandler =
new PerformanceAnalyzerCollectorsSettingHandler(
performanceAnalyzerController, clusterSettingsManager);
clusterSettingsManager.addSubscriberForIntSetting(
PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING,
performanceAnalyzerCollectorsSettingHandler);

scheduleTelemetryCollectors();
scheduleRcaCollectors();

scheduledMetricCollectorsExecutor.start();

EventLog eventLog = new EventLog();
EventLogFileHandler eventLogFileHandler =
new EventLogFileHandler(eventLog, PluginSettings.instance().getMetricsLocation());
new EventLogQueueProcessor(
eventLogFileHandler,
MetricsConfiguration.SAMPLING_INTERVAL,
QUEUE_PURGE_INTERVAL_MS,
performanceAnalyzerController)
.scheduleExecutor();
}

private void scheduleTelemetryCollectors() {
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFDisksCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFHeapMetricsCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFThreadPoolMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFNodeStatsAllShardsMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
}

private void scheduleRcaCollectors() {
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ThreadPoolMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new HeapMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeStatsAllShardsMetricsCollector(performanceAnalyzerController));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new CacheConfigMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new CircuitBreakerCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new OSMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new HeapMetricsCollector());

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeDetailsCollector(configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeStatsAllShardsMetricsCollector(performanceAnalyzerController));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ClusterManagerServiceMetrics());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ClusterManagerServiceEventMetrics());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NetworkInterfaceCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new GCInfoCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new FaultDetectionMetricsCollector(
Expand All @@ -222,6 +267,7 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
new AdmissionControlMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ElectionTermCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new GCInfoCollector());
try {
Class.forName(ShardIndexingPressureMetricsCollector.SHARD_INDEXING_PRESSURE_CLASS_NAME);
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
Expand All @@ -231,17 +277,6 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
LOG.info(
"Shard IndexingPressure not present in this OpenSearch version. Skipping ShardIndexingPressureMetricsCollector");
}
scheduledMetricCollectorsExecutor.start();

EventLog eventLog = new EventLog();
EventLogFileHandler eventLogFileHandler =
new EventLogFileHandler(eventLog, PluginSettings.instance().getMetricsLocation());
new EventLogQueueProcessor(
eventLogFileHandler,
MetricsConfiguration.SAMPLING_INTERVAL,
QUEUE_PURGE_INTERVAL_MS,
performanceAnalyzerController)
.scheduleExecutor();
}

// - http level: bulk, search
Expand Down Expand Up @@ -314,7 +349,8 @@ public List<org.opensearch.rest.RestHandler> getRestHandlers(
settings,
restController,
perfAnalyzerClusterSettingHandler,
nodeStatsSettingHandler);
nodeStatsSettingHandler,
performanceAnalyzerCollectorsSettingHandler);
PerformanceAnalyzerOverridesClusterConfigAction paOverridesConfigClusterAction =
new PerformanceAnalyzerOverridesClusterConfigAction(
settings,
Expand All @@ -341,12 +377,14 @@ public Collection<Object> createComponents(
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
Supplier<RepositoriesService> repositoriesServiceSupplier,
Tracer tracer,
MetricsRegistry metricsRegistry) {
OpenSearchResources.INSTANCE.setClusterService(clusterService);
OpenSearchResources.INSTANCE.setThreadPool(threadPool);
OpenSearchResources.INSTANCE.setEnvironment(environment);
OpenSearchResources.INSTANCE.setClient(client);

OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
// ClusterSettingsManager needs ClusterService to have been created before we can
// initialize it. This is the earliest point at which we know ClusterService is created.
// So, call the initialize method here.
Expand Down Expand Up @@ -374,6 +412,7 @@ public List<Setting<?>> getSettings() {
return Arrays.asList(
PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING);
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING,
PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
import org.opensearch.action.admin.indices.stats.ShardStats;

@FunctionalInterface
interface ValueCalculator {
public interface ValueCalculator {
long calculateValue(ShardStats shardStats);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.collectors.telemetry;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory;
import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector;
import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector;
import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.performanceanalyzer.commons.metrics_generator.DiskMetricsGenerator;
import org.opensearch.performanceanalyzer.commons.metrics_generator.OSMetricsGenerator;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

public class RTFDisksCollector extends PerformanceAnalyzerMetricsCollector
implements TelemetryCollector {

private Histogram diskWaitTimeMetrics;
private Histogram diskServiceRateMetrics;
private Histogram diskUtilizationMetrics;
private MetricsRegistry metricsRegistry;
private boolean metricsInitialised;
private static final Logger LOG = LogManager.getLogger(RTFDisksCollector.class);
private PerformanceAnalyzerController performanceAnalyzerController;
private ConfigOverridesWrapper configOverridesWrapper;

public RTFDisksCollector(
PerformanceAnalyzerController performanceAnalyzerController,
ConfigOverridesWrapper configOverridesWrapper) {
super(
MetricsConfiguration.CONFIG_MAP.get(RTFDisksCollector.class).samplingInterval,
"RTFDisksCollector",
StatMetrics.RTF_DISKS_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.RTF_DISK_METRICS_COLLECTOR_ERROR);
this.metricsInitialised = false;
this.performanceAnalyzerController = performanceAnalyzerController;
this.configOverridesWrapper = configOverridesWrapper;
}

@Override
public void collectMetrics(long startTime) {
if (performanceAnalyzerController.isCollectorDisabled(
configOverridesWrapper, getCollectorName())) {
LOG.info("RTFDisksCollector is disabled. Skipping collection.");
return;
}

OSMetricsGenerator generator = OSMetricsGeneratorFactory.getInstance();
if (generator == null) {
LOG.error("could not get the instance of OSMetricsGeneratorFactory class");
return;
}

metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry == null) {
LOG.error("could not get the instance of MetricsRegistry class");
return;
}

LOG.debug("Executing collect metrics for RTFDisksCollector");

initialiseMetricsIfNeeded();
DiskMetricsGenerator diskMetricsGenerator = generator.getDiskMetricsGenerator();
diskMetricsGenerator.addSample();

recordMetrics(diskMetricsGenerator);
}

public void recordMetrics(DiskMetricsGenerator diskMetricsGenerator) {
for (String disk : diskMetricsGenerator.getAllDisks()) {
Tags diskNameTag =
Tags.create().addTag(RTFMetrics.DiskDimension.DISK_NAME.toString(), disk);
double diskWaitTime = diskMetricsGenerator.getAwait(disk);
double diskServiceRate = diskMetricsGenerator.getServiceRate(disk);
double diskUtilization = diskMetricsGenerator.getDiskUtilization(disk);
diskWaitTimeMetrics.record(diskWaitTime, diskNameTag);
diskUtilizationMetrics.record(diskUtilization, diskNameTag);
diskServiceRateMetrics.record(diskServiceRate, diskNameTag);
}
}

private void initialiseMetricsIfNeeded() {
if (metricsInitialised == false) {
diskWaitTimeMetrics =
metricsRegistry.createHistogram(
RTFMetrics.DiskValue.Constants.WAIT_VALUE,
"DiskWaitTimeMetrics",
RTFMetrics.MetricUnits.MILLISECOND.toString());
diskServiceRateMetrics =
metricsRegistry.createHistogram(
RTFMetrics.DiskValue.Constants.SRATE_VALUE,
"DiskServiceRateMetrics",
RTFMetrics.MetricUnits.MEGABYTE_PER_SEC.toString());
diskUtilizationMetrics =
metricsRegistry.createHistogram(
RTFMetrics.DiskValue.Constants.UTIL_VALUE,
"DiskUtilizationMetrics",
RTFMetrics.MetricUnits.PERCENT.toString());
metricsInitialised = true;
}
}
}
Loading

0 comments on commit 84379d5

Please sign in to comment.