Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes to merge PA-RTF #651

Merged
merged 36 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4ec8da2
PA RTF merging init
atharvasharma61 Apr 22, 2024
a68d0fb
working model
atharvasharma61 Apr 22, 2024
2fecd36
working model tip
atharvasharma61 Apr 22, 2024
5ec43af
functional model init
atharvasharma61 Apr 22, 2024
b19e35f
Migrated HeapMetricsCollector
atharvasharma61 Apr 25, 2024
791e99a
Merge branch 'opensearch-project:main' into pa-rtf
atharvasharma61 Apr 25, 2024
d77c71f
Added RTFThreadPoolMetricsCollector
atharvasharma61 May 7, 2024
7072a65
migrated NodeStats and DiskMetricsCollector
atharvasharma61 May 13, 2024
8e43e03
Added gauge data model for Heap_Max metric
atharvasharma61 May 17, 2024
4d5dec4
implemented TelemetryAwarePlugin
atharvasharma61 May 20, 2024
ef28130
Merge branch 'opensearch-project:main' into pa-rtf
atharvasharma61 May 30, 2024
49946a6
Framework changes for PA RTF merging
atharvasharma61 May 30, 2024
82e15e7
refactored
atharvasharma61 May 30, 2024
8922651
spotless applied
atharvasharma61 May 30, 2024
a80873f
Addressed small comments
atharvasharma61 May 31, 2024
e39aada
Added different flag for RCA collectors
atharvasharma61 May 31, 2024
97dd4bc
Addressed more comments
atharvasharma61 May 31, 2024
e9b5d57
Added RTF collectors in config map
atharvasharma61 May 31, 2024
538d682
Added UTs
atharvasharma61 May 31, 2024
f182464
Added further UTs
atharvasharma61 Jun 1, 2024
447e15f
Added dynamic control support to all collectors
atharvasharma61 Jun 3, 2024
369bd95
fixed UT
atharvasharma61 Jun 3, 2024
25d66e8
refactoring
atharvasharma61 Jun 3, 2024
84580dc
Revert "refactoring"
atharvasharma61 Jun 4, 2024
16c592e
Revert "fixed UT"
atharvasharma61 Jun 4, 2024
55a7604
Revert "Added dynamic control support to all collectors"
atharvasharma61 Jun 4, 2024
837b15d
Adding two new collector interfaces
atharvasharma61 Jun 4, 2024
96166aa
simplified interfaces
atharvasharma61 Jun 4, 2024
a9a756f
Added units and javadocs
atharvasharma61 Jun 5, 2024
8df65a5
Changes metrics semantic conventions
atharvasharma61 Jun 5, 2024
7213080
refactored
atharvasharma61 Jun 5, 2024
857214b
fixed UT
atharvasharma61 Jun 5, 2024
da1c4ba
Added stats metrics for rtf collectors
atharvasharma61 Jun 6, 2024
1e211f0
reverted test delete
atharvasharma61 Jun 6, 2024
44f0b7c
Fixes javadoc compilation issue
Jun 10, 2024
870e3a5
Merge branch 'opensearch-project:main' into pa-rtf
atharvasharma61 Jun 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.env.Environment;
import org.opensearch.indices.IndicesService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

public final class OpenSearchResources {
Expand All @@ -20,6 +21,7 @@ public final class OpenSearchResources {
private CircuitBreakerService circuitBreakerService;
private ClusterService clusterService;
private IndicesService indicesService;
private MetricsRegistry metricsRegistry;
private Settings settings;
private Environment environment;
private java.nio.file.Path configPath;
Expand All @@ -32,6 +34,7 @@ private OpenSearchResources() {
clusterService = null;
settings = null;
indicesService = null;
metricsRegistry = null;
environment = null;
configPath = null;
pluginFileLocation = null;
Expand Down Expand Up @@ -101,6 +104,14 @@ public void setIndicesService(IndicesService indicesService) {
this.indicesService = indicesService;
}

public MetricsRegistry getMetricsRegistry() {
return metricsRegistry;
}

public void setMetricsRegistry(MetricsRegistry metricsRegistry) {
this.metricsRegistry = metricsRegistry;
}

public void setClient(final Client client) {
this.client = client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
import org.opensearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.ShardStateCollector;
import org.opensearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory;
import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector;
import org.opensearch.performanceanalyzer.commons.collectors.GCInfoCollector;
Expand All @@ -74,6 +78,7 @@
import org.opensearch.performanceanalyzer.config.setting.handler.ConfigOverridesClusterSettingHandler;
import org.opensearch.performanceanalyzer.config.setting.handler.NodeStatsSettingHandler;
import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerClusterSettingHandler;
import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerCollectorsSettingHandler;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerClusterConfigAction;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerOverridesClusterConfigAction;
Expand All @@ -88,17 +93,19 @@
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.watcher.ResourceWatcherService;

public final class PerformanceAnalyzerPlugin extends Plugin
implements ActionPlugin, NetworkPlugin, SearchPlugin {
implements ActionPlugin, NetworkPlugin, SearchPlugin, TelemetryAwarePlugin {
private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerPlugin.class);
public static final String PLUGIN_NAME = "opensearch-performance-analyzer";
private static final String ADD_FAULT_DETECTION_METHOD = "addFaultDetectionListener";
Expand All @@ -108,6 +115,8 @@ public final class PerformanceAnalyzerPlugin extends Plugin
private static SecurityManager sm = null;
private final PerformanceAnalyzerClusterSettingHandler perfAnalyzerClusterSettingHandler;
private final NodeStatsSettingHandler nodeStatsSettingHandler;
private final PerformanceAnalyzerCollectorsSettingHandler
performanceAnalyzerCollectorsSettingHandler;
private final ConfigOverridesClusterSettingHandler configOverridesClusterSettingHandler;
private final ConfigOverridesWrapper configOverridesWrapper;
private final PerformanceAnalyzerController performanceAnalyzerController;
Expand Down Expand Up @@ -166,7 +175,8 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
new ClusterSettingsManager(
Arrays.asList(
PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING),
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING),
Collections.singletonList(
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING));
configOverridesClusterSettingHandler =
Expand All @@ -189,27 +199,62 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
clusterSettingsManager.addSubscriberForIntSetting(
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING, nodeStatsSettingHandler);

performanceAnalyzerCollectorsSettingHandler =
new PerformanceAnalyzerCollectorsSettingHandler(
performanceAnalyzerController, clusterSettingsManager);
clusterSettingsManager.addSubscriberForIntSetting(
PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING,
performanceAnalyzerCollectorsSettingHandler);

scheduleTelemetryCollectors();
scheduleRcaCollectors();

scheduledMetricCollectorsExecutor.start();

EventLog eventLog = new EventLog();
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
EventLogFileHandler eventLogFileHandler =
new EventLogFileHandler(eventLog, PluginSettings.instance().getMetricsLocation());
new EventLogQueueProcessor(
eventLogFileHandler,
MetricsConfiguration.SAMPLING_INTERVAL,
QUEUE_PURGE_INTERVAL_MS,
performanceAnalyzerController)
.scheduleExecutor();
}

private void scheduleTelemetryCollectors() {
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFDisksCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFHeapMetricsCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFThreadPoolMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFNodeStatsAllShardsMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
}

private void scheduleRcaCollectors() {
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ThreadPoolMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new HeapMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeStatsAllShardsMetricsCollector(performanceAnalyzerController));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
new CacheConfigMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new CircuitBreakerCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new OSMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new HeapMetricsCollector());

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeDetailsCollector(configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeStatsAllShardsMetricsCollector(performanceAnalyzerController));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ClusterManagerServiceMetrics());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ClusterManagerServiceEventMetrics());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NetworkInterfaceCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new GCInfoCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new FaultDetectionMetricsCollector(
Expand All @@ -222,11 +267,11 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new SearchBackPressureStatsCollector(
performanceAnalyzerController, configOverridesWrapper));

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new AdmissionControlMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ElectionTermCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new GCInfoCollector());
try {
Class.forName(ShardIndexingPressureMetricsCollector.SHARD_INDEXING_PRESSURE_CLASS_NAME);
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
Expand All @@ -236,17 +281,6 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
LOG.info(
"Shard IndexingPressure not present in this OpenSearch version. Skipping ShardIndexingPressureMetricsCollector");
}
scheduledMetricCollectorsExecutor.start();

EventLog eventLog = new EventLog();
EventLogFileHandler eventLogFileHandler =
new EventLogFileHandler(eventLog, PluginSettings.instance().getMetricsLocation());
new EventLogQueueProcessor(
eventLogFileHandler,
MetricsConfiguration.SAMPLING_INTERVAL,
QUEUE_PURGE_INTERVAL_MS,
performanceAnalyzerController)
.scheduleExecutor();
}

// - http level: bulk, search
Expand Down Expand Up @@ -319,7 +353,8 @@ public List<org.opensearch.rest.RestHandler> getRestHandlers(
settings,
restController,
perfAnalyzerClusterSettingHandler,
nodeStatsSettingHandler);
nodeStatsSettingHandler,
performanceAnalyzerCollectorsSettingHandler);
PerformanceAnalyzerOverridesClusterConfigAction paOverridesConfigClusterAction =
new PerformanceAnalyzerOverridesClusterConfigAction(
settings,
Expand All @@ -346,12 +381,14 @@ public Collection<Object> createComponents(
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
Supplier<RepositoriesService> repositoriesServiceSupplier,
Tracer tracer,
MetricsRegistry metricsRegistry) {
OpenSearchResources.INSTANCE.setClusterService(clusterService);
OpenSearchResources.INSTANCE.setThreadPool(threadPool);
OpenSearchResources.INSTANCE.setEnvironment(environment);
OpenSearchResources.INSTANCE.setClient(client);

OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
// ClusterSettingsManager needs ClusterService to have been created before we can
// initialize it. This is the earliest point at which we know ClusterService is created.
// So, call the initialize method here.
Expand Down Expand Up @@ -379,6 +416,7 @@ public List<Setting<?>> getSettings() {
return Arrays.asList(
PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING);
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING,
PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
import org.opensearch.action.admin.indices.stats.ShardStats;

@FunctionalInterface
interface ValueCalculator {
public interface ValueCalculator {
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
long calculateValue(ShardStats shardStats);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.collectors.telemetry;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory;
import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector;
import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector;
import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.performanceanalyzer.commons.metrics_generator.DiskMetricsGenerator;
import org.opensearch.performanceanalyzer.commons.metrics_generator.OSMetricsGenerator;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

public class RTFDisksCollector extends PerformanceAnalyzerMetricsCollector
Copy link
Collaborator

@nishchay21 nishchay21 Jun 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add java doc on all collectors

implements TelemetryCollector {

private Histogram diskWaitTimeMetrics;
private Histogram diskServiceRateMetrics;
private Histogram diskUtilizationMetrics;
private MetricsRegistry metricsRegistry;
private boolean metricsInitialised;
private static final Logger LOG = LogManager.getLogger(RTFDisksCollector.class);
private PerformanceAnalyzerController performanceAnalyzerController;
private ConfigOverridesWrapper configOverridesWrapper;

public RTFDisksCollector(
PerformanceAnalyzerController performanceAnalyzerController,
ConfigOverridesWrapper configOverridesWrapper) {
super(
MetricsConfiguration.CONFIG_MAP.get(RTFDisksCollector.class).samplingInterval,
"RTFDisksCollector",
StatMetrics.DISKS_COLLECTOR_EXECUTION_TIME,
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
StatExceptionCode.DISK_METRICS_COLLECTOR_ERROR);
this.metricsInitialised = false;
this.performanceAnalyzerController = performanceAnalyzerController;
this.configOverridesWrapper = configOverridesWrapper;
}

@Override
public void collectMetrics(long startTime) {
if (performanceAnalyzerController.isCollectorDisabled(
configOverridesWrapper, getCollectorName())) {
LOG.info("RTFDisksCollector is disabled. Skipping collection.");
return;
}

OSMetricsGenerator generator = OSMetricsGeneratorFactory.getInstance();
if (generator == null) {
LOG.error("could not get the instance of OSMetricsGeneratorFactory class");
return;
}

metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry == null) {
LOG.error("could not get the instance of MetricsRegistry class");
return;
}

LOG.debug("Executing collect metrics for RTFDisksCollector");

initialiseMetricsIfNeeded();
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
DiskMetricsGenerator diskMetricsGenerator = generator.getDiskMetricsGenerator();
diskMetricsGenerator.addSample();

recordMetrics(diskMetricsGenerator);
}

public void recordMetrics(DiskMetricsGenerator diskMetricsGenerator) {
for (String disk : diskMetricsGenerator.getAllDisks()) {
Tags diskNameTag =
Tags.create().addTag(RTFMetrics.DiskDimension.DISK_NAME.toString(), disk);
double diskWaitTime = diskMetricsGenerator.getAwait(disk);
double diskServiceRate = diskMetricsGenerator.getServiceRate(disk);
double diskUtilization = diskMetricsGenerator.getDiskUtilization(disk);
diskWaitTimeMetrics.record(diskWaitTime, diskNameTag);
diskUtilizationMetrics.record(diskUtilization, diskNameTag);
diskServiceRateMetrics.record(diskServiceRate, diskNameTag);
}
}

private void initialiseMetricsIfNeeded() {
if (metricsInitialised == false) {
diskWaitTimeMetrics =
metricsRegistry.createHistogram(
RTFMetrics.DiskValue.Constants.WAIT_VALUE,
"DiskWaitTimeMetrics",
atharvasharma61 marked this conversation as resolved.
Show resolved Hide resolved
RTFMetrics.MetricUnits.MILLISECOND.toString());
diskServiceRateMetrics =
metricsRegistry.createHistogram(
RTFMetrics.DiskValue.Constants.SRATE_VALUE,
"DiskServiceRateMetrics",
RTFMetrics.MetricUnits.MEGABYTE_PER_SEC.toString());
diskUtilizationMetrics =
metricsRegistry.createHistogram(
RTFMetrics.DiskValue.Constants.UTIL_VALUE,
"DiskUtilizationMetrics",
RTFMetrics.MetricUnits.PERCENT.toString());
metricsInitialised = true;
}
}
}
Loading
Loading