Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add initial support for dynamic config overriding (#148)
Browse files Browse the repository at this point in the history
* Add initial support for dynamic config overriding

* Use helper to serialize/deserialize instead of the wrapper

* Add licence header to new files

* Update licence year to 2020
  • Loading branch information
ktkrg authored Jul 31, 2020
1 parent f89a956 commit dd7f58a
Show file tree
Hide file tree
Showing 9 changed files with 758 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.setting.handler.ConfigOverridesClusterSettingHandler;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.setting.handler.NodeStatsSettingHandler;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.http_action.config.PerformanceAnalyzerOverridesClusterConfigAction;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.http_action.config.PerformanceAnalyzerResourceProvider;
import java.io.File;
import java.security.AccessController;
Expand Down Expand Up @@ -95,6 +98,7 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.transport.PerformanceAnalyzerTransportInterceptor;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.util.Utils;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.writer.EventLogQueueProcessor;

import static java.util.Collections.singletonList;

public final class PerformanceAnalyzerPlugin extends Plugin implements ActionPlugin, NetworkPlugin, SearchPlugin {
Expand All @@ -104,6 +108,8 @@ public final class PerformanceAnalyzerPlugin extends Plugin implements ActionPlu
private static SecurityManager sm = null;
private final PerformanceAnalyzerClusterSettingHandler perfAnalyzerClusterSettingHandler;
private final NodeStatsSettingHandler nodeStatsSettingHandler;
private final ConfigOverridesClusterSettingHandler configOverridesClusterSettingHandler;
private final ConfigOverridesWrapper configOverridesWrapper;
private final PerformanceAnalyzerController performanceAnalyzerController;
private final ClusterSettingsManager clusterSettingsManager;

Expand Down Expand Up @@ -152,9 +158,27 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
//initialize plugin settings. Accessing plugin settings before this
//point will break, as the plugin location will not be initialized.
PluginSettings.instance();

scheduledMetricCollectorsExecutor = new ScheduledMetricCollectorsExecutor();
this.performanceAnalyzerController = new PerformanceAnalyzerController(scheduledMetricCollectorsExecutor);

configOverridesWrapper = new ConfigOverridesWrapper();
clusterSettingsManager = new ClusterSettingsManager(Arrays.asList(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING),
Collections.singletonList(PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING));
configOverridesClusterSettingHandler = new ConfigOverridesClusterSettingHandler(configOverridesWrapper, clusterSettingsManager,
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING);
clusterSettingsManager.addSubscriberForStringSetting(PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING,
configOverridesClusterSettingHandler);
perfAnalyzerClusterSettingHandler = new PerformanceAnalyzerClusterSettingHandler(performanceAnalyzerController,
clusterSettingsManager);
clusterSettingsManager.addSubscriberForIntSetting(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
perfAnalyzerClusterSettingHandler);

nodeStatsSettingHandler = new NodeStatsSettingHandler(performanceAnalyzerController,
clusterSettingsManager);
clusterSettingsManager.addSubscriberForIntSetting(PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
nodeStatsSettingHandler);

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new ThreadPoolMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new CacheConfigMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new CircuitBreakerCollector());
Expand All @@ -163,7 +187,7 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MetricsPurgeActivity());

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new NodeDetailsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new NodeDetailsCollector(configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new NodeStatsMetricsCollector(performanceAnalyzerController));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MasterServiceMetrics());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MasterServiceEventMetrics());
Expand All @@ -172,22 +196,6 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance());
scheduledMetricCollectorsExecutor.start();

clusterSettingsManager = new ClusterSettingsManager(
Arrays.asList(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING));

perfAnalyzerClusterSettingHandler = new PerformanceAnalyzerClusterSettingHandler(
performanceAnalyzerController,
clusterSettingsManager);
clusterSettingsManager.addSubscriberForSetting(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
perfAnalyzerClusterSettingHandler);

nodeStatsSettingHandler = new NodeStatsSettingHandler(
performanceAnalyzerController,
clusterSettingsManager);
clusterSettingsManager.addSubscriberForSetting(PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
nodeStatsSettingHandler);

EventLog eventLog = new EventLog();
EventLogFileHandler eventLogFileHandler = new EventLogFileHandler(eventLog, PluginSettings.instance().getMetricsLocation());
new EventLogQueueProcessor(eventLogFileHandler,
Expand Down Expand Up @@ -237,7 +245,10 @@ public List<org.elasticsearch.rest.RestHandler> getRestHandlers(final Settings s
PerformanceAnalyzerResourceProvider performanceAnalyzerRp = new PerformanceAnalyzerResourceProvider(settings, restController);
PerformanceAnalyzerClusterConfigAction paClusterConfigAction = new PerformanceAnalyzerClusterConfigAction(settings,
restController, perfAnalyzerClusterSettingHandler, nodeStatsSettingHandler);
return Arrays.asList(performanceanalyzerConfigAction, paClusterConfigAction, performanceAnalyzerRp);
PerformanceAnalyzerOverridesClusterConfigAction paOverridesConfigClusterAction =
new PerformanceAnalyzerOverridesClusterConfigAction(settings, restController,
configOverridesClusterSettingHandler, configOverridesWrapper);
return Arrays.asList(performanceanalyzerConfigAction, paClusterConfigAction, performanceAnalyzerRp, paOverridesConfigClusterAction);
}

@Override
Expand Down Expand Up @@ -275,7 +286,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING,
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING);
PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING,
PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesHelper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.NodeDetailColumns;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.NodeRole;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
Expand All @@ -27,15 +29,22 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;

import java.io.IOException;
import java.util.Iterator;

public class NodeDetailsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor {
public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP.get(NodeDetailsCollector.class).samplingInterval;
private static final Logger LOG = LogManager.getLogger(NodeDetailsCollector.class);
private static final int KEYS_PATH_LENGTH = 0;
private final ConfigOverridesWrapper configOverridesWrapper;

public NodeDetailsCollector() {
this(null);
}

public NodeDetailsCollector(final ConfigOverridesWrapper configOverridesWrapper) {
super(SAMPLING_TIME_INTERVAL, "NodeDetails");
this.configOverridesWrapper = configOverridesWrapper;
}

@Override
Expand All @@ -54,6 +63,22 @@ public void collectMetrics(long startTime) {
.append(
PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);

// We add the config overrides in line#2 because we don't know how many lines
// follow that belong to actual node details, and the reader also has no way to
// know this information in advance unless we add the number of nodes as
// additional metadata in the file.
try {
String rcaOverrides = ConfigOverridesHelper.serialize(configOverridesWrapper.getCurrentClusterConfigOverrides());
value.append(rcaOverrides);
} catch (IOException ioe) {
LOG.error("Unable to serialize rca config overrides.", ioe);
}
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);

// line#3 denotes when the timestamp when the config override happened.
value.append(configOverridesWrapper.getLastUpdatedTimestamp());
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);

DiscoveryNodes discoveryNodes = ESResources.INSTANCE.getClusterService().state().nodes();

DiscoveryNode masterNode = discoveryNodes.getMasterNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@
*/
public class ClusterSettingsManager implements ClusterStateListener {
private static final Logger LOG = LogManager.getLogger(ClusterSettingsManager.class);
private Map<Setting<Integer>, List<ClusterSettingListener<Integer>>> listenerMap = new HashMap<>();
private final List<Setting<Integer>> managedSettings = new ArrayList<>();
private final Map<Setting<Integer>, List<ClusterSettingListener<Integer>>> intSettingListenerMap = new HashMap<>();
private final Map<Setting<String>, List<ClusterSettingListener<String>>> stringSettingListenerMap = new HashMap<>();
private final List<Setting<Integer>> managedIntSettings = new ArrayList<>();
private final List<Setting<String>> managedStringSettings = new ArrayList<>();
private final ClusterSettingsResponseHandler clusterSettingsResponseHandler;

private boolean initialized = false;

public ClusterSettingsManager(List<Setting<Integer>> initialSettings) {
managedSettings.addAll(initialSettings);
public ClusterSettingsManager(List<Setting<Integer>> intSettings, List<Setting<String>> stringSettings) {
managedIntSettings.addAll(intSettings);
managedStringSettings.addAll(stringSettings);
this.clusterSettingsResponseHandler = new ClusterSettingsResponseHandler();
}

Expand All @@ -45,18 +48,35 @@ public ClusterSettingsManager(List<Setting<Integer>> initialSettings) {
* @param setting The setting that needs to be listened to.
* @param listener The listener object that will be called when the setting changes.
*/
public void addSubscriberForSetting(Setting<Integer> setting, ClusterSettingListener<Integer> listener) {
if (listenerMap.containsKey(setting)) {
final List<ClusterSettingListener<Integer>> currentListeners = listenerMap.get(setting);
public void addSubscriberForIntSetting(Setting<Integer> setting, ClusterSettingListener<Integer> listener) {
if (intSettingListenerMap.containsKey(setting)) {
final List<ClusterSettingListener<Integer>> currentListeners = intSettingListenerMap.get(setting);
if (!currentListeners.contains(listener)) {
currentListeners.add(listener);
listenerMap.put(setting, currentListeners);
intSettingListenerMap.put(setting, currentListeners);
}
} else {
listenerMap.put(setting, Collections.singletonList(listener));
intSettingListenerMap.put(setting, Collections.singletonList(listener));
}
}

/**
* Adds a listener that will be called when the requested setting's value changes.
*
* @param setting The setting that needs to be listened to.
* @param listener The listener object that will be called when the setting changes.
*/
public void addSubscriberForStringSetting(Setting<String> setting, ClusterSettingListener<String> listener) {
if (stringSettingListenerMap.containsKey(setting)) {
final List<ClusterSettingListener<String>> currentListeners = stringSettingListenerMap.get(setting);
if (!currentListeners.contains(listener)) {
currentListeners.add(listener);
stringSettingListenerMap.put(setting, currentListeners);
}
} else {
stringSettingListenerMap.put(setting, Collections.singletonList(listener));
}
}
/**
* Bootstraps the listeners and tries to read initial values for cluster settings.
*/
Expand Down Expand Up @@ -91,14 +111,34 @@ public void updateSetting(final Setting<Integer> setting, final Integer newValue
ESResources.INSTANCE.getClient().admin().cluster().updateSettings(request);
}

/**
* Updates the requested setting with the requested value across the cluster.
*
* @param setting The setting that needs to be updated.
* @param newValue The new value for the setting.
*/
public void updateSetting(final Setting<String> setting, final String newValue) {
final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
request.persistentSettings(Settings.builder()
.put(setting.getKey(), newValue)
.build());
ESResources.INSTANCE.getClient().admin().cluster().updateSettings(request);
}

/**
* Registers a setting update listener for all the settings managed by this instance.
*/
private void registerSettingUpdateListener() {
for (Setting<Integer> setting : managedSettings) {
for (Setting<Integer> setting : managedIntSettings) {
ESResources.INSTANCE.getClusterService()
.getClusterSettings()
.addSettingsUpdateConsumer(setting, updatedVal -> callListeners(setting, updatedVal));
.addSettingsUpdateConsumer(setting, updatedVal -> callIntSettingListeners(setting, updatedVal));
}

for (Setting<String> setting : managedStringSettings) {
ESResources.INSTANCE.getClusterService()
.getClusterSettings()
.addSettingsUpdateConsumer(setting, updatedVal -> callStringSettingListeners(setting, updatedVal));
}
}

Expand Down Expand Up @@ -166,9 +206,9 @@ public void clusterChanged(final ClusterChangedEvent event) {
* @param setting The setting whose listeners need to be notified.
* @param settingValue The new value for the setting.
*/
private void callListeners(final Setting<Integer> setting, int settingValue) {
private void callIntSettingListeners(final Setting<Integer> setting, int settingValue) {
try {
final List<ClusterSettingListener<Integer>> listeners = listenerMap.get(setting);
final List<ClusterSettingListener<Integer>> listeners = intSettingListenerMap.get(setting);
if (listeners != null) {
for (ClusterSettingListener<Integer> listener : listeners) {
listener.onSettingUpdate(settingValue);
Expand All @@ -180,6 +220,25 @@ private void callListeners(final Setting<Integer> setting, int settingValue) {
}
}

/**
* Calls all the listeners for the specified setting with the requested value.
*
* @param setting The setting whose listeners need to be notified.
* @param settingValue The new value for the setting.
*/
private void callStringSettingListeners(final Setting<String> setting, String settingValue) {
try {
final List<ClusterSettingListener<String>> listeners = stringSettingListenerMap.get(setting);
if (listeners != null) {
for (ClusterSettingListener<String> listener : listeners) {
listener.onSettingUpdate(settingValue);
}
}
} catch(Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}
/**
* Class that handles response to GET /_cluster/settings
*/
Expand All @@ -196,10 +255,17 @@ public void onResponse(final ClusterStateResponse clusterStateResponse) {
.getMetaData()
.persistentSettings();

for (final Setting<Integer> setting : managedSettings) {
for (final Setting<Integer> setting : managedIntSettings) {
Integer settingValue = clusterSettings.getAsInt(setting.getKey(), null);
if (settingValue != null) {
callListeners(setting, settingValue);
callIntSettingListeners(setting, settingValue);
}
}

for (final Setting<String> setting : managedStringSettings) {
String settingValue = clusterSettings.get(setting.getKey(), "");
if (settingValue != null) {
callStringSettingListeners(setting, settingValue);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public final class PerformanceAnalyzerClusterSettings {
0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
);

public enum PerformanceAnalyzerFeatureBits {
PA_BIT,
Expand All @@ -33,4 +33,15 @@ public enum PerformanceAnalyzerFeatureBits {
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Cluster setting controlling the config overrides to be applied on performance
* analyzer components.
*/
public static final Setting<String> CONFIG_OVERRIDES_SETTING = Setting.simpleString(
"cluster.metadata.perf_analyzer.config.overrides",
"",
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
}
Loading

0 comments on commit dd7f58a

Please sign in to comment.