Skip to content

Commit

Permalink
Added dynamic control support to all collectors
Browse files Browse the repository at this point in the history
Signed-off-by: Atharva Sharma <[email protected]>
  • Loading branch information
atharvasharma61 committed Jun 3, 2024
1 parent f182464 commit 447e15f
Show file tree
Hide file tree
Showing 40 changed files with 613 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,13 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.performanceanalyzer.action.PerformanceAnalyzerActionFilter;
import org.opensearch.performanceanalyzer.collectors.AdmissionControlMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.CacheConfigMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.CircuitBreakerCollector;
import org.opensearch.performanceanalyzer.collectors.ClusterApplierServiceStatsCollector;
import org.opensearch.performanceanalyzer.collectors.ClusterManagerServiceEventMetrics;
import org.opensearch.performanceanalyzer.collectors.ClusterManagerServiceMetrics;
import org.opensearch.performanceanalyzer.collectors.ElectionTermCollector;
import org.opensearch.performanceanalyzer.collectors.FaultDetectionMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.NodeDetailsCollector;
import org.opensearch.performanceanalyzer.collectors.NodeStatsAllShardsMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.SearchBackPressureStatsCollector;
import org.opensearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.ShardStateCollector;
import org.opensearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.*;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory;
import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector;
import org.opensearch.performanceanalyzer.commons.collectors.GCInfoCollector;
import org.opensearch.performanceanalyzer.commons.collectors.HeapMetricsCollector;
import org.opensearch.performanceanalyzer.commons.collectors.NetworkInterfaceCollector;
import org.opensearch.performanceanalyzer.commons.collectors.OSMetricsCollector;
import org.opensearch.performanceanalyzer.commons.collectors.ScheduledMetricCollectorsExecutor;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.collectors.*;
import org.opensearch.performanceanalyzer.commons.config.PluginSettings;
import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper;
import org.opensearch.performanceanalyzer.commons.event_process.EventLog;
Expand All @@ -78,6 +59,7 @@
import org.opensearch.performanceanalyzer.config.setting.handler.ConfigOverridesClusterSettingHandler;
import org.opensearch.performanceanalyzer.config.setting.handler.NodeStatsSettingHandler;
import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerClusterSettingHandler;
import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerCollectorsSettingHandler;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerClusterConfigAction;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerOverridesClusterConfigAction;
Expand Down Expand Up @@ -114,6 +96,9 @@ public final class PerformanceAnalyzerPlugin extends Plugin
private static SecurityManager sm = null;
private final PerformanceAnalyzerClusterSettingHandler perfAnalyzerClusterSettingHandler;
private final NodeStatsSettingHandler nodeStatsSettingHandler;

private final PerformanceAnalyzerCollectorsSettingHandler
performanceAnalyzerCollectorsSettingHandler;
private final ConfigOverridesClusterSettingHandler configOverridesClusterSettingHandler;
private final ConfigOverridesWrapper configOverridesWrapper;
private final PerformanceAnalyzerController performanceAnalyzerController;
Expand Down Expand Up @@ -172,7 +157,8 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
new ClusterSettingsManager(
Arrays.asList(
PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING),
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING),
Collections.singletonList(
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING));
configOverridesClusterSettingHandler =
Expand All @@ -195,17 +181,15 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
clusterSettingsManager.addSubscriberForIntSetting(
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING, nodeStatsSettingHandler);

// Adding RTF Collectors if flag is enabled in performance-analyzer.properties
if (PluginSettings.instance().isTelemetryCollectorsEnabled()) {
LOG.info("Scheduling Telemetry Collectors");
scheduleTelemetryCollectors();
}
performanceAnalyzerCollectorsSettingHandler =
new PerformanceAnalyzerCollectorsSettingHandler(
performanceAnalyzerController, clusterSettingsManager);
clusterSettingsManager.addSubscriberForIntSetting(
PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING,
performanceAnalyzerCollectorsSettingHandler);

// Adding RCA Collectors if flag is enabled in performance-analyzer.properties
if (PluginSettings.instance().isRcaCollectorsEnabled()) {
LOG.info("Scheduling RCA Collectors");
scheduleRcaCollectors();
}
scheduleTelemetryCollectors();
scheduleRcaCollectors();

scheduledMetricCollectorsExecutor.start();

Expand All @@ -221,35 +205,44 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
}

private void scheduleTelemetryCollectors() {
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new RTFDisksCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFHeapMetricsCollector());
new RTFDisksCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFHeapMetricsCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFThreadPoolMetricsCollector());
new RTFThreadPoolMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFNodeStatsAllShardsMetricsCollector());
new RTFNodeStatsAllShardsMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
}

private void scheduleRcaCollectors() {
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ThreadPoolMetricsCollector());
new ThreadPoolMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new HeapMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeStatsAllShardsMetricsCollector(performanceAnalyzerController));
new NodeStatsAllShardsMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new CacheConfigMetricsCollector());
new CacheConfigMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new CircuitBreakerCollector());
new CircuitBreakerCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new OSMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeDetailsCollector(configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ClusterManagerServiceMetrics());
new ClusterManagerServiceMetrics(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ClusterManagerServiceEventMetrics());
new ClusterManagerServiceEventMetrics(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NetworkInterfaceCollector());

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new FaultDetectionMetricsCollector(
Expand All @@ -263,7 +256,8 @@ private void scheduleRcaCollectors() {
new SearchBackPressureStatsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new AdmissionControlMetricsCollector());
new AdmissionControlMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new ElectionTermCollector(performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new GCInfoCollector());
Expand Down Expand Up @@ -348,7 +342,8 @@ public List<org.opensearch.rest.RestHandler> getRestHandlers(
settings,
restController,
perfAnalyzerClusterSettingHandler,
nodeStatsSettingHandler);
nodeStatsSettingHandler,
performanceAnalyzerCollectorsSettingHandler);
PerformanceAnalyzerOverridesClusterConfigAction paOverridesConfigClusterAction =
new PerformanceAnalyzerOverridesClusterConfigAction(
settings,
Expand Down Expand Up @@ -410,6 +405,7 @@ public List<Setting<?>> getSettings() {
return Arrays.asList(
PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING);
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING,
PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import org.opensearch.performanceanalyzer.commons.collectors.MetricStatus;
import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;

/** AdmissionControlMetricsCollector collects `UsedQuota`, `TotalQuota`, RejectionCount */
public class AdmissionControlMetricsCollector extends PerformanceAnalyzerMetricsCollector
Expand All @@ -48,20 +50,37 @@ public class AdmissionControlMetricsCollector extends PerformanceAnalyzerMetrics
private Class admissionControllerClass;
private Class jettyAdmissionControllerServiceClass;
private final boolean admissionControllerAvailable;
private PerformanceAnalyzerController performanceAnalyzerController;
private ConfigOverridesWrapper configOverridesWrapper;

public AdmissionControlMetricsCollector() {
public AdmissionControlMetricsCollector(
PerformanceAnalyzerController performanceAnalyzerController,
ConfigOverridesWrapper configOverridesWrapper) {
super(
sTimeInterval,
"AdmissionControlMetricsCollector",
ADMISSION_CONTROL_COLLECTOR_EXECUTION_TIME,
ADMISSION_CONTROL_COLLECTOR_ERROR);
this.value = new StringBuilder();
this.admissionControllerAvailable = canLoadAdmissionControllerClasses();
this.performanceAnalyzerController = performanceAnalyzerController;
this.configOverridesWrapper = configOverridesWrapper;
}

@Override
@SuppressWarnings("unchecked")
public void collectMetrics(long startTime) {
if (!performanceAnalyzerController.rcaCollectorsEnabled()) {
LOG.info("All RCA collectors are disabled. Skipping collection.");
return;
}

if (performanceAnalyzerController.isCollectorDisabled(
configOverridesWrapper, getCollectorName())) {
LOG.info(getCollectorName() + " is disabled. Skipping collection.");
return;
}

if (!this.admissionControllerAvailable) {
LOG.debug("AdmissionControl is not available for this domain");
ServiceMetrics.COMMONS_STAT_METRICS_AGGREGATOR.updateStat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.cache.Cache;
import org.opensearch.indices.IndicesService;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.MetricStatus;
import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector;
import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics.CacheConfigDimension;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics.CacheConfigValue;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;

/*
* Unlike Cache Hit, Miss, Eviction Count and Size, which is tracked on a per shard basis,
Expand All @@ -41,22 +45,40 @@
*/
public class CacheConfigMetricsCollector extends PerformanceAnalyzerMetricsCollector
implements MetricsProcessor {
private static final Logger LOG = LogManager.getLogger(CacheConfigMetricsCollector.class);

public static final int SAMPLING_TIME_INTERVAL =
MetricsConfiguration.CONFIG_MAP.get(CacheConfigMetricsCollector.class).samplingInterval;
private static final int KEYS_PATH_LENGTH = 0;
private StringBuilder value;
private PerformanceAnalyzerController performanceAnalyzerController;
private ConfigOverridesWrapper configOverridesWrapper;

public CacheConfigMetricsCollector() {
public CacheConfigMetricsCollector(
PerformanceAnalyzerController performanceAnalyzerController,
ConfigOverridesWrapper configOverridesWrapper) {
super(
SAMPLING_TIME_INTERVAL,
"CacheConfigMetrics",
CACHE_CONFIG_METRICS_COLLECTOR_EXECUTION_TIME,
CACHE_CONFIG_METRICS_COLLECTOR_ERROR);
value = new StringBuilder();
this.performanceAnalyzerController = performanceAnalyzerController;
this.configOverridesWrapper = configOverridesWrapper;
}

@Override
public void collectMetrics(long startTime) {
if (!performanceAnalyzerController.rcaCollectorsEnabled()) {
LOG.info("All RCA collectors are disabled. Skipping collection.");
return;
}

if (performanceAnalyzerController.isCollectorDisabled(
configOverridesWrapper, getCollectorName())) {
LOG.info(getCollectorName() + " is disabled. Skipping collection.");
return;
}
IndicesService indicesService = OpenSearchResources.INSTANCE.getIndicesService();
if (indicesService == null) {
return;
Expand Down Expand Up @@ -104,10 +126,14 @@ public void collectMetrics(long startTime) {
indicesService,
"indicesRequestCache",
true);
Object openSearchOnHeapCache =
FieldUtils.readField(reqCache, "cache", true);
Cache requestCache =
(Cache)
FieldUtils.readField(
reqCache, "cache", true);
openSearchOnHeapCache,
"cache",
true);
Long requestCacheMaxSize =
(Long)
FieldUtils.readField(
Expand Down
Loading

0 comments on commit 447e15f

Please sign in to comment.