This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adding negative cache to throttle extra request https://github.com/opendistro-for-elasticsearch/anomaly-detection/issues/33 #40

Merged
@@ -43,6 +43,8 @@ public HourlyCron(ClusterService clusterService, Client client) {
public void run() {
DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class);

// we also add the cancel query function here based on query text from the negative cache.
Contributor: Question. From #33, a query cannot be aborted, but this comment indicates the opposite. Is it confusing?

Contributor Author: Good catch. That's true. Currently anomaly-detection cannot abort a running query in Elasticsearch. If a query runs longer than expected, anomaly-detection will not wait for it even though the query is still running. To solve this issue we will 1) stop accepting new queries when this happens, which is #33, and 2) add a daily cron that cleans up running queries using the ES task management API (https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html)


CronRequest modelDeleteRequest = new CronRequest(dataNodes);
client.execute(CronAction.INSTANCE, modelDeleteRequest, ActionListener.wrap(response -> {
if (response.hasFailures()) {
@@ -35,6 +35,7 @@

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
@@ -115,15 +116,16 @@ public FeatureManager(
* @param detector anomaly detector for which the features are returned
* @param startTime start time of the data point in epoch milliseconds
* @param endTime end time of the data point in epoch milliseconds
* @param stateManager ADStateManager
* @return unprocessed features and processed features for the current data point
*/
@Deprecated
public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime) {
public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime, ADStateManager stateManager) {
Contributor: The added parameter should be injected as a dependency rather than passed all the way down the stack.

Contributor: Agree. We should not pass the state manager around.

double[][] currentPoints = null;
Deque<Entry<Long, double[]>> shingle = detectorIdsToTimeShingles
.computeIfAbsent(detector.getDetectorId(), id -> new ArrayDeque<Entry<Long, double[]>>(shingleSize));
if (shingle.isEmpty() || shingle.getLast().getKey() < endTime) {
Optional<double[]> point = searchFeatureDao.getFeaturesForPeriod(detector, startTime, endTime);
Optional<double[]> point = searchFeatureDao.getFeaturesForPeriod(detector, startTime, endTime, stateManager);
if (point.isPresent()) {
if (shingle.size() == shingleSize) {
shingle.remove();
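The shingle handling above keeps a fixed-size window of the most recent data points: a new point is accepted only if it is newer than the last one, and the oldest point is evicted once the window is full. A minimal, self-contained sketch of that behavior (a simplified stand-in, not the plugin's actual FeatureManager):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map.Entry;

// Minimal sketch of a fixed-size shingle: a deque of (endTime, features)
// entries where the oldest entry is evicted once capacity is reached.
public class ShingleSketch {
    private final Deque<Entry<Long, double[]>> shingle = new ArrayDeque<>();
    private final int shingleSize;

    public ShingleSketch(int shingleSize) {
        this.shingleSize = shingleSize;
    }

    // Accept a data point only if it is newer than the last one,
    // evicting the oldest point when the shingle is full.
    public void add(long endTime, double[] point) {
        if (shingle.isEmpty() || shingle.getLast().getKey() < endTime) {
            if (shingle.size() == shingleSize) {
                shingle.remove(); // drops the head, i.e. the oldest entry
            }
            shingle.add(new SimpleEntry<>(endTime, point));
        }
    }

    public int size() {
        return shingle.size();
    }

    public long oldestTime() {
        return shingle.getFirst().getKey();
    }
}
```

Note that `remove()` on an `ArrayDeque` removes the head, so insertion order (oldest first) is what makes the eviction correct.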
@@ -174,13 +176,16 @@ private double[][] filterAndFill(Deque<Entry<Long, double[]>> shingle, long endT
* in dimension via shingling.
*
* @param detector contains data info (indices, documents, etc)
* @param stateManager ADStateManager
* @return data for cold-start training, or empty if unavailable
*/
@Deprecated
public Optional<double[][]> getColdStartData(AnomalyDetector detector) {
public Optional<double[][]> getColdStartData(AnomalyDetector detector, ADStateManager stateManager) {
return searchFeatureDao
.getLatestDataTime(detector)
.flatMap(latest -> searchFeatureDao.getFeaturesForSampledPeriods(detector, maxTrainSamples, maxSampleStride, latest))
.flatMap(
latest -> searchFeatureDao.getFeaturesForSampledPeriods(detector, maxTrainSamples, maxSampleStride, latest, stateManager)
)
.map(
samples -> transpose(
interpolator.interpolate(transpose(samples.getKey()), samples.getValue() * (samples.getKey().length - 1) + 1)
@@ -30,6 +30,7 @@

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils;
import org.apache.logging.log4j.LogManager;
@@ -114,18 +115,24 @@ public Optional<Long> getLatestDataTime(AnomalyDetector detector) {
}

/**
* Gets features for the given time period.
* Gets features for the given time period. This function also adds the given detector to the negative cache before sending the ES request.
Contributor: Minor. "Adds". Also, use descriptive language instead of prescriptive. See Javadoc.

Contributor Author: Will update, thanks for the suggestion.

* Once a response or exception arrives within the timeout, the request is treated as complete and the negative cache entry is cleared.
Contributor: Minor. "We" is prescriptive (giving orders to the code), not descriptive (stating what the code does).

* Otherwise the detector entry remains in the negative cache to reject further requests.
*
* @param detector info about indices, documents, feature query
* @param startTime epoch milliseconds at the beginning of the period
* @param endTime epoch milliseconds at the end of the period
* @param stateManager ADStateManager
* @throws IllegalStateException when unexpected failures happen
* @return features from search results, empty when no data found
*/
public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime) {
public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime, ADStateManager stateManager) {
Contributor: The needed dependency can be injected in this class.

SearchRequest searchRequest = createFeatureSearchRequest(detector, startTime, endTime, Optional.empty());
// add (detectorId, filteredQuery) to negative cache
stateManager.insertFilteredQuery(detector, searchRequest);
// send throttled request: this request will clear the negative cache if it finishes within the timeout
return clientUtil
.<SearchRequest, SearchResponse>timedRequest(searchRequest, logger, client::search)
.<SearchRequest, SearchResponse>throttledTimedRequest(searchRequest, logger, client::search, stateManager, detector)
.flatMap(resp -> parseResponse(resp, detector.getEnabledFeatureIds()));
}
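The flow in getFeaturesForPeriod (insert the detector's request into the negative cache, then clear the entry once a response or timeout arrives) amounts to a per-detector single-flight gate: a second query for a detector is rejected while the first is still outstanding. A self-contained sketch of that gate, using hypothetical names and String requests in place of the real SearchRequest and ClientUtil.throttledTimedRequest plumbing:

```java
import java.time.Instant;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a per-detector "single flight" gate: a second query for the
// same detector is rejected while the first is still outstanding.
public class SingleFlightGate {
    private final ConcurrentHashMap<String, Entry<String, Instant>> inFlight =
        new ConcurrentHashMap<>();

    // Returns true if the request was admitted, false if one is already
    // running. putIfAbsent makes the check-and-insert atomic.
    public boolean tryAcquire(String detectorId, String request) {
        return inFlight.putIfAbsent(detectorId, new SimpleEntry<>(request, Instant.now())) == null;
    }

    // Called when the response, an exception, or a timeout arrives.
    public void release(String detectorId) {
        inFlight.remove(detectorId);
    }

    // The pending request for a detector, if any.
    public Optional<String> pending(String detectorId) {
        return Optional.ofNullable(inFlight.get(detectorId)).map(Entry::getKey);
    }
}
```

The timestamp stored alongside each request is what lets a daily cron later identify long-running entries to clean up.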

@@ -242,20 +249,22 @@ public void getFeatureSamplesForPeriods(
* @param maxSamples the maximum number of samples to return
* @param maxStride the maximum number of periods between samples
* @param endTime the end time of the latest period
* @param stateManager ADStateManager
* @return sampled features and stride, empty when no data found
*/
public Optional<Entry<double[][], Integer>> getFeaturesForSampledPeriods(
AnomalyDetector detector,
int maxSamples,
int maxStride,
long endTime
long endTime,
ADStateManager stateManager
) {
Map<Long, double[]> cache = new HashMap<>();
int currentStride = maxStride;
Optional<double[][]> features = Optional.empty();
while (currentStride >= 1) {
boolean isInterpolatable = currentStride < maxStride;
features = getFeaturesForSampledPeriods(detector, maxSamples, currentStride, endTime, cache, isInterpolatable);
features = getFeaturesForSampledPeriods(detector, maxSamples, currentStride, endTime, cache, isInterpolatable, stateManager);
if (!features.isPresent() || features.get().length > maxSamples / 2 || currentStride == 1) {
break;
} else {
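The loop above retries sampling at progressively smaller strides until enough samples are found or the stride reaches 1; the truncated else branch presumably halves currentStride before retrying. A self-contained sketch under that assumption, with a functional sampler standing in for the ES query:

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Sketch of the stride-halving retry: start at the widest stride and
// halve it until more than half of maxSamples is found, or stride is 1.
public class StrideSampler {
    public static Optional<double[][]> sample(
        int maxSamples,
        int maxStride,
        // sampler.apply(stride, maxSamples) returns the samples found at that stride
        BiFunction<Integer, Integer, Optional<double[][]>> sampler
    ) {
        int currentStride = maxStride;
        Optional<double[][]> features = Optional.empty();
        while (currentStride >= 1) {
            features = sampler.apply(currentStride, maxSamples);
            if (!features.isPresent() || features.get().length > maxSamples / 2 || currentStride == 1) {
                break;
            }
            currentStride = currentStride / 2; // assumed retry step: halve and try again
        }
        return features;
    }
}
```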
@@ -275,7 +284,8 @@ private Optional<double[][]> getFeaturesForSampledPeriods(
int stride,
long endTime,
Map<Long, double[]> cache,
boolean isInterpolatable
boolean isInterpolatable,
ADStateManager stateManager
) {
ArrayDeque<double[]> sampledFeatures = new ArrayDeque<>(maxSamples);
for (int i = 0; i < maxSamples; i++) {
@@ -284,7 +294,7 @@ private Optional<double[][]> getFeaturesForSampledPeriods(
if (cache.containsKey(end)) {
sampledFeatures.addFirst(cache.get(end));
} else {
Optional<double[]> features = getFeaturesForPeriod(detector, end - span, end);
Optional<double[]> features = getFeaturesForPeriod(detector, end - span, end, stateManager);
if (features.isPresent()) {
cache.put(end, features.get());
sampledFeatures.addFirst(features.get());
@@ -17,15 +17,15 @@

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorActionHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorRequest;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@@ -89,8 +89,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return channel -> {
if (channel.request().method() == RestRequest.Method.POST) {
logger.info("Stop anomaly detector {}", detectorId);
StopDetectorRequest stopDetectorRequest = new StopDetectorRequest(detectorId);
client.execute(StopDetectorAction.INSTANCE, stopDetectorRequest, stopAdDetectorListener(channel, detectorId));
DeleteDetectorRequest deleteDetectorRequest = new DeleteDetectorRequest().adID(detectorId);
client.execute(DeleteDetectorAction.INSTANCE, deleteDetectorRequest, stopAdDetectorListener(channel, detectorId));
} else if (channel.request().method() == RestRequest.Method.DELETE) {
logger.info("Delete anomaly detector {}", detectorId);
handler
@@ -117,11 +117,11 @@ private void deleteAnomalyDetectorDoc(
client.delete(deleteRequest, new RestStatusToXContentListener<>(channel));
}

private ActionListener<StopDetectorResponse> stopAdDetectorListener(RestChannel channel, String detectorId) {
return new ActionListener<StopDetectorResponse>() {
private ActionListener<AcknowledgedResponse> stopAdDetectorListener(RestChannel channel, String detectorId) {
return new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(StopDetectorResponse stopDetectorResponse) {
if (stopDetectorResponse.success()) {
public void onResponse(AcknowledgedResponse deleteDetectorResponse) {
if (deleteDetectorResponse.isAcknowledged()) {
logger.info("AD model deleted successfully for detector {}", detectorId);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, "AD model deleted successfully"));
} else {
@@ -29,93 +29,91 @@ public final class AnomalyDetectorSettings {
private AnomalyDetectorSettings() {}

public static final Setting<Integer> MAX_ANOMALY_DETECTORS = Setting
.intSetting("opendistro.anomaly_detection.max_anomaly_detectors", 1000, Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("ml.anomaly_detectors.max_anomaly_detectors", 1000, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<Integer> MAX_ANOMALY_FEATURES = Setting
.intSetting("opendistro.anomaly_detection.max_anomaly_features", 5, Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("ml.anomaly_detectors.max_anomaly_features", 5, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.request_timeout",
"ml.anomaly_detectors.request_timeout",
TimeValue.timeValueSeconds(10),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> DETECTION_INTERVAL = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.detection_interval",
"ml.anomaly_detectors.detection_interval",
TimeValue.timeValueMinutes(10),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> DETECTION_WINDOW_DELAY = Setting
.timeSetting(
"opendistro.anomaly_detection.detection_window_delay",
"ml.anomaly_detectors.detection_window_delay",
TimeValue.timeValueMinutes(0),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> AD_RESULT_ROLLOVER_PERIOD = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.ad_result_rollover_period",
"ml.anomaly_detectors.ad_result_rollover_period",
TimeValue.timeValueHours(12),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> AD_RESULT_HISTORY_ROLLOVER_PERIOD = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.ad_result_history_rollover_period",
"ml.anomaly_detectors.ad_result_history_rollover_period",
TimeValue.timeValueHours(12),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> AD_RESULT_HISTORY_INDEX_MAX_AGE = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.ad_result_history_max_age",
"ml.anomaly_detectors.ad_result_history_max_age",
TimeValue.timeValueHours(24),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<Long> AD_RESULT_HISTORY_MAX_DOCS = Setting
.longSetting("opendistro.anomaly_detection.ad_result_history_max_docs", 10000L, 0L,
Setting.Property.NodeScope, Setting.Property.Dynamic);
.longSetting("ml.anomaly_detectors.ad_result_history_max_docs", 10000L, 0L, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<Integer> MAX_RETRY_FOR_UNRESPONSIVE_NODE = Setting
.intSetting("opendistro.anomaly_detection.max_retry_for_unresponsive_node", 5, 0,
Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("ml.anomaly_detectors.max_retry_for_unresponsive_node", 5, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<TimeValue> COOLDOWN_MINUTES = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.cooldown_minutes",
"ml.anomaly_detectors.cooldown_minutes",
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> BACKOFF_MINUTES = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.backoff_minutes",
"ml.anomaly_detectors.backoff_minutes",
TimeValue.timeValueMinutes(15),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> BACKOFF_INITIAL_DELAY = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.backoff_initial_delay",
"ml.anomaly_detectors.backoff_initial_delay",
TimeValue.timeValueMillis(1000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<Integer> MAX_RETRY_FOR_BACKOFF = Setting
.intSetting("opendistro.anomaly_detection.max_retry_for_backoff", 3, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("ml.anomaly_detectors.max_retry_for_backoff", 3, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final String ANOMALY_DETECTORS_INDEX_MAPPING_FILE = "mappings/anomaly-detectors.json";
public static final String ANOMALY_RESULTS_INDEX_MAPPING_FILE = "mappings/anomaly-results.json";
@@ -37,6 +37,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -55,6 +56,9 @@ public class ADStateManager {
private static final Logger LOG = LogManager.getLogger(ADStateManager.class);
private ConcurrentHashMap<String, Entry<AnomalyDetector, Instant>> currentDetectors;
private ConcurrentHashMap<String, Entry<Integer, Instant>> partitionNumber;
// negativeCache is used to reject a search query if the given detector already has one running
// key is detectorId; value is an entry of (SearchRequest, insertion timestamp)
private ConcurrentHashMap<String, Entry<SearchRequest, Instant>> negativeCache;
private Client client;
private Random random;
private ModelManager modelManager;
@@ -83,6 +87,7 @@ public ADStateManager(
this.partitionNumber = new ConcurrentHashMap<>();
this.clientUtil = clientUtil;
this.backpressureMuter = new ConcurrentHashMap<>();
this.negativeCache = new ConcurrentHashMap<>();
this.clock = clock;
this.settings = settings;
this.stateTtl = stateTtl;
@@ -119,6 +124,47 @@ public int getPartitionNumber(String adID) throws InterruptedException {
return partitionNum;
}

/**
* Gets the negative cache value (SearchRequest, Instant) for the given detector.
* Returns Optional.empty() if detectorId is null.
Contributor: Minor. This case can be assumed to be unlikely. If it is a real concern that must be addressed, the proper behavior is to throw an exception.

* @param detector AnomalyDetector
* @return negative cache value (SearchRequest, Instant)
*/
public Optional<Entry<SearchRequest, Instant>> getFilteredQuery(AnomalyDetector detector) {
if (detector.getDetectorId() == null) {
return Optional.empty();
}
if (negativeCache.containsKey(detector.getDetectorId())) {
return Optional.of(negativeCache.get(detector.getDetectorId()));
Contributor: Minor. This method can be simplified to this line.
}
return Optional.empty();
}

/**
* Inserts the negative cache entry for the given detector.
* Does nothing if detectorId is null.
* @param detector AnomalyDetector
* @param searchRequest ES search request
*/
public void insertFilteredQuery(AnomalyDetector detector, SearchRequest searchRequest) {
if (detector.getDetectorId() == null) {
return;
}
negativeCache.putIfAbsent(detector.getDetectorId(), new SimpleEntry<>(searchRequest, clock.instant()));
Contributor: Question. Is put more expected for the client? Otherwise, the insert call returns but the entry is still not updated; if that's by design, the documentation should make it clear.

Contributor Author: Yes, I think put is better than putIfAbsent here. Will update.

}

/**
* Clears the negative cache entry for the given detector.
* Does nothing if detectorId is null.
* @param detector AnomalyDetector
*/
public void clearFilteredQuery(AnomalyDetector detector) {
if (detector.getDetectorId() == null) {
return;
}
negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId()));
Contributor: Minor. Map::remove should work.

}
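Taken together, the review suggestions on these three methods (Optional.ofNullable for the lookup, put instead of putIfAbsent for insertion, and a plain Map::remove for clearing) reduce each method body to one line. A hedged sketch of the simplified cache, using String values in place of the real SearchRequest:

```java
import java.time.Clock;
import java.time.Instant;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Simplified negative cache incorporating the review feedback:
// ofNullable for the lookup, put (last write wins) for insertion,
// and plain remove for clearing.
public class NegativeCacheSketch {
    private final ConcurrentHashMap<String, Entry<String, Instant>> cache =
        new ConcurrentHashMap<>();
    private final Clock clock = Clock.systemUTC();

    public Optional<Entry<String, Instant>> get(String detectorId) {
        return Optional.ofNullable(cache.get(detectorId));
    }

    public void insert(String detectorId, String request) {
        cache.put(detectorId, new SimpleEntry<>(request, clock.instant()));
    }

    public void clear(String detectorId) {
        cache.remove(detectorId);
    }
}
```

The null-detectorId guards from the PR are omitted here; as the reviewer notes, that case is unlikely and arguably better handled by throwing.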

public Optional<AnomalyDetector> getAnomalyDetector(String adID) {
Entry<AnomalyDetector, Instant> detectorAndTime = currentDetectors.get(adID);
if (detectorAndTime != null) {
Expand Down