Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add settings to disable/enable AD dynamically (#105)
Browse files Browse the repository at this point in the history
This PR adds settings to disable/enable AD dynamically.  AD plugin rejects RESTful requests and stops AD jobs when it is disabled.

Testing done:
- added tests to verify if AD plugin rejects RESTful requests when it is disabled
- manually verified AD jobs are stopped when AD plugin is disabled
  • Loading branch information
kaituo authored May 4, 2020
1 parent b351d40 commit 8109660
Show file tree
Hide file tree
Showing 15 changed files with 391 additions and 19 deletions.
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -251,6 +251,9 @@ List<String> jacocoExclusions = [
// Class containing just constants. Don't need to test
'com.amazon.opendistroforelasticsearch.ad.constant.*',

// mostly skeleton code. Tested major logic in restful api tests
'com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting',

'com.amazon.opendistroforelasticsearch.ad.common.exception.FeatureNotAvailableException',
'com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException',
'com.amazon.opendistroforelasticsearch.ad.util.ClientUtil',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.amazon.opendistroforelasticsearch.ad;

import static java.util.Collections.unmodifiableList;

import com.amazon.opendistroforelasticsearch.ad.breaker.ADCircuitBreakerService;
import com.amazon.opendistroforelasticsearch.ad.cluster.ADClusterEventListener;
import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData;
Expand Down Expand Up @@ -46,6 +48,7 @@
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestStatsAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStat;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
Expand Down Expand Up @@ -125,8 +128,8 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Entry point of AD plugin.
Expand Down Expand Up @@ -245,6 +248,7 @@ public Collection<Object> createComponents(
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry
) {
EnabledSetting.getInstance().init(clusterService);
this.client = client;
this.threadPool = threadPool;
Settings settings = environment.settings();
Expand Down Expand Up @@ -378,15 +382,17 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
settings,
AD_THREAD_POOL_NAME,
Math.max(1, EsExecutors.numberOfProcessors(settings) / 4),
AD_THEAD_POOL_QUEUE_SIZE,
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
"opendistro.ad." + AD_THREAD_POOL_NAME
)
);
}

@Override
public List<Setting<?>> getSettings() {
return ImmutableList
List<Setting<?>> enabledSetting = EnabledSetting.getInstance().getSettings();

List<Setting<?>> systemSetting = ImmutableList
.of(
AnomalyDetectorSettings.MAX_ANOMALY_DETECTORS,
AnomalyDetectorSettings.MAX_ANOMALY_FEATURES,
Expand All @@ -403,6 +409,7 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.BACKOFF_INITIAL_DELAY,
AnomalyDetectorSettings.MAX_RETRY_FOR_BACKOFF
);
return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -24,4 +24,5 @@ public class CommonErrorMessages {
public static final String MEMORY_LIMIT_EXCEEDED_ERR_MSG = "AD models memory usage exceeds our limit.";
public static final String FEATURE_NOT_AVAILABLE_ERR_MSG = "No Feature in current detection window.";
public static final String MEMORY_CIRCUIT_BROKEN_ERR_MSG = "AD memory circuit is broken.";
public static final String DISABLED_ERR_MSG = "AD plugin is disabled. To enable update opendistro.anomaly_detection.enabled to true";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

package com.amazon.opendistroforelasticsearch.ad.rest;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -64,6 +67,9 @@ public AbstractSearchAction(RestController controller, String urlPath, String in

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser());
searchSourceBuilder.fetchSource(getSourceContext(request));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package com.amazon.opendistroforelasticsearch.ad.rest;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;

import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -86,6 +89,10 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}

String detectorId = request.param(DETECTOR_ID);

return channel -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,10 @@
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorActionHandler;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -69,6 +72,10 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}

String detectorId = request.param(DETECTOR_ID);

return channel -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorExecutionInput;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultRequest;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
Expand Down Expand Up @@ -106,6 +108,9 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
AnomalyDetectorExecutionInput input = getAnomalyDetectorExecutionInput(request);
return channel -> {
String rawPath = request.rawPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfile;
import com.amazon.opendistroforelasticsearch.ad.model.ProfileName;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.google.common.collect.Sets;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorProfileRunner;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -105,6 +107,9 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
String detectorId = request.param(DETECTOR_ID);
boolean returnJob = request.paramAsBoolean("job", false);
String typesStr = request.param(TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorActionHandler;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.WriteRequest;
Expand Down Expand Up @@ -92,6 +95,10 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}

String detectorId = request.param(DETECTOR_ID, AnomalyDetector.NO_ID);
logger.info("AnomalyDetector {} action for detectorId {}", request.method(), detectorId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.amazon.opendistroforelasticsearch.ad.rest;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsRequest;
Expand Down Expand Up @@ -67,6 +69,9 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
ADStatsRequest adStatsRequest = getRequest(request);
return channel -> client.execute(ADStatsAction.INSTANCE, adStatsRequest, new RestActions.NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.settings;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.settings.Setting.Property.Dynamic;
import static org.elasticsearch.common.settings.Setting.Property.NodeScope;

public class EnabledSetting {

private static Logger logger = LogManager.getLogger(EnabledSetting.class);

/**
* Singleton instance
*/
private static EnabledSetting INSTANCE;

/**
* Settings name
*/
public static final String AD_PLUGIN_ENABLED = "opendistro.anomaly_detection.enabled";

private final Map<String, Setting<?>> settings = unmodifiableMap(new HashMap<String, Setting<?>>() {
{
/**
* AD plugin enable/disable setting
*/
put(AD_PLUGIN_ENABLED, Setting.boolSetting(AD_PLUGIN_ENABLED, true, NodeScope, Dynamic));
}
});

/** Latest setting value for each registered key. Thread-safe is required. */
private final Map<String, Object> latestSettings = new ConcurrentHashMap<>();

private ClusterService clusterService;

private EnabledSetting() {}

public static synchronized EnabledSetting getInstance() {
if (INSTANCE == null) {
INSTANCE = new EnabledSetting();
}
return INSTANCE;
}

private void setSettingsUpdateConsumers() {
for (Setting<?> setting : settings.values()) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(setting, newVal -> {
logger.info("[AD] The value of setting [{}] changed to [{}]", setting.getKey(), newVal);
latestSettings.put(setting.getKey(), newVal);
});
}
}

/**
* Get setting value by key. Return default value if not configured explicitly.
*
* @param key setting key.
* @param <T> Setting type
* @return T setting value or default
*/
@SuppressWarnings("unchecked")
public <T> T getSettingValue(String key) {
return (T) latestSettings.getOrDefault(key, getSetting(key).getDefault(Settings.EMPTY));
}

private Setting<?> getSetting(String key) {
if (settings.containsKey(key)) {
return settings.get(key);
}
throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

public static boolean isADPluginEnabled() {
return EnabledSetting.getInstance().getSettingValue(EnabledSetting.AD_PLUGIN_ENABLED);
}

public void init(ClusterService clusterService) {
this.clusterService = clusterService;
setSettingsUpdateConsumers();
}

public List<Setting<?>> getSettings() {
return new ArrayList<>(settings.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.amazon.opendistroforelasticsearch.ad.model.FeatureData;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner;
Expand Down Expand Up @@ -183,7 +184,7 @@ private List<FeatureData> getFeatureData(double[] currentFeature, AnomalyDetecto
* * index does not exist
* * all features have been disabled
* + anomaly detector is not available
* + AD plugin is disabled
*
* Known cause of InternalFailure:
* + threshold model node is not available
Expand All @@ -197,6 +198,7 @@ private List<FeatureData> getFeatureData(double[] currentFeature, AnomalyDetecto
*/
@Override
protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<AnomalyResultResponse> listener) {

AnomalyResultRequest request = AnomalyResultRequest.fromActionRequest(actionRequest);
ActionListener<AnomalyResultResponse> original = listener;
listener = ActionListener.wrap(original::onResponse, e -> {
Expand All @@ -206,6 +208,10 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<

String adID = request.getAdID();

if (!EnabledSetting.isADPluginEnabled()) {
throw new EndRunException(adID, CommonErrorMessages.DISABLED_ERR_MSG, true);
}

adStats.getStat(StatNames.AD_EXECUTE_REQUEST_COUNT.getName()).increment();

if (adCircuitBreakerService.isOpen()) {
Expand Down
Loading

0 comments on commit 8109660

Please sign in to comment.