Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add initial support for dynamic config overriding #148

Merged
merged 5 commits into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.setting.handler.ConfigOverridesClusterSettingHandler;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.setting.handler.NodeStatsSettingHandler;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.http_action.config.PerformanceAnalyzerOverridesClusterConfigAction;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.http_action.config.PerformanceAnalyzerResourceProvider;
import java.io.File;
import java.security.AccessController;
Expand Down Expand Up @@ -95,6 +98,7 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.transport.PerformanceAnalyzerTransportInterceptor;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.util.Utils;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.writer.EventLogQueueProcessor;

import static java.util.Collections.singletonList;

public final class PerformanceAnalyzerPlugin extends Plugin implements ActionPlugin, NetworkPlugin, SearchPlugin {
Expand All @@ -104,6 +108,8 @@ public final class PerformanceAnalyzerPlugin extends Plugin implements ActionPlu
private static SecurityManager sm = null;
private final PerformanceAnalyzerClusterSettingHandler perfAnalyzerClusterSettingHandler;
private final NodeStatsSettingHandler nodeStatsSettingHandler;
private final ConfigOverridesClusterSettingHandler configOverridesClusterSettingHandler;
private final ConfigOverridesWrapper configOverridesWrapper;
private final PerformanceAnalyzerController performanceAnalyzerController;
private final ClusterSettingsManager clusterSettingsManager;

Expand Down Expand Up @@ -152,9 +158,27 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
//initialize plugin settings. Accessing plugin settings before this
//point will break, as the plugin location will not be initialized.
PluginSettings.instance();

scheduledMetricCollectorsExecutor = new ScheduledMetricCollectorsExecutor();
this.performanceAnalyzerController = new PerformanceAnalyzerController(scheduledMetricCollectorsExecutor);

configOverridesWrapper = new ConfigOverridesWrapper();
clusterSettingsManager = new ClusterSettingsManager(Arrays.asList(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING),
Collections.singletonList(PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if there is any documentation change we need for adding this new setting in cluster settings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add it to the readme in the reader repo.

configOverridesClusterSettingHandler = new ConfigOverridesClusterSettingHandler(configOverridesWrapper, clusterSettingsManager,
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING);
clusterSettingsManager.addSubscriberForStringSetting(PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING,
configOverridesClusterSettingHandler);
perfAnalyzerClusterSettingHandler = new PerformanceAnalyzerClusterSettingHandler(performanceAnalyzerController,
clusterSettingsManager);
clusterSettingsManager.addSubscriberForIntSetting(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
perfAnalyzerClusterSettingHandler);

nodeStatsSettingHandler = new NodeStatsSettingHandler(performanceAnalyzerController,
clusterSettingsManager);
clusterSettingsManager.addSubscriberForIntSetting(PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
nodeStatsSettingHandler);

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new ThreadPoolMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new CacheConfigMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new CircuitBreakerCollector());
Expand All @@ -163,7 +187,7 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MetricsPurgeActivity());

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new NodeDetailsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new NodeDetailsCollector(configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new NodeStatsMetricsCollector(performanceAnalyzerController));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MasterServiceMetrics());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MasterServiceEventMetrics());
Expand All @@ -172,22 +196,6 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance());
scheduledMetricCollectorsExecutor.start();

clusterSettingsManager = new ClusterSettingsManager(
Arrays.asList(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING));

perfAnalyzerClusterSettingHandler = new PerformanceAnalyzerClusterSettingHandler(
performanceAnalyzerController,
clusterSettingsManager);
clusterSettingsManager.addSubscriberForSetting(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
perfAnalyzerClusterSettingHandler);

nodeStatsSettingHandler = new NodeStatsSettingHandler(
performanceAnalyzerController,
clusterSettingsManager);
clusterSettingsManager.addSubscriberForSetting(PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
nodeStatsSettingHandler);

EventLog eventLog = new EventLog();
EventLogFileHandler eventLogFileHandler = new EventLogFileHandler(eventLog, PluginSettings.instance().getMetricsLocation());
new EventLogQueueProcessor(eventLogFileHandler,
Expand Down Expand Up @@ -237,7 +245,10 @@ public List<org.elasticsearch.rest.RestHandler> getRestHandlers(final Settings s
PerformanceAnalyzerResourceProvider performanceAnalyzerRp = new PerformanceAnalyzerResourceProvider(settings, restController);
PerformanceAnalyzerClusterConfigAction paClusterConfigAction = new PerformanceAnalyzerClusterConfigAction(settings,
restController, perfAnalyzerClusterSettingHandler, nodeStatsSettingHandler);
return Arrays.asList(performanceanalyzerConfigAction, paClusterConfigAction, performanceAnalyzerRp);
PerformanceAnalyzerOverridesClusterConfigAction paOverridesConfigClusterAction =
new PerformanceAnalyzerOverridesClusterConfigAction(settings, restController,
configOverridesClusterSettingHandler, configOverridesWrapper);
return Arrays.asList(performanceanalyzerConfigAction, paClusterConfigAction, performanceAnalyzerRp, paOverridesConfigClusterAction);
}

@Override
Expand Down Expand Up @@ -275,7 +286,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING);
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesHelper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.NodeDetailColumns;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.NodeRole;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
Expand All @@ -27,15 +29,22 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;

import java.io.IOException;
import java.util.Iterator;

public class NodeDetailsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor {
public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP.get(NodeDetailsCollector.class).samplingInterval;
private static final Logger LOG = LogManager.getLogger(NodeDetailsCollector.class);
private static final int KEYS_PATH_LENGTH = 0;
private final ConfigOverridesWrapper configOverridesWrapper;

public NodeDetailsCollector() {
this(null);
}

public NodeDetailsCollector(final ConfigOverridesWrapper configOverridesWrapper) {
super(SAMPLING_TIME_INTERVAL, "NodeDetails");
this.configOverridesWrapper = configOverridesWrapper;
}

@Override
Expand All @@ -54,6 +63,22 @@ public void collectMetrics(long startTime) {
.append(
PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);

// We add the config overrides in line#2 because we don't know how many lines
// follow that belong to actual node details, and the reader also has no way to
// know this information in advance unless we add the number of nodes as
// additional metadata in the file.
try {
String rcaOverrides = ConfigOverridesHelper.serialize(configOverridesWrapper.getCurrentClusterConfigOverrides());
value.append(rcaOverrides);
} catch (IOException ioe) {
LOG.error("Unable to serialize rca config overrides.", ioe);
}
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);

// line#3 denotes when the timestamp when the config override happened.
value.append(configOverridesWrapper.getLastUpdatedTimestamp());
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);

DiscoveryNodes discoveryNodes = ESResources.INSTANCE.getClusterService().state().nodes();

DiscoveryNode masterNode = discoveryNodes.getMasterNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@
*/
public class ClusterSettingsManager implements ClusterStateListener {
private static final Logger LOG = LogManager.getLogger(ClusterSettingsManager.class);
private Map<Setting<Integer>, List<ClusterSettingListener<Integer>>> listenerMap = new HashMap<>();
private final List<Setting<Integer>> managedSettings = new ArrayList<>();
private final Map<Setting<Integer>, List<ClusterSettingListener<Integer>>> intSettingListenerMap = new HashMap<>();
private final Map<Setting<String>, List<ClusterSettingListener<String>>> stringSettingListenerMap = new HashMap<>();
private final List<Setting<Integer>> managedIntSettings = new ArrayList<>();
private final List<Setting<String>> managedStringSettings = new ArrayList<>();
private final ClusterSettingsResponseHandler clusterSettingsResponseHandler;

private boolean initialized = false;

public ClusterSettingsManager(List<Setting<Integer>> initialSettings) {
managedSettings.addAll(initialSettings);
public ClusterSettingsManager(List<Setting<Integer>> intSettings, List<Setting<String>> stringSettings) {
managedIntSettings.addAll(intSettings);
managedStringSettings.addAll(stringSettings);
this.clusterSettingsResponseHandler = new ClusterSettingsResponseHandler();
}

Expand All @@ -45,18 +48,35 @@ public ClusterSettingsManager(List<Setting<Integer>> initialSettings) {
* @param setting The setting that needs to be listened to.
* @param listener The listener object that will be called when the setting changes.
*/
public void addSubscriberForSetting(Setting<Integer> setting, ClusterSettingListener<Integer> listener) {
if (listenerMap.containsKey(setting)) {
final List<ClusterSettingListener<Integer>> currentListeners = listenerMap.get(setting);
public void addSubscriberForIntSetting(Setting<Integer> setting, ClusterSettingListener<Integer> listener) {
if (intSettingListenerMap.containsKey(setting)) {
final List<ClusterSettingListener<Integer>> currentListeners = intSettingListenerMap.get(setting);
if (!currentListeners.contains(listener)) {
currentListeners.add(listener);
listenerMap.put(setting, currentListeners);
intSettingListenerMap.put(setting, currentListeners);
}
} else {
listenerMap.put(setting, Collections.singletonList(listener));
intSettingListenerMap.put(setting, Collections.singletonList(listener));
}
}

/**
* Adds a listener that will be called when the requested setting's value changes.
*
* @param setting The setting that needs to be listened to.
* @param listener The listener object that will be called when the setting changes.
*/
public void addSubscriberForStringSetting(Setting<String> setting, ClusterSettingListener<String> listener) {
if (stringSettingListenerMap.containsKey(setting)) {
final List<ClusterSettingListener<String>> currentListeners = stringSettingListenerMap.get(setting);
if (!currentListeners.contains(listener)) {
currentListeners.add(listener);
stringSettingListenerMap.put(setting, currentListeners);
}
} else {
stringSettingListenerMap.put(setting, Collections.singletonList(listener));
}
}
/**
* Bootstraps the listeners and tries to read initial values for cluster settings.
*/
Expand Down Expand Up @@ -91,14 +111,34 @@ public void updateSetting(final Setting<Integer> setting, final Integer newValue
ESResources.INSTANCE.getClient().admin().cluster().updateSettings(request);
}

/**
* Updates the requested setting with the requested value across the cluster.
*
* @param setting The setting that needs to be updated.
* @param newValue The new value for the setting.
*/
public void updateSetting(final Setting<String> setting, final String newValue) {
final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
request.persistentSettings(Settings.builder()
.put(setting.getKey(), newValue)
.build());
ESResources.INSTANCE.getClient().admin().cluster().updateSettings(request);
}

/**
* Registers a setting update listener for all the settings managed by this instance.
*/
private void registerSettingUpdateListener() {
for (Setting<Integer> setting : managedSettings) {
for (Setting<Integer> setting : managedIntSettings) {
ESResources.INSTANCE.getClusterService()
.getClusterSettings()
.addSettingsUpdateConsumer(setting, updatedVal -> callListeners(setting, updatedVal));
.addSettingsUpdateConsumer(setting, updatedVal -> callIntSettingListeners(setting, updatedVal));
}

for (Setting<String> setting : managedStringSettings) {
ESResources.INSTANCE.getClusterService()
.getClusterSettings()
.addSettingsUpdateConsumer(setting, updatedVal -> callStringSettingListeners(setting, updatedVal));
}
}

Expand Down Expand Up @@ -166,9 +206,9 @@ public void clusterChanged(final ClusterChangedEvent event) {
* @param setting The setting whose listeners need to be notified.
* @param settingValue The new value for the setting.
*/
private void callListeners(final Setting<Integer> setting, int settingValue) {
private void callIntSettingListeners(final Setting<Integer> setting, int settingValue) {
try {
final List<ClusterSettingListener<Integer>> listeners = listenerMap.get(setting);
final List<ClusterSettingListener<Integer>> listeners = intSettingListenerMap.get(setting);
if (listeners != null) {
for (ClusterSettingListener<Integer> listener : listeners) {
listener.onSettingUpdate(settingValue);
Expand All @@ -180,6 +220,25 @@ private void callListeners(final Setting<Integer> setting, int settingValue) {
}
}

/**
* Calls all the listeners for the specified setting with the requested value.
*
* @param setting The setting whose listeners need to be notified.
* @param settingValue The new value for the setting.
*/
private void callStringSettingListeners(final Setting<String> setting, String settingValue) {
try {
final List<ClusterSettingListener<String>> listeners = stringSettingListenerMap.get(setting);
if (listeners != null) {
for (ClusterSettingListener<String> listener : listeners) {
listener.onSettingUpdate(settingValue);
}
}
} catch(Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}
/**
* Class that handles response to GET /_cluster/settings
*/
Expand All @@ -196,10 +255,17 @@ public void onResponse(final ClusterStateResponse clusterStateResponse) {
.getMetaData()
.persistentSettings();

for (final Setting<Integer> setting : managedSettings) {
for (final Setting<Integer> setting : managedIntSettings) {
Integer settingValue = clusterSettings.getAsInt(setting.getKey(), null);
if (settingValue != null) {
callListeners(setting, settingValue);
callIntSettingListeners(setting, settingValue);
}
}

for (final Setting<String> setting : managedStringSettings) {
String settingValue = clusterSettings.get(setting.getKey(), "");
if (settingValue != null) {
callStringSettingListeners(setting, settingValue);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public final class PerformanceAnalyzerClusterSettings {
0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
);

public enum PerformanceAnalyzerFeatureBits {
PA_BIT,
Expand All @@ -33,4 +33,15 @@ public enum PerformanceAnalyzerFeatureBits {
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Cluster setting controlling the config overrides to be applied on performance
* analyzer components.
*/
public static final Setting<String> CONFIG_OVERRIDES_SETTING = Setting.simpleString(
"cluster.metadata.perf_analyzer.config.overrides",
"",
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
}
Loading