Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add setting for enabling/disabling circuit breaker (#169)
Browse files Browse the repository at this point in the history
* Add setting for enabling/disabling circuit breaker

A circuit breaker is broken when heap memory usage exceeds 85%, and the related AD job would be disabled because of that. It is possible that heap memory usage exceeds 85% at one point and drops back below 85% soon afterward. This PR mitigates the issue in the following two ways:
First, only disable the AD job after the circuit breaker is broken for a consecutive number of times (3 times).
Second, add a setting for enabling/disabling circuit breaker.

Testing done:
1. After disabling circuit breaker, an open circuit breaker does not affect AD job execution.
2. Verified an open circuit breaker wouldn't cause an AD job to be stopped immediately.
  • Loading branch information
kaituo authored Jun 23, 2020
1 parent f62e570 commit 7c1d524
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.monitor.jvm.JvmService;

import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;

/**
* Class {@code ADCircuitBreakerService} provides functions for storing and retrieving circuit breakers.
*
Expand Down Expand Up @@ -79,6 +81,10 @@ public ADCircuitBreakerService init() {
}

public Boolean isOpen() {
if (!EnabledSetting.isADBreakerEnabled()) {
return false;
}

for (CircuitBreaker breaker : breakers.values()) {
if (breaker.isOpen()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,15 @@ public class LimitExceededException extends EndRunException {
public LimitExceededException(String anomalyDetectorId, String message) {
// Always passes true as the third parent-constructor argument; the three-argument
// overload of this class documents that position as the "stop detector immediately" flag.
super(anomalyDetectorId, message, true);
}

/**
* Constructor with an anomaly detector ID, an explanation, and a flag that controls
* whether the detector is stopped immediately. All three arguments are forwarded
* unchanged to the parent constructor.
*
* @param anomalyDetectorId ID of the anomaly detector for which the limit is exceeded
* @param message explanation for the limit
* @param stopNow whether to stop detector immediately
*/
public LimitExceededException(String anomalyDetectorId, String message, boolean stopNow) {
super(anomalyDetectorId, message, stopNow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private AnomalyDetectorSettings() {}
public static final Setting<Integer> MAX_RETRY_FOR_END_RUN_EXCEPTION = Setting
.intSetting(
"opendistro.anomaly_detection.max_retry_for_end_run_exception",
3,
6,
0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,19 @@ public class EnabledSetting {
*/
public static final String AD_PLUGIN_ENABLED = "opendistro.anomaly_detection.enabled";

/**
* Key of the setting that enables or disables the AD circuit breaker.
*/
public static final String AD_BREAKER_ENABLED = "opendistro.anomaly_detection.breaker.enabled";

// Read-only lookup table from setting key to its Setting definition. It is filled once,
// through the instance initializer of the anonymous HashMap subclass, and then wrapped
// with unmodifiableMap so later code cannot add or remove entries.
private final Map<String, Setting<?>> settings = unmodifiableMap(new HashMap<String, Setting<?>>() {
{
/**
* AD plugin enable/disable setting.
* NOTE(review): the literal true is presumably the default value — confirm against Setting.boolSetting.
*/
put(AD_PLUGIN_ENABLED, Setting.boolSetting(AD_PLUGIN_ENABLED, true, NodeScope, Dynamic));

/**
* AD breaker enable/disable setting, registered with the same arguments as the plugin setting above.
*/
put(AD_BREAKER_ENABLED, Setting.boolSetting(AD_BREAKER_ENABLED, true, NodeScope, Dynamic));
}
});

Expand Down Expand Up @@ -96,10 +103,22 @@ private Setting<?> getSetting(String key) {
throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

/**
 * Reports whether the AD plugin is enabled. If disabled, the AD plugin rejects RESTful
 * requests and stops all AD jobs.
 *
 * @return whether AD plugin is enabled.
 */
public static boolean isADPluginEnabled() {
    // Resolve the shared instance first, then look the flag up by its setting key.
    final EnabledSetting enabledSetting = EnabledSetting.getInstance();
    return enabledSetting.getSettingValue(AD_PLUGIN_ENABLED);
}

/**
 * Reports whether the AD circuit breaker is enabled. If disabled, an open circuit breaker
 * wouldn't cause an AD job to be stopped.
 *
 * @return whether AD circuit breaker is enabled or not.
 */
public static boolean isADBreakerEnabled() {
    // Resolve the shared instance first, then look the flag up by its setting key.
    final EnabledSetting enabledSetting = EnabledSetting.getInstance();
    return enabledSetting.getSettingValue(AD_BREAKER_ENABLED);
}

public void init(ClusterService clusterService) {
this.clusterService = clusterService;
setSettingsUpdateConsumers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ private List<FeatureData> getFeatureData(double[] currentFeature, AnomalyDetecto
* + training data for cold start not available
* + cold start cannot succeed
* + unknown prediction error
* + memory circuit breaker tripped
*
* Known cause of EndRunException with endNow returning true:
* + a model's memory size reached limit
Expand Down Expand Up @@ -215,7 +216,7 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<
adStats.getStat(StatNames.AD_EXECUTE_REQUEST_COUNT.getName()).increment();

if (adCircuitBreakerService.isOpen()) {
listener.onFailure(new LimitExceededException(adID, CommonErrorMessages.MEMORY_CIRCUIT_BROKEN_ERR_MSG));
listener.onFailure(new LimitExceededException(adID, CommonErrorMessages.MEMORY_CIRCUIT_BROKEN_ERR_MSG, false));
return;
}

Expand Down

0 comments on commit 7c1d524

Please sign in to comment.