From 9aa9e4691f26f286e62e27e948c24a41404a7171 Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Thu, 30 Jan 2020 10:40:55 -0800 Subject: [PATCH 01/10] Revert "merge changes from alpha branch: change setting name and fix stop AD request" This reverts commit 363daa6af5bbf416311ed7860c489a2848584d05, reversing changes made to caebb97d91a87369aa33f9fc90d4b87eba12eb76. This also reverts commit ff190fa038a71564adbf9aa86ca406123096c026 --- .../rest/RestDeleteAnomalyDetectorAction.java | 18 +++++------ .../ad/settings/AnomalyDetectorSettings.java | 30 +++++++++---------- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java index c6a08a06..7f7558c1 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java @@ -17,15 +17,15 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorActionHandler; -import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction; -import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorRequest; -import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorResponse; +import com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorAction; +import com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorRequest; import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -89,8 +89,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli return channel -> { if (channel.request().method() == RestRequest.Method.POST) { logger.info("Stop anomaly detector {}", detectorId); - StopDetectorRequest stopDetectorRequest = new StopDetectorRequest(detectorId); - client.execute(StopDetectorAction.INSTANCE, stopDetectorRequest, stopAdDetectorListener(channel, detectorId)); + DeleteDetectorRequest deleteDetectorRequest = new DeleteDetectorRequest().adID(detectorId); + client.execute(DeleteDetectorAction.INSTANCE, deleteDetectorRequest, stopAdDetectorListener(channel, detectorId)); } else if (channel.request().method() == RestRequest.Method.DELETE) { logger.info("Delete anomaly detector {}", detectorId); handler @@ -117,11 +117,11 @@ private void deleteAnomalyDetectorDoc( client.delete(deleteRequest, new RestStatusToXContentListener<>(channel)); } - private ActionListener stopAdDetectorListener(RestChannel channel, String detectorId) { - return new ActionListener() { + private ActionListener stopAdDetectorListener(RestChannel channel, String detectorId) { + return new ActionListener() { @Override - public void onResponse(StopDetectorResponse stopDetectorResponse) { - if (stopDetectorResponse.success()) { + public void onResponse(AcknowledgedResponse deleteDetectorResponse) { + if (deleteDetectorResponse.isAcknowledged()) { logger.info("AD model deleted successfully for detector {}", detectorId); channel.sendResponse(new BytesRestResponse(RestStatus.OK, "AD model deleted successfully")); } else { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java index 314d1a98..1c669888 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java @@ -29,14 +29,14 @@ public final class AnomalyDetectorSettings { private AnomalyDetectorSettings() {} public static final Setting MAX_ANOMALY_DETECTORS = Setting - .intSetting("opendistro.anomaly_detection.max_anomaly_detectors", 1000, Setting.Property.NodeScope, Setting.Property.Dynamic); + .intSetting("ml.anomaly_detectors.max_anomaly_detectors", 1000, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final Setting MAX_ANOMALY_FEATURES = Setting - .intSetting("opendistro.anomaly_detection.max_anomaly_features", 5, Setting.Property.NodeScope, Setting.Property.Dynamic); + .intSetting("ml.anomaly_detectors.max_anomaly_features", 5, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final Setting REQUEST_TIMEOUT = Setting .positiveTimeSetting( - "opendistro.anomaly_detection.request_timeout", + "ml.anomaly_detectors.request_timeout", TimeValue.timeValueSeconds(10), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -44,7 +44,7 @@ private AnomalyDetectorSettings() {} public static final Setting DETECTION_INTERVAL = Setting .positiveTimeSetting( - "opendistro.anomaly_detection.detection_interval", + "ml.anomaly_detectors.detection_interval", TimeValue.timeValueMinutes(10), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -52,7 +52,7 @@ private AnomalyDetectorSettings() {} public static final Setting DETECTION_WINDOW_DELAY = Setting .timeSetting( - "opendistro.anomaly_detection.detection_window_delay", + "ml.anomaly_detectors.detection_window_delay", TimeValue.timeValueMinutes(0), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -60,7 +60,7 @@ private AnomalyDetectorSettings() {} public static final Setting AD_RESULT_ROLLOVER_PERIOD = Setting .positiveTimeSetting( - "opendistro.anomaly_detection.ad_result_rollover_period", + "ml.anomaly_detectors.ad_result_rollover_period", TimeValue.timeValueHours(12), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -68,7 +68,7 @@ private AnomalyDetectorSettings() {} public static final Setting AD_RESULT_HISTORY_ROLLOVER_PERIOD = Setting .positiveTimeSetting( - "opendistro.anomaly_detection.ad_result_history_rollover_period", + "ml.anomaly_detectors.ad_result_history_rollover_period", TimeValue.timeValueHours(12), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -76,23 +76,21 @@ private AnomalyDetectorSettings() {} public static final Setting AD_RESULT_HISTORY_INDEX_MAX_AGE = Setting .positiveTimeSetting( - "opendistro.anomaly_detection.ad_result_history_max_age", + "ml.anomaly_detectors.ad_result_history_max_age", TimeValue.timeValueHours(24), Setting.Property.NodeScope, Setting.Property.Dynamic ); public static final Setting AD_RESULT_HISTORY_MAX_DOCS = Setting - .longSetting("opendistro.anomaly_detection.ad_result_history_max_docs", 10000L, 0L, - Setting.Property.NodeScope, Setting.Property.Dynamic); + .longSetting("ml.anomaly_detectors.ad_result_history_max_docs", 10000L, 0L, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final Setting MAX_RETRY_FOR_UNRESPONSIVE_NODE = Setting - .intSetting("opendistro.anomaly_detection.max_retry_for_unresponsive_node", 5, 0, - Setting.Property.NodeScope, Setting.Property.Dynamic); + .intSetting("ml.anomaly_detectors.max_retry_for_unresponsive_node", 5, 0, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final Setting COOLDOWN_MINUTES = Setting .positiveTimeSetting( - "opendistro.anomaly_detection.cooldown_minutes", + "ml.anomaly_detectors.cooldown_minutes", TimeValue.timeValueMinutes(5), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -100,7 +98,7 @@ private AnomalyDetectorSettings() {} public static final Setting BACKOFF_MINUTES = Setting .positiveTimeSetting( - "opendistro.anomaly_detection.backoff_minutes", + "ml.anomaly_detectors.backoff_minutes", TimeValue.timeValueMinutes(15), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -108,14 +106,14 @@ private AnomalyDetectorSettings() {} public static final Setting BACKOFF_INITIAL_DELAY = Setting .positiveTimeSetting( - "opendistro.anomaly_detection.backoff_initial_delay", + "ml.anomaly_detectors.backoff_initial_delay", TimeValue.timeValueMillis(1000), Setting.Property.NodeScope, Setting.Property.Dynamic ); public static final Setting MAX_RETRY_FOR_BACKOFF = Setting - .intSetting("opendistro.anomaly_detection.max_retry_for_backoff", 3, 0, Setting.Property.NodeScope, Setting.Property.Dynamic); + .intSetting("ml.anomaly_detectors.max_retry_for_backoff", 3, 0, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final String ANOMALY_DETECTORS_INDEX_MAPPING_FILE = "mappings/anomaly-detectors.json"; public static final String ANOMALY_RESULTS_INDEX_MAPPING_FILE = "mappings/anomaly-results.json"; From bc6a763172f7e1162125f35b445b81b9916204b8 Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Thu, 30 Jan 2020 15:14:57 -0800 Subject: [PATCH 02/10] Adding negative cache to AD https://github.com/opendistro-for-elasticsearch/anomaly-detection/issues/33 --- .../ad/cluster/HourlyCron.java | 2 + .../ad/feature/FeatureManager.java | 13 +++-- .../ad/feature/SearchFeatureDao.java | 24 +++++--- .../ad/transport/ADStateManager.java | 46 ++++++++++++++++ .../AnomalyResultTransportAction.java | 12 +++- .../ad/util/ClientUtil.java | 33 +++++++++++ .../ad/feature/FeatureManagerTests.java | 41 ++++++++------ .../ad/feature/SearchFeatureDaoTests.java | 28 ++++++++-- .../ad/transport/ADStateManagerTests.java | 14 +++++ .../ad/transport/AnomalyResultTests.java | 55 ++++++++++++++++--- 10 files changed, 225 insertions(+), 43 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java index aff9c50b..16055aeb 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java @@ -43,6 +43,8 @@ public HourlyCron(ClusterService clusterService, Client client) { public void run() { DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class); + // we also add the cancel query function here based on query text from the negative cache. + CronRequest modelDeleteRequest = new CronRequest(dataNodes); client.execute(CronAction.INSTANCE, modelDeleteRequest, ActionListener.wrap(response -> { if (response.hasFailures()) { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java index e3bc5dcb..5199a889 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java @@ -35,6 +35,7 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -115,15 +116,16 @@ public FeatureManager( * @param detector anomaly detector for which the features are returned * @param startTime start time of the data point in epoch milliseconds * @param endTime end time of the data point in epoch milliseconds + * @param stateManager ADStateManager * @return unprocessed features and processed features for the current data point */ @Deprecated - public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime) { + public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime, ADStateManager stateManager) { double[][] currentPoints = null; Deque> shingle = detectorIdsToTimeShingles .computeIfAbsent(detector.getDetectorId(), id -> new ArrayDeque>(shingleSize)); if (shingle.isEmpty() || shingle.getLast().getKey() < endTime) { - Optional point = searchFeatureDao.getFeaturesForPeriod(detector, startTime, endTime); + Optional point = searchFeatureDao.getFeaturesForPeriod(detector, startTime, endTime, stateManager); if (point.isPresent()) { if (shingle.size() == shingleSize) { shingle.remove(); @@ -174,13 +176,16 @@ private double[][] filterAndFill(Deque> shingle, long endT * in dimension via shingling. * * @param detector contains data info (indices, documents, etc) + * @param stateManager ADStateManager * @return data for cold-start training, or empty if unavailable */ @Deprecated - public Optional getColdStartData(AnomalyDetector detector) { + public Optional getColdStartData(AnomalyDetector detector, ADStateManager stateManager) { return searchFeatureDao .getLatestDataTime(detector) - .flatMap(latest -> searchFeatureDao.getFeaturesForSampledPeriods(detector, maxTrainSamples, maxSampleStride, latest)) + .flatMap( + latest -> searchFeatureDao.getFeaturesForSampledPeriods(detector, maxTrainSamples, maxSampleStride, latest, stateManager) + ) .map( samples -> transpose( interpolator.interpolate(transpose(samples.getKey()), samples.getValue() * (samples.getKey().length - 1) + 1) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java index 1b8818ce..0e7cb657 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java @@ -30,6 +30,7 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils; import org.apache.logging.log4j.LogManager; @@ -114,18 +115,24 @@ public Optional getLatestDataTime(AnomalyDetector detector) { } /** - * Gets features for the given time period. + * Gets features for the given time period. This function also add given detector to negative cache before sending es request. + * Once we get response/exception within timeout, we treat this request as complete and clear the negative cache. + * Otherwise this detector entry remain in the negative to reject further request. * * @param detector info about indices, documents, feature query * @param startTime epoch milliseconds at the beginning of the period * @param endTime epoch milliseconds at the end of the period + * @param stateManager ADStateManager * @throws IllegalStateException when unexpected failures happen * @return features from search results, empty when no data found */ - public Optional getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime) { + public Optional getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime, ADStateManager stateManager) { SearchRequest searchRequest = createFeatureSearchRequest(detector, startTime, endTime, Optional.empty()); + // add (detectorId, filteredQuery) to negative cache + stateManager.insertFilteredQuery(detector, searchRequest); + // send throttled request: this request will clear the negative cache if the request finished within timeout return clientUtil - .timedRequest(searchRequest, logger, client::search) + .throttledTimedRequest(searchRequest, logger, client::search, stateManager, detector) .flatMap(resp -> parseResponse(resp, detector.getEnabledFeatureIds())); } @@ -242,20 +249,22 @@ public void getFeatureSamplesForPeriods( * @param maxSamples the maximum number of samples to return * @param maxStride the maximum number of periods between samples * @param endTime the end time of the latest period + * @param stateManager ADStateManager * @return sampled features and stride, empty when no data found */ public Optional> getFeaturesForSampledPeriods( AnomalyDetector detector, int maxSamples, int maxStride, - long endTime + long endTime, + ADStateManager stateManager ) { Map cache = new HashMap<>(); int currentStride = maxStride; Optional features = Optional.empty(); while (currentStride >= 1) { boolean isInterpolatable = currentStride < maxStride; - features = getFeaturesForSampledPeriods(detector, maxSamples, currentStride, endTime, cache, isInterpolatable); + features = getFeaturesForSampledPeriods(detector, maxSamples, currentStride, endTime, cache, isInterpolatable, stateManager); if (!features.isPresent() || features.get().length > maxSamples / 2 || currentStride == 1) { break; } else { @@ -275,7 +284,8 @@ private Optional getFeaturesForSampledPeriods( int stride, long endTime, Map cache, - boolean isInterpolatable + boolean isInterpolatable, + ADStateManager stateManager ) { ArrayDeque sampledFeatures = new ArrayDeque<>(maxSamples); for (int i = 0; i < maxSamples; i++) { @@ -284,7 +294,7 @@ private Optional getFeaturesForSampledPeriods( if (cache.containsKey(end)) { sampledFeatures.addFirst(cache.get(end)); } else { - Optional features = getFeaturesForPeriod(detector, end - span, end); + Optional features = getFeaturesForPeriod(detector, end - span, end, stateManager); if (features.isPresent()) { cache.put(end, features.get()); sampledFeatures.addFirst(features.get()); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java index 76246e62..cdd86fdb 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -55,6 +56,9 @@ public class ADStateManager { private static final Logger LOG = LogManager.getLogger(ADStateManager.class); private ConcurrentHashMap> currentDetectors; private ConcurrentHashMap> partitionNumber; + // negativeCache is used to reject search query if given detector already has one query running + // key is detectorId, value is an entry. Key is QueryBuilder and value is the timestamp + private ConcurrentHashMap> negativeCache; private Client client; private Random random; private ModelManager modelManager; @@ -83,6 +87,7 @@ public ADStateManager( this.partitionNumber = new ConcurrentHashMap<>(); this.clientUtil = clientUtil; this.backpressureMuter = new ConcurrentHashMap<>(); + this.negativeCache = new ConcurrentHashMap<>(); this.clock = clock; this.settings = settings; this.stateTtl = stateTtl; @@ -119,6 +124,47 @@ public int getPartitionNumber(String adID) throws InterruptedException { return partitionNum; } + /** + * Get negative cache value(QueryBuilder, Instant) for given detector + * If detectorId is null, return Optional.empty() + * @param detector AnomalyDetector + * @return negative cache value(QueryBuilder, Instant) + */ + public Optional> getFilteredQuery(AnomalyDetector detector) { + if (detector.getDetectorId() == null) { + return Optional.empty(); + } + if (negativeCache.containsKey(detector.getDetectorId())) { + return Optional.of(negativeCache.get(detector.getDetectorId())); + } + return Optional.empty(); + } + + /** + * Insert the negative cache entry for given detector + * If detectorId is null, do nothing + * @param detector AnomalyDetector + * @param searchRequest ES search request + */ + public void insertFilteredQuery(AnomalyDetector detector, SearchRequest searchRequest) { + if (detector.getDetectorId() == null) { + return; + } + negativeCache.putIfAbsent(detector.getDetectorId(), new SimpleEntry<>(searchRequest, clock.instant())); + } + + /** + * Clear the negative cache for given detector. + * If detectorId is null, do nothing + * @param detector AnomalyDetector + */ + public void clearFilteredQuery(AnomalyDetector detector) { + if (detector.getDetectorId() == null) { + return; + } + negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId())); + } + public Optional getAnomalyDetector(String adID) { Entry detectorAndTime = currentDetectors.get(adID); if (detectorAndTime != null) { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java index e0727764..2ac6ad3b 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -67,6 +68,7 @@ import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; @@ -249,6 +251,12 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< return; } AnomalyDetector anomalyDetector = detector.get(); + Optional> queryEntry = stateManager.getFilteredQuery(anomalyDetector); + if (queryEntry.isPresent()) { + LOG.info("There is one query running for detectorId: {}", anomalyDetector.getDetectorId()); + listener.onResponse(new AnomalyResultResponse(Double.NaN, Double.NaN, new ArrayList())); + return; + } String thresholdModelID = modelManager.getThresholdModelId(adID); Optional thresholdNode = hashRing.getOwningNode(thresholdModelID); @@ -270,7 +278,7 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< long startTime = request.getStart() - delayMillis; long endTime = request.getEnd() - delayMillis; - SinglePointFeatures featureOptional = featureManager.getCurrentFeatures(anomalyDetector, startTime, endTime); + SinglePointFeatures featureOptional = featureManager.getCurrentFeatures(anomalyDetector, startTime, endTime, stateManager); List featureInResponse = null; @@ -811,7 +819,7 @@ class ColdStartJob implements Callable { @Override public Boolean call() { try { - Optional traingData = featureManager.getColdStartData(detector); + Optional traingData = featureManager.getColdStartData(detector, stateManager); if (traingData.isPresent()) { modelManager.trainModel(detector, traingData.get()); return true; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index 37431226..f5c70c69 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -24,6 +24,8 @@ import java.util.function.BiConsumer; import java.util.function.Function; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.Action; @@ -152,4 +154,35 @@ public Response ) { return function.apply(request).actionGet(requestTimeout); } + + public Optional throttledTimedRequest( + Request request, + Logger LOG, + BiConsumer> consumer, + ADStateManager stateManager, + AnomalyDetector detector + ) { + try { + AtomicReference respReference = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + + consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { + // clear negative cache + stateManager.clearFilteredQuery(detector); + respReference.set(response); + }, exception -> { + // clear negative cache + stateManager.clearFilteredQuery(detector); + LOG.error("Cannot get response for request {}, error: {}", request, exception); + }), latch)); + + if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) { + throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString()); + } + return Optional.ofNullable(respReference.get()); + } catch (InterruptedException e1) { + LOG.error(CommonErrorMessages.WAIT_ERR_MSG); + throw new IllegalStateException(e1); + } + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java index c47671e2..1a5b8a23 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java @@ -30,6 +30,7 @@ import com.amazon.opendistroforelasticsearch.ad.dataprocessor.SingleFeatureLinearUniformInterpolator; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager; import junitparams.JUnitParamsRunner; import junitparams.Parameters; @@ -87,6 +88,9 @@ public class FeatureManagerTests { @Mock private Clock clock; + @Mock + private ADStateManager stateManager; + private FeatureManager featureManager; @Before @@ -200,7 +204,7 @@ public void getCurrentFeatures_returnExpected( ) { for (int i = 0; i < allRanges.length; i++) { - when(searchFeatureDao.getFeaturesForPeriod(detector, allRanges[i][0], allRanges[i][1])) + when(searchFeatureDao.getFeaturesForPeriod(detector, allRanges[i][0], allRanges[i][1], stateManager)) .thenReturn(Optional.ofNullable(allPoints[i])); } this.featureManager = spy( @@ -218,10 +222,10 @@ public void getCurrentFeatures_returnExpected( ) ); for (int i = 0; i < previousRanges.length; i++) { - featureManager.getCurrentFeatures(detector, previousRanges[i][0], previousRanges[i][1]); + featureManager.getCurrentFeatures(detector, previousRanges[i][0], previousRanges[i][1], stateManager); } - SinglePointFeatures result = featureManager.getCurrentFeatures(detector, currentRange[0], currentRange[1]); + SinglePointFeatures result = featureManager.getCurrentFeatures(detector, currentRange[0], currentRange[1], stateManager); assertTrue(Arrays.equals(expected.getUnprocessedFeatures().orElse(null), result.getUnprocessedFeatures().orElse(null))); assertTrue(Arrays.equals(expected.getProcessedFeatures().orElse(null), result.getProcessedFeatures().orElse(null))); @@ -241,7 +245,7 @@ private Object[] getColdStartDataTestData() { public void getColdStartData_returnExpected(Long latestTime, Entry data, int interpolants, double[][] expected) { when(searchFeatureDao.getLatestDataTime(detector)).thenReturn(ofNullable(latestTime)); if (latestTime != null) { - when(searchFeatureDao.getFeaturesForSampledPeriods(detector, maxTrainSamples, maxSampleStride, latestTime)) + when(searchFeatureDao.getFeaturesForSampledPeriods(detector, maxTrainSamples, maxSampleStride, latestTime, stateManager)) .thenReturn(ofNullable(data)); } if (data != null) { @@ -249,7 +253,7 @@ public void getColdStartData_returnExpected(Long latestTime, Entry(data.getKey())), eq(shingleSize)); } - Optional results = featureManager.getColdStartData(detector); + Optional results = featureManager.getColdStartData(detector, stateManager); assertTrue(Arrays.deepEquals(expected, results.orElse(null))); } @@ -304,16 +308,17 @@ public void clear_deleteFeatures() { for (int i = 1; i <= shingleSize; i++) { start = i * 10; end = (i + 1) * 10; - when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i })); - featureManager.getCurrentFeatures(detector, start, end); + + when(searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager)).thenReturn(Optional.of(new double[] { i })); + featureManager.getCurrentFeatures(detector, start, end, stateManager); } - SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end); + SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); assertTrue(beforeMaintenance.getUnprocessedFeatures().isPresent()); assertTrue(beforeMaintenance.getProcessedFeatures().isPresent()); featureManager.clear(detector.getDetectorId()); - SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end); + SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); assertTrue(afterMaintenance.getUnprocessedFeatures().isPresent()); assertFalse(afterMaintenance.getProcessedFeatures().isPresent()); } @@ -325,17 +330,18 @@ public void maintenance_removeStaleData() { for (int i = 1; i <= shingleSize; i++) { start = i * 10; end = (i + 1) * 10; - when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i })); - featureManager.getCurrentFeatures(detector, start, end); + + when(searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager)).thenReturn(Optional.of(new double[] { i })); + featureManager.getCurrentFeatures(detector, start, end, stateManager); } - SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end); + SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); assertTrue(beforeMaintenance.getUnprocessedFeatures().isPresent()); assertTrue(beforeMaintenance.getProcessedFeatures().isPresent()); when(clock.instant()).thenReturn(Instant.ofEpochMilli(end + 1).plus(featureBufferTtl)); featureManager.maintenance(); - SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end); + SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); assertTrue(afterMaintenance.getUnprocessedFeatures().isPresent()); assertFalse(afterMaintenance.getProcessedFeatures().isPresent()); } @@ -347,17 +353,18 @@ public void maintenance_keepRecentData() { for (int i = 1; i <= shingleSize; i++) { start = i * 10; end = (i + 1) * 10; - when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i })); - featureManager.getCurrentFeatures(detector, start, end); + + when(searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager)).thenReturn(Optional.of(new double[] { i })); + featureManager.getCurrentFeatures(detector, start, end, stateManager); } - SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end); + SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); assertTrue(beforeMaintenance.getUnprocessedFeatures().isPresent()); assertTrue(beforeMaintenance.getProcessedFeatures().isPresent()); when(clock.instant()).thenReturn(Instant.ofEpochMilli(end)); featureManager.maintenance(); - SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end); + SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); assertTrue(afterMaintenance.getUnprocessedFeatures().isPresent()); assertTrue(afterMaintenance.getProcessedFeatures().isPresent()); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java index f2fc0928..2aeead08 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java @@ -32,6 +32,7 @@ import com.amazon.opendistroforelasticsearch.ad.dataprocessor.SingleFeatureLinearUniformInterpolator; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils; import junitparams.JUnitParamsRunner; @@ -84,6 +85,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.anyLong; @@ -127,6 +129,8 @@ public class SearchFeatureDaoTests { private Aggregations aggs; @Mock private Max max; + @Mock + private ADStateManager stateManager; @Mock private AnomalyDetector detector; @@ -171,6 +175,16 @@ public void setup() throws Exception { .timedRequest(eq(searchRequest), anyObject(), Matchers.>>anyObject()); when(searchResponse.getAggregations()).thenReturn(aggregations); + doReturn(Optional.of(searchResponse)) + .when(clientUtil) + .throttledTimedRequest( + eq(searchRequest), + anyObject(), + Matchers.>>anyObject(), + anyObject(), + anyObject() + ); + multiSearchRequest = new MultiSearchRequest(); SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0])) .preference(SearchFeatureDao.FEATURE_SAMPLE_PREFERENCE); @@ -282,7 +296,7 @@ public void getFeaturesForPeriod_returnExpected_givenData(List aggs when(detector.getEnabledFeatureIds()).thenReturn(featureIds); // test - Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end); + Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager); // verify assertTrue(Arrays.equals(expected, result.orElse(null))); @@ -367,7 +381,7 @@ public void test_getFeaturesForPeriod_returnEmpty_givenNoData() throws Exception when(searchResponse.getAggregations()).thenReturn(null); // test - Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end); + Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager); // verify assertFalse(result.isPresent()); @@ -383,7 +397,7 @@ public void getFeaturesForPeriod_returnEmpty_givenNoHits() throws Exception { when(searchResponse.getHits()).thenReturn(new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 1f)); // test - Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end); + Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager); // verify assertFalse(result.isPresent()); @@ -501,15 +515,17 @@ public void getFeaturesForSampledPeriods_returnExpected( Optional> expected ) { - doReturn(Optional.empty()).when(searchFeatureDao).getFeaturesForPeriod(eq(detector), anyLong(), anyLong()); + doReturn(Optional.empty()) + .when(searchFeatureDao) + .getFeaturesForPeriod(eq(detector), anyLong(), anyLong(), any(ADStateManager.class)); for (int i = 0; i < queryRanges.length; i++) { doReturn(Optional.of(queryResults[i])) .when(searchFeatureDao) - .getFeaturesForPeriod(detector, queryRanges[i][0], queryRanges[i][1]); + .getFeaturesForPeriod(detector, queryRanges[i][0], queryRanges[i][1], stateManager); } Optional> result = searchFeatureDao - .getFeaturesForSampledPeriods(detector, maxSamples, maxStride, endTime); + .getFeaturesForSampledPeriods(detector, maxSamples, maxStride, endTime, stateManager); assertEquals(expected.isPresent(), result.isPresent()); if (expected.isPresent()) { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java index 03267257..577cd74d 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java @@ -28,6 +28,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.Collections; +import java.util.Optional; import java.util.stream.IntStream; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; @@ -38,9 +39,11 @@ import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.MapperService; @@ -202,4 +205,15 @@ public void testMaintenancRemove() throws IOException { assertEquals(0, states.size()); } + + public void testNegativeCache() throws IOException { + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of(), null); + SearchRequest dummySearchRequest = new SearchRequest(); + stateManager.insertFilteredQuery(detector, dummySearchRequest); + Optional> entry = stateManager.getFilteredQuery(detector); + assertTrue(entry.isPresent()); + stateManager.clearFilteredQuery(detector); + entry = stateManager.getFilteredQuery(detector); + assertFalse(entry.isPresent()); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java index e49973bc..0075a133 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java @@ -40,6 +40,8 @@ import static org.mockito.Mockito.verify; import java.io.IOException; +import java.time.Instant; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -88,6 +90,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; @@ -188,13 +191,14 @@ public void setUp() throws Exception { List userIndex = new ArrayList<>(); userIndex.add("test*"); when(detector.getIndices()).thenReturn(userIndex); + when(detector.getDetectorId()).thenReturn("testDetectorId"); when(stateManager.getAnomalyDetector(any(String.class))).thenReturn(Optional.of(detector)); hashRing = mock(HashRing.class); when(hashRing.getOwningNode(any(String.class))).thenReturn(Optional.of(clusterService.state().nodes().getLocalNode())); when(hashRing.build()).thenReturn(true); featureQuery = mock(FeatureManager.class); - when(featureQuery.getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong())) + when(featureQuery.getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong(), any(ADStateManager.class))) .thenReturn(new SinglePointFeatures(Optional.of(new double[] { 0.0d }), Optional.of(new double[] { 0 }))); normalModelManager = mock(ModelManager.class); when(normalModelManager.getThresholdingResult(any(String.class), any(String.class), anyDouble())) @@ -370,7 +374,8 @@ public Throwable noModelExceptionTemplate( when(rcfManager.getRcfModelId(any(String.class), anyInt())).thenReturn(rcfModelID); doNothing().when(normalModelManager).trainModel(any(AnomalyDetector.class), any(double[][].class)); - when(featureQuery.getColdStartData(any(AnomalyDetector.class))).thenReturn(Optional.of(new double[][] { { 0 } })); + when(featureQuery.getColdStartData(any(AnomalyDetector.class), any(ADStateManager.class))) + .thenReturn(Optional.of(new double[][] { { 0 } })); // These constructors register handler in transport service new RCFResultTransportAction(new ActionFilters(Collections.emptySet()), transportService, rcfManager, adCircuitBreakerService); @@ -723,6 +728,39 @@ public void testMute() { assertThat(exception.getMessage(), containsString(AnomalyResultTransportAction.NODE_UNRESPONSIVE_ERR_MSG)); } + public void testRejectRequestBasedOnNegativeCache() { + SearchRequest dummyRequest = new SearchRequest(); + Instant timestamp = Instant.now(); + Map.Entry dummyEntry = new AbstractMap.SimpleEntry<>(dummyRequest, timestamp); + when(stateManager.getFilteredQuery(detector)).thenReturn(Optional.of(dummyEntry)); + AnomalyResultTransportAction action = spy( + new AnomalyResultTransportAction( + new ActionFilters(Collections.emptySet()), + transportService, + client, + settings, + stateManager, + runner, + anomalyDetectionIndices, + featureQuery, + normalModelManager, + hashRing, + clusterService, + indexNameResolver, + threadPool, + adCircuitBreakerService, + adStats + ) + ); + AnomalyResultRequest request = new AnomalyResultRequest(adID, 100, 200); + PlainActionFuture listener = new PlainActionFuture<>(); + action.doExecute(null, request, listener); + AnomalyResultResponse response = listener.actionGet(); + assertEquals(Double.NaN, response.getAnomalyGrade(), 0.001); + assertEquals(Double.NaN, response.getConfidence(), 0.001); + assertThat(response.getFeatures(), is(empty())); + } + public void testRCFLatchAwaitException() throws InterruptedException { // These constructors register handler in transport service @@ -997,7 +1035,7 @@ public void testOnFailureNull() throws IOException { } public void testColdStartNoTrainingData() throws Exception { - when(featureQuery.getColdStartData(any(AnomalyDetector.class))).thenReturn(Optional.empty()); + when(featureQuery.getColdStartData(any(AnomalyDetector.class), any(ADStateManager.class))).thenReturn(Optional.empty()); AnomalyResultTransportAction action = new AnomalyResultTransportAction( new ActionFilters(Collections.emptySet()), @@ -1022,7 +1060,8 @@ public void testColdStartNoTrainingData() throws Exception { } public void testColdStartTimeoutPutCheckpoint() throws Exception { - when(featureQuery.getColdStartData(any(AnomalyDetector.class))).thenReturn(Optional.of(new double[][] { { 1.0 } })); + when(featureQuery.getColdStartData(any(AnomalyDetector.class), any(ADStateManager.class))) + .thenReturn(Optional.of(new double[][] { { 1.0 } })); doThrow(new ElasticsearchTimeoutException("")) .when(normalModelManager) .trainModel(any(AnomalyDetector.class), any(double[][].class)); @@ -1155,14 +1194,16 @@ enum FeatureTestMode { public void featureTestTemplate(FeatureTestMode mode) { if (mode == FeatureTestMode.FEATURE_NOT_AVAILABLE) { - when(featureQuery.getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong())) + when(featureQuery.getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong(), any(ADStateManager.class))) .thenReturn(new SinglePointFeatures(Optional.empty(), Optional.empty())); } else if (mode == FeatureTestMode.ILLEGAL_STATE) { - doThrow(IllegalArgumentException.class).when(featureQuery).getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong()); + doThrow(IllegalArgumentException.class) + .when(featureQuery) + .getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong(), any(ADStateManager.class)); } else if (mode == FeatureTestMode.AD_EXCEPTION) { doThrow(AnomalyDetectionException.class) .when(featureQuery) - .getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong()); + .getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong(), any(ADStateManager.class)); } AnomalyResultTransportAction action = new AnomalyResultTransportAction( From 3a82b22edde7528c1fcb0af681aa712b9cd9cd3e Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Wed, 5 Feb 2020 10:33:04 -0800 Subject: [PATCH 03/10] Address feedback from Yaliang and Lai --- .../ad/AnomalyDetectorPlugin.java | 6 +- .../ad/feature/FeatureManager.java | 13 ++-- .../ad/feature/SearchFeatureDao.java | 20 ++---- .../ad/transport/ADStateManager.java | 50 +++----------- .../AnomalyResultTransportAction.java | 11 ++- .../ad/util/ClientUtil.java | 41 +++++++++-- .../ad/util/Throttler.java | 68 +++++++++++++++++++ .../ad/feature/FeatureManagerTests.java | 34 +++++----- .../ad/feature/SearchFeatureDaoTests.java | 16 ++--- .../indices/AnomalyDetectionIndicesTests.java | 6 +- .../ad/transport/ADStateManagerTests.java | 17 +++-- .../ADStatsTransportActionTests.java | 6 +- .../ad/transport/AnomalyResultTests.java | 40 ++++------- .../ad/util/IndexUtilsTests.java | 8 ++- .../ad/util/ThrottlerTests.java | 55 +++++++++++++++ 15 files changed, 250 insertions(+), 141 deletions(-) create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 0f2b4457..db5481f0 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -68,6 +68,7 @@ import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction; import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction; import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; @@ -200,7 +201,9 @@ public Collection createComponents( NamedWriteableRegistry namedWriteableRegistry ) { Settings settings = environment.settings(); - ClientUtil clientUtil = new ClientUtil(settings, client); + Clock clock = Clock.systemUTC(); + Throttler throttler = new Throttler(clock); + ClientUtil clientUtil = new ClientUtil(settings, client, throttler); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService); anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil); this.clusterService = clusterService; @@ -213,7 +216,6 @@ public Collection createComponents( JvmService jvmService = new JvmService(environment.settings()); RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe(); CheckpointDao checkpoint = new CheckpointDao(client, clientUtil, CommonName.CHECKPOINT_INDEX_NAME); - Clock clock = Clock.systemUTC(); ModelManager modelManager = new ModelManager( clusterService, diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java index 5199a889..e3bc5dcb 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java @@ -35,7 +35,6 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; -import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -116,16 +115,15 @@ public FeatureManager( * @param detector anomaly detector for which the features are returned * @param startTime start time of the data point in epoch milliseconds * @param endTime end time of the data point in epoch milliseconds - * @param stateManager ADStateManager * @return unprocessed features and processed features for the current data point */ @Deprecated - public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime, ADStateManager stateManager) { + public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime) { double[][] currentPoints = null; Deque> shingle = detectorIdsToTimeShingles .computeIfAbsent(detector.getDetectorId(), id -> new ArrayDeque>(shingleSize)); if (shingle.isEmpty() || shingle.getLast().getKey() < endTime) { - Optional point = searchFeatureDao.getFeaturesForPeriod(detector, startTime, endTime, stateManager); + Optional point = searchFeatureDao.getFeaturesForPeriod(detector, startTime, endTime); if (point.isPresent()) { if (shingle.size() == shingleSize) { shingle.remove(); @@ -176,16 +174,13 @@ private double[][] filterAndFill(Deque> shingle, long endT * in dimension via shingling. * * @param detector contains data info (indices, documents, etc) - * @param stateManager ADStateManager * @return data for cold-start training, or empty if unavailable */ @Deprecated - public Optional getColdStartData(AnomalyDetector detector, ADStateManager stateManager) { + public Optional getColdStartData(AnomalyDetector detector) { return searchFeatureDao .getLatestDataTime(detector) - .flatMap( - latest -> searchFeatureDao.getFeaturesForSampledPeriods(detector, maxTrainSamples, maxSampleStride, latest, stateManager) - ) + .flatMap(latest -> searchFeatureDao.getFeaturesForSampledPeriods(detector, maxTrainSamples, maxSampleStride, latest)) .map( samples -> transpose( interpolator.interpolate(transpose(samples.getKey()), samples.getValue() * (samples.getKey().length - 1) + 1) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java index 0e7cb657..1018f9e2 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java @@ -30,7 +30,6 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; -import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils; import org.apache.logging.log4j.LogManager; @@ -122,17 +121,15 @@ public Optional getLatestDataTime(AnomalyDetector detector) { * @param detector info about indices, documents, feature query * @param startTime epoch milliseconds at the beginning of the period * @param endTime epoch milliseconds at the end of the period - * @param stateManager ADStateManager * @throws IllegalStateException when unexpected failures happen * @return features from search results, empty when no data found */ - public Optional getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime, ADStateManager stateManager) { + public Optional getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime) { SearchRequest searchRequest = createFeatureSearchRequest(detector, startTime, endTime, Optional.empty()); - // add (detectorId, filteredQuery) to negative cache - stateManager.insertFilteredQuery(detector, searchRequest); + // send throttled request: this request will clear the negative cache if the request finished within timeout return clientUtil - .throttledTimedRequest(searchRequest, logger, client::search, stateManager, detector) + .throttledTimedRequest(searchRequest, logger, client::search, detector) .flatMap(resp -> parseResponse(resp, detector.getEnabledFeatureIds())); } @@ -249,22 +246,20 @@ public void getFeatureSamplesForPeriods( * @param maxSamples the maximum number of samples to return * @param maxStride the maximum number of periods between samples * @param endTime the end time of the latest period - * @param stateManager ADStateManager * @return sampled features and stride, empty when no data found */ public Optional> getFeaturesForSampledPeriods( AnomalyDetector detector, int maxSamples, int maxStride, - long endTime, - ADStateManager stateManager + long endTime ) { Map cache = new HashMap<>(); int currentStride = maxStride; Optional features = Optional.empty(); while (currentStride >= 1) { boolean isInterpolatable = currentStride < maxStride; - features = getFeaturesForSampledPeriods(detector, maxSamples, currentStride, endTime, cache, isInterpolatable, stateManager); + features = getFeaturesForSampledPeriods(detector, maxSamples, currentStride, endTime, cache, isInterpolatable); if (!features.isPresent() || features.get().length > maxSamples / 2 || currentStride == 1) { break; } else { @@ -284,8 +279,7 @@ private Optional getFeaturesForSampledPeriods( int stride, long endTime, Map cache, - boolean isInterpolatable, - ADStateManager stateManager + boolean isInterpolatable ) { ArrayDeque sampledFeatures = new ArrayDeque<>(maxSamples); for (int i = 0; i < maxSamples; i++) { @@ -294,7 +288,7 @@ private Optional getFeaturesForSampledPeriods( if (cache.containsKey(end)) { sampledFeatures.addFirst(cache.get(end)); } else { - Optional features = getFeaturesForPeriod(detector, end - span, end, stateManager); + Optional features = getFeaturesForPeriod(detector, end - span, end); if (features.isPresent()) { cache.put(end, features.get()); sampledFeatures.addFirst(features.get()); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java index cdd86fdb..878ce86f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java @@ -124,47 +124,6 @@ public int getPartitionNumber(String adID) throws InterruptedException { return partitionNum; } - /** - * Get negative cache value(QueryBuilder, Instant) for given detector - * If detectorId is null, return Optional.empty() - * @param detector AnomalyDetector - * @return negative cache value(QueryBuilder, Instant) - */ - public Optional> getFilteredQuery(AnomalyDetector detector) { - if (detector.getDetectorId() == null) { - return Optional.empty(); - } - if (negativeCache.containsKey(detector.getDetectorId())) { - return Optional.of(negativeCache.get(detector.getDetectorId())); - } - return Optional.empty(); - } - - /** - * Insert the negative cache entry for given detector - * If detectorId is null, do nothing - * @param detector AnomalyDetector - * @param searchRequest ES search request - */ - public void insertFilteredQuery(AnomalyDetector detector, SearchRequest searchRequest) { - if (detector.getDetectorId() == null) { - return; - } - negativeCache.putIfAbsent(detector.getDetectorId(), new SimpleEntry<>(searchRequest, clock.instant())); - } - - /** - * Clear the negative cache for given detector. - * If detectorId is null, do nothing - * @param detector AnomalyDetector - */ - public void clearFilteredQuery(AnomalyDetector detector) { - if (detector.getDetectorId() == null) { - return; - } - negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId())); - } - public Optional getAnomalyDetector(String adID) { Entry detectorAndTime = currentDetectors.get(adID); if (detectorAndTime != null) { @@ -255,4 +214,13 @@ public void addPressure(String nodeId) { public void resetBackpressureCounter(String nodeId) { backpressureMuter.remove(nodeId); } + + /** + * Check if there is running query on given detector + * @param detector Anomaly Detector + * @return boolean + */ + public boolean hasRunningQuery(AnomalyDetector detector) { + return clientUtil.hasRunningQuery(detector); + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java index 2ac6ad3b..4dc68cd8 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java @@ -21,7 +21,6 @@ import java.util.Iterator; import java.util.List; import java.util.Locale; -import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -68,7 +67,6 @@ import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; @@ -251,10 +249,9 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< return; } AnomalyDetector anomalyDetector = detector.get(); - Optional> queryEntry = stateManager.getFilteredQuery(anomalyDetector); - if (queryEntry.isPresent()) { + if (stateManager.hasRunningQuery(anomalyDetector)) { LOG.info("There is one query running for detectorId: {}", anomalyDetector.getDetectorId()); - listener.onResponse(new AnomalyResultResponse(Double.NaN, Double.NaN, new ArrayList())); + listener.onFailure(new EndRunException(adID, "There is one query running on AnomalyDetector", true)); return; } @@ -278,7 +275,7 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< long startTime = request.getStart() - delayMillis; long endTime = request.getEnd() - delayMillis; - SinglePointFeatures featureOptional = featureManager.getCurrentFeatures(anomalyDetector, startTime, endTime, stateManager); + SinglePointFeatures featureOptional = featureManager.getCurrentFeatures(anomalyDetector, startTime, endTime); List featureInResponse = null; @@ -819,7 +816,7 @@ class ColdStartJob implements Callable { @Override public Boolean call() { try { - Optional traingData = featureManager.getColdStartData(detector, stateManager); + Optional traingData = featureManager.getColdStartData(detector); if (traingData.isPresent()) { modelManager.trainModel(detector, traingData.get()); return true; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index f5c70c69..8c4be244 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -17,6 +17,8 @@ import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT; +import java.time.Instant; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -25,7 +27,7 @@ import java.util.function.Function; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; -import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager; + import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.Action; @@ -44,11 +46,13 @@ public class ClientUtil { private volatile TimeValue requestTimeout; private Client client; + private final Throttler throttler; @Inject - public ClientUtil(Settings setting, Client client) { + public ClientUtil(Settings setting, Client client, Throttler throttler) { this.requestTimeout = REQUEST_TIMEOUT.get(setting); this.client = client; + this.throttler = throttler; } /** @@ -155,24 +159,38 @@ public Response return function.apply(request).actionGet(requestTimeout); } + /** + * Send a nonblocking request with a timeout and return response. The request will first be put into + * the negative cache. Once the request complete, it will be removed from the negative cache. + * + * @param request request like index/search/get + * @param LOG log + * @param consumer functional interface to operate as a client request like client::get + * @param ActionRequest + * @param ActionResponse + * @param detector Anomaly Detector + * @return the response + * @throws ElasticsearchTimeoutException when we cannot get response within time. + * @throws IllegalStateException when the waiting thread is interrupted + */ public Optional throttledTimedRequest( Request request, Logger LOG, BiConsumer> consumer, - ADStateManager stateManager, AnomalyDetector detector ) { try { + throttler.insertFilteredQuery(detector, request); AtomicReference respReference = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { // clear negative cache - stateManager.clearFilteredQuery(detector); + throttler.clearFilteredQuery(detector); respReference.set(response); }, exception -> { // clear negative cache - stateManager.clearFilteredQuery(detector); + throttler.clearFilteredQuery(detector); LOG.error("Cannot get response for request {}, error: {}", request, exception); }), latch)); @@ -185,4 +203,17 @@ public Optional throw new IllegalStateException(e1); } } + + /** + * Check if there is running query on given detector + * @param detector Anomaly Detector + * @return boolean + */ + public boolean hasRunningQuery(AnomalyDetector detector) { + Optional> queryEntry = throttler.getFilteredQuery(detector); + if (queryEntry.isPresent()) { + return true; + } + return false; + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java new file mode 100644 index 00000000..f03082b0 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -0,0 +1,68 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazon.opendistroforelasticsearch.ad.util; + +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; + +import java.time.Clock; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import org.elasticsearch.action.ActionRequest; + +public class Throttler { + // negativeCache is used to reject search query if given detector already has one query running + // key is detectorId, value is an entry. Key is ActionRequest and value is the timestamp + private final ConcurrentHashMap> negativeCache; + private final Clock clock; + + public Throttler(Clock clock) { + this.negativeCache = new ConcurrentHashMap<>(); + this.clock = clock; + } + + /** + * Get negative cache value(ActionRequest, Instant) for given detector + * @param detector AnomalyDetector + * @return negative cache value(ActionRequest, Instant) + */ + public Optional> getFilteredQuery(AnomalyDetector detector) { + if (negativeCache.containsKey(detector.getDetectorId())) { + return Optional.of(negativeCache.get(detector.getDetectorId())); + } + return Optional.empty(); + } + + /** + * Insert the negative cache entry for given detector + * If detectorId is null, do nothing + * @param detector AnomalyDetector + * @param request ActionRequest + */ + public void insertFilteredQuery(AnomalyDetector detector, ActionRequest request) { + negativeCache.put(detector.getDetectorId(), new AbstractMap.SimpleEntry<>(request, clock.instant())); + } + + /** + * Clear the negative cache for given detector. + * If detectorId is null, do nothing + * @param detector AnomalyDetector + */ + public void clearFilteredQuery(AnomalyDetector detector) { + negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId())); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java index 1a5b8a23..5e05c405 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java @@ -204,7 +204,7 @@ public void getCurrentFeatures_returnExpected( ) { for (int i = 0; i < allRanges.length; i++) { - when(searchFeatureDao.getFeaturesForPeriod(detector, allRanges[i][0], allRanges[i][1], stateManager)) + when(searchFeatureDao.getFeaturesForPeriod(detector, allRanges[i][0], allRanges[i][1])) .thenReturn(Optional.ofNullable(allPoints[i])); } this.featureManager = spy( @@ -222,10 +222,10 @@ public void getCurrentFeatures_returnExpected( ) ); for (int i = 0; i < previousRanges.length; i++) { - featureManager.getCurrentFeatures(detector, previousRanges[i][0], previousRanges[i][1], stateManager); + featureManager.getCurrentFeatures(detector, previousRanges[i][0], previousRanges[i][1]); } - SinglePointFeatures result = featureManager.getCurrentFeatures(detector, currentRange[0], currentRange[1], stateManager); + SinglePointFeatures result = featureManager.getCurrentFeatures(detector, currentRange[0], currentRange[1]); assertTrue(Arrays.equals(expected.getUnprocessedFeatures().orElse(null), result.getUnprocessedFeatures().orElse(null))); assertTrue(Arrays.equals(expected.getProcessedFeatures().orElse(null), result.getProcessedFeatures().orElse(null))); @@ -245,7 +245,7 @@ private Object[] getColdStartDataTestData() { public void getColdStartData_returnExpected(Long latestTime, Entry data, int interpolants, double[][] expected) { when(searchFeatureDao.getLatestDataTime(detector)).thenReturn(ofNullable(latestTime)); if (latestTime != null) { - when(searchFeatureDao.getFeaturesForSampledPeriods(detector, maxTrainSamples, maxSampleStride, latestTime, stateManager)) + when(searchFeatureDao.getFeaturesForSampledPeriods(detector, maxTrainSamples, maxSampleStride, latestTime)) .thenReturn(ofNullable(data)); } if (data != null) { @@ -253,7 +253,7 @@ public void getColdStartData_returnExpected(Long latestTime, Entry(data.getKey())), eq(shingleSize)); } - Optional results = featureManager.getColdStartData(detector, stateManager); + Optional results = featureManager.getColdStartData(detector); assertTrue(Arrays.deepEquals(expected, results.orElse(null))); } @@ -309,16 +309,16 @@ public void clear_deleteFeatures() { start = i * 10; end = (i + 1) * 10; - when(searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager)).thenReturn(Optional.of(new double[] { i })); - featureManager.getCurrentFeatures(detector, start, end, stateManager); + when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i })); + featureManager.getCurrentFeatures(detector, start, end); } - SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); + SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end); assertTrue(beforeMaintenance.getUnprocessedFeatures().isPresent()); assertTrue(beforeMaintenance.getProcessedFeatures().isPresent()); featureManager.clear(detector.getDetectorId()); - SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); + SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end); assertTrue(afterMaintenance.getUnprocessedFeatures().isPresent()); assertFalse(afterMaintenance.getProcessedFeatures().isPresent()); } @@ -331,17 +331,17 @@ public void maintenance_removeStaleData() { start = i * 10; end = (i + 1) * 10; - when(searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager)).thenReturn(Optional.of(new double[] { i })); - featureManager.getCurrentFeatures(detector, start, end, stateManager); + when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i })); + featureManager.getCurrentFeatures(detector, start, end); } - SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); + SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end); assertTrue(beforeMaintenance.getUnprocessedFeatures().isPresent()); assertTrue(beforeMaintenance.getProcessedFeatures().isPresent()); when(clock.instant()).thenReturn(Instant.ofEpochMilli(end + 1).plus(featureBufferTtl)); featureManager.maintenance(); - SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); + SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end); assertTrue(afterMaintenance.getUnprocessedFeatures().isPresent()); assertFalse(afterMaintenance.getProcessedFeatures().isPresent()); } @@ -354,17 +354,17 @@ public void maintenance_keepRecentData() { start = i * 10; end = (i + 1) * 10; - when(searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager)).thenReturn(Optional.of(new double[] { i })); - featureManager.getCurrentFeatures(detector, start, end, stateManager); + when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i })); + featureManager.getCurrentFeatures(detector, start, end); } - SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); + SinglePointFeatures beforeMaintenance = featureManager.getCurrentFeatures(detector, start, end); assertTrue(beforeMaintenance.getUnprocessedFeatures().isPresent()); assertTrue(beforeMaintenance.getProcessedFeatures().isPresent()); when(clock.instant()).thenReturn(Instant.ofEpochMilli(end)); featureManager.maintenance(); - SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end, stateManager); + SinglePointFeatures afterMaintenance = featureManager.getCurrentFeatures(detector, start, end); assertTrue(afterMaintenance.getUnprocessedFeatures().isPresent()); assertTrue(afterMaintenance.getProcessedFeatures().isPresent()); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java index 2aeead08..25ebdf7a 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java @@ -85,7 +85,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.anyLong; @@ -181,7 +180,6 @@ public void setup() throws Exception { eq(searchRequest), anyObject(), Matchers.>>anyObject(), - anyObject(), anyObject() ); @@ -296,7 +294,7 @@ public void getFeaturesForPeriod_returnExpected_givenData(List aggs when(detector.getEnabledFeatureIds()).thenReturn(featureIds); // test - Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager); + Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end); // verify assertTrue(Arrays.equals(expected, result.orElse(null))); @@ -381,7 +379,7 @@ public void test_getFeaturesForPeriod_returnEmpty_givenNoData() throws Exception when(searchResponse.getAggregations()).thenReturn(null); // test - Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager); + Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end); // verify assertFalse(result.isPresent()); @@ -397,7 +395,7 @@ public void getFeaturesForPeriod_returnEmpty_givenNoHits() throws Exception { when(searchResponse.getHits()).thenReturn(new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 1f)); // test - Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end, stateManager); + Optional result = searchFeatureDao.getFeaturesForPeriod(detector, start, end); // verify assertFalse(result.isPresent()); @@ -515,17 +513,15 @@ public void getFeaturesForSampledPeriods_returnExpected( Optional> expected ) { - doReturn(Optional.empty()) - .when(searchFeatureDao) - .getFeaturesForPeriod(eq(detector), anyLong(), anyLong(), any(ADStateManager.class)); + doReturn(Optional.empty()).when(searchFeatureDao).getFeaturesForPeriod(eq(detector), anyLong(), anyLong()); for (int i = 0; i < queryRanges.length; i++) { doReturn(Optional.of(queryResults[i])) .when(searchFeatureDao) - .getFeaturesForPeriod(detector, queryRanges[i][0], queryRanges[i][1], stateManager); + .getFeaturesForPeriod(detector, queryRanges[i][0], queryRanges[i][1]); } Optional> result = searchFeatureDao - .getFeaturesForSampledPeriods(detector, maxSamples, maxStride, endTime, stateManager); + .getFeaturesForSampledPeriods(detector, maxSamples, maxStride, endTime); assertEquals(expected.isPresent(), result.isPresent()); if (expected.isPresent()) { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java index 1ddf0396..c84a1461 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java @@ -24,6 +24,7 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; @@ -39,6 +40,7 @@ import org.junit.Before; import java.io.IOException; +import java.time.Clock; import java.util.HashSet; import java.util.Set; @@ -70,7 +72,9 @@ public void setup() { clusterSetting = new ClusterSettings(settings, clusterSettings); clusterService = TestHelpers.createClusterService(client().threadPool(), clusterSetting); client = mock(Client.class); - requestUtil = new ClientUtil(settings, client); + Clock clock = Clock.systemUTC(); + Throttler throttler = new Throttler(clock); + requestUtil = new ClientUtil(settings, client, throttler); indices = new AnomalyDetectionIndices(client(), clusterService, client().threadPool(), settings, requestUtil); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java index 577cd74d..1a3fa0ef 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java @@ -28,7 +28,6 @@ import java.time.Instant; import java.util.Arrays; import java.util.Collections; -import java.util.Optional; import java.util.stream.IntStream; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; @@ -39,6 +38,7 @@ import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetRequest; @@ -67,6 +67,7 @@ public class ADStateManagerTests extends ESTestCase { private Client client; private Clock clock; private Duration duration; + private Throttler throttler; @Override protected NamedXContentRegistry xContentRegistry() { @@ -89,12 +90,13 @@ public void setUp() throws Exception { .build(); clock = mock(Clock.class); duration = Duration.ofHours(1); + throttler = new Throttler(clock); stateManager = new ADStateManager( client, xContentRegistry(), modelManager, settings, - new ClientUtil(settings, client), + new ClientUtil(settings, client, throttler), clock, duration ); @@ -206,14 +208,11 @@ public void testMaintenancRemove() throws IOException { } - public void testNegativeCache() throws IOException { + public void testHasRunningQuery() throws IOException { AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of(), null); SearchRequest dummySearchRequest = new SearchRequest(); - stateManager.insertFilteredQuery(detector, dummySearchRequest); - Optional> entry = stateManager.getFilteredQuery(detector); - assertTrue(entry.isPresent()); - stateManager.clearFilteredQuery(detector); - entry = stateManager.getFilteredQuery(detector); - assertFalse(entry.isPresent()); + assertFalse(stateManager.hasRunningQuery(detector)); + throttler.insertFilteredQuery(detector, dummySearchRequest); + assertTrue(stateManager.hasRunningQuery(detector)); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java index 0388da77..10be0c9d 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java @@ -23,6 +23,7 @@ import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.Client; @@ -32,6 +33,7 @@ import org.junit.Before; import org.junit.Test; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -55,7 +57,9 @@ public void setUp() throws Exception { super.setUp(); Client client = client(); - IndexUtils indexUtils = new IndexUtils(client, new ClientUtil(Settings.EMPTY, client), clusterService()); + Clock clock = mock(Clock.class); + Throttler throttler = new Throttler(clock); + IndexUtils indexUtils = new IndexUtils(client, new ClientUtil(Settings.EMPTY, client, throttler), clusterService()); ModelManager modelManager = mock(ModelManager.class); clusterStatName1 = "clusterStat1"; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java index 0075a133..b961c516 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java @@ -40,8 +40,7 @@ import static org.mockito.Mockito.verify; import java.io.IOException; -import java.time.Instant; -import java.util.AbstractMap; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -83,6 +82,7 @@ import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner; import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -90,7 +90,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; @@ -198,7 +197,7 @@ public void setUp() throws Exception { when(hashRing.getOwningNode(any(String.class))).thenReturn(Optional.of(clusterService.state().nodes().getLocalNode())); when(hashRing.build()).thenReturn(true); featureQuery = mock(FeatureManager.class); - when(featureQuery.getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong(), any(ADStateManager.class))) + when(featureQuery.getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong())) .thenReturn(new SinglePointFeatures(Optional.of(new double[] { 0.0d }), Optional.of(new double[] { 0 }))); normalModelManager = mock(ModelManager.class); when(normalModelManager.getThresholdingResult(any(String.class), any(String.class), anyDouble())) @@ -236,8 +235,9 @@ public void setUp() throws Exception { }).when(client).index(any(), any()); indexNameResolver = new IndexNameExpressionResolver(); - - ClientUtil clientUtil = new ClientUtil(Settings.EMPTY, client); + Clock clock = mock(Clock.class); + Throttler throttler = new Throttler(clock); + ClientUtil clientUtil = new ClientUtil(Settings.EMPTY, client, throttler); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService); Map> statsMap = new HashMap>() { @@ -374,8 +374,7 @@ public Throwable noModelExceptionTemplate( when(rcfManager.getRcfModelId(any(String.class), anyInt())).thenReturn(rcfModelID); doNothing().when(normalModelManager).trainModel(any(AnomalyDetector.class), any(double[][].class)); - when(featureQuery.getColdStartData(any(AnomalyDetector.class), any(ADStateManager.class))) - .thenReturn(Optional.of(new double[][] { { 0 } })); + when(featureQuery.getColdStartData(any(AnomalyDetector.class))).thenReturn(Optional.of(new double[][] { { 0 } })); // These constructors register handler in transport service new RCFResultTransportAction(new ActionFilters(Collections.emptySet()), transportService, rcfManager, adCircuitBreakerService); @@ -729,10 +728,7 @@ public void testMute() { } public void testRejectRequestBasedOnNegativeCache() { - SearchRequest dummyRequest = new SearchRequest(); - Instant timestamp = Instant.now(); - Map.Entry dummyEntry = new AbstractMap.SimpleEntry<>(dummyRequest, timestamp); - when(stateManager.getFilteredQuery(detector)).thenReturn(Optional.of(dummyEntry)); + when(stateManager.hasRunningQuery(detector)).thenReturn(true); AnomalyResultTransportAction action = spy( new AnomalyResultTransportAction( new ActionFilters(Collections.emptySet()), @@ -755,10 +751,8 @@ public void testRejectRequestBasedOnNegativeCache() { AnomalyResultRequest request = new AnomalyResultRequest(adID, 100, 200); PlainActionFuture listener = new PlainActionFuture<>(); action.doExecute(null, request, listener); - AnomalyResultResponse response = listener.actionGet(); - assertEquals(Double.NaN, response.getAnomalyGrade(), 0.001); - assertEquals(Double.NaN, response.getConfidence(), 0.001); - assertThat(response.getFeatures(), is(empty())); + Throwable exception = assertException(listener, AnomalyDetectionException.class); + assertThat(exception.getMessage(), containsString("There is one query running on AnomalyDetector")); } public void testRCFLatchAwaitException() throws InterruptedException { @@ -1035,8 +1029,7 @@ public void testOnFailureNull() throws IOException { } public void testColdStartNoTrainingData() throws Exception { - when(featureQuery.getColdStartData(any(AnomalyDetector.class), any(ADStateManager.class))).thenReturn(Optional.empty()); - + when(featureQuery.getColdStartData(any(AnomalyDetector.class))).thenReturn(Optional.empty()); AnomalyResultTransportAction action = new AnomalyResultTransportAction( new ActionFilters(Collections.emptySet()), transportService, @@ -1060,8 +1053,7 @@ public void testColdStartNoTrainingData() throws Exception { } public void testColdStartTimeoutPutCheckpoint() throws Exception { - when(featureQuery.getColdStartData(any(AnomalyDetector.class), any(ADStateManager.class))) - .thenReturn(Optional.of(new double[][] { { 1.0 } })); + when(featureQuery.getColdStartData(any(AnomalyDetector.class))).thenReturn(Optional.of(new double[][] { { 1.0 } })); doThrow(new ElasticsearchTimeoutException("")) .when(normalModelManager) .trainModel(any(AnomalyDetector.class), any(double[][].class)); @@ -1194,16 +1186,14 @@ enum FeatureTestMode { public void featureTestTemplate(FeatureTestMode mode) { if (mode == FeatureTestMode.FEATURE_NOT_AVAILABLE) { - when(featureQuery.getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong(), any(ADStateManager.class))) + when(featureQuery.getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong())) .thenReturn(new SinglePointFeatures(Optional.empty(), Optional.empty())); } else if (mode == FeatureTestMode.ILLEGAL_STATE) { - doThrow(IllegalArgumentException.class) - .when(featureQuery) - .getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong(), any(ADStateManager.class)); + doThrow(IllegalArgumentException.class).when(featureQuery).getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong()); } else if (mode == FeatureTestMode.AD_EXCEPTION) { doThrow(AnomalyDetectionException.class) .when(featureQuery) - .getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong(), any(ADStateManager.class)); + .getCurrentFeatures(any(AnomalyDetector.class), anyLong(), anyLong()); } AnomalyResultTransportAction action = new AnomalyResultTransportAction( diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java index 896a1a8a..d3660076 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java @@ -22,6 +22,10 @@ import org.junit.Before; import org.junit.Test; +import java.time.Clock; + +import static org.mockito.Mockito.mock; + public class IndexUtilsTests extends ESIntegTestCase { private ClientUtil clientUtil; @@ -29,7 +33,9 @@ public class IndexUtilsTests extends ESIntegTestCase { @Before public void setup() { Client client = client(); - clientUtil = new ClientUtil(Settings.EMPTY, client); + Clock clock = mock(Clock.class); + Throttler throttler = new Throttler(clock); + clientUtil = new ClientUtil(Settings.EMPTY, client, throttler); } @Test diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java new file mode 100644 index 00000000..fb9fbe33 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.util; + +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.time.Clock; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import static org.mockito.Mockito.mock; + +public class ThrottlerTests extends ESTestCase { + private Throttler throttler; + + @Before + public void setup() { + Clock clock = mock(Clock.class); + this.throttler = new Throttler(clock); + } + + @Test + public void test() throws IOException { + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of(), null); + SearchRequest dummySearchRequest = new SearchRequest(); + throttler.insertFilteredQuery(detector, dummySearchRequest); + Optional> entry = throttler.getFilteredQuery(detector); + assertTrue(entry.isPresent()); + throttler.clearFilteredQuery(detector); + entry = throttler.getFilteredQuery(detector); + assertFalse(entry.isPresent()); + return; + } +} From fe2a193db5522ef3696f2b394cb88d7f2f2a38b4 Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Wed, 5 Feb 2020 10:50:54 -0800 Subject: [PATCH 04/10] Remove unused data structure --- .../ad/transport/ADStateManager.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java index 878ce86f..fcc341f5 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java @@ -37,7 +37,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -56,9 +55,6 @@ public class ADStateManager { private static final Logger LOG = LogManager.getLogger(ADStateManager.class); private ConcurrentHashMap> currentDetectors; private ConcurrentHashMap> partitionNumber; - // negativeCache is used to reject search query if given detector already has one query running - // key is detectorId, value is an entry. Key is QueryBuilder and value is the timestamp - private ConcurrentHashMap> negativeCache; private Client client; private Random random; private ModelManager modelManager; @@ -87,7 +83,6 @@ public ADStateManager( this.partitionNumber = new ConcurrentHashMap<>(); this.clientUtil = clientUtil; this.backpressureMuter = new ConcurrentHashMap<>(); - this.negativeCache = new ConcurrentHashMap<>(); this.clock = clock; this.settings = settings; this.stateTtl = stateTtl; From 1bb92577a9c8570fc4f6dc02a1eafb00865f116e Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Tue, 18 Feb 2020 14:17:37 -0800 Subject: [PATCH 05/10] Adding missing java doc and simplify code. --- .../ad/feature/SearchFeatureDao.java | 3 ++- .../ad/transport/ADStateManager.java | 2 +- .../opendistroforelasticsearch/ad/util/ClientUtil.java | 8 ++------ .../opendistroforelasticsearch/ad/util/Throttler.java | 9 ++++----- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java index 1018f9e2..4bc5d891 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java @@ -114,7 +114,8 @@ public Optional getLatestDataTime(AnomalyDetector detector) { } /** - * Gets features for the given time period. This function also add given detector to negative cache before sending es request. + * Gets features for the given time period. + * This function also adds given detector to negative cache before sending es request. * Once we get response/exception within timeout, we treat this request as complete and clear the negative cache. * Otherwise this detector entry remain in the negative to reject further request. * diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java index fcc341f5..eb57123c 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java @@ -213,7 +213,7 @@ public void resetBackpressureCounter(String nodeId) { /** * Check if there is running query on given detector * @param detector Anomaly Detector - * @return boolean + * @return true if given detector has a running query else false */ public boolean hasRunningQuery(AnomalyDetector detector) { return clientUtil.hasRunningQuery(detector); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index 8c4be244..36ae5281 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -207,13 +207,9 @@ public Optional /** * Check if there is running query on given detector * @param detector Anomaly Detector - * @return boolean + * @return true if given detector has a running query else false */ public boolean hasRunningQuery(AnomalyDetector detector) { - Optional> queryEntry = throttler.getFilteredQuery(detector); - if (queryEntry.isPresent()) { - return true; - } - return false; + return throttler.getFilteredQuery(detector).isPresent(); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java index f03082b0..54e66160 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -24,6 +24,9 @@ import java.util.concurrent.ConcurrentHashMap; import org.elasticsearch.action.ActionRequest; +/** + * Utility functions for throttling query. + */ public class Throttler { // negativeCache is used to reject search query if given detector already has one query running // key is detectorId, value is an entry. Key is ActionRequest and value is the timestamp @@ -41,15 +44,11 @@ public Throttler(Clock clock) { * @return negative cache value(ActionRequest, Instant) */ public Optional> getFilteredQuery(AnomalyDetector detector) { - if (negativeCache.containsKey(detector.getDetectorId())) { - return Optional.of(negativeCache.get(detector.getDetectorId())); - } - return Optional.empty(); + return Optional.of(negativeCache.get(detector.getDetectorId())); } /** * Insert the negative cache entry for given detector - * If detectorId is null, do nothing * @param detector AnomalyDetector * @param request ActionRequest */ From fae4c59dea02a78b9aec2a1acea45ca85672faab Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Wed, 19 Feb 2020 08:34:51 -0800 Subject: [PATCH 06/10] Address feedback: Optional.ofNullable --- .../amazon/opendistroforelasticsearch/ad/util/Throttler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java index 54e66160..333516df 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -44,7 +44,7 @@ public Throttler(Clock clock) { * @return negative cache value(ActionRequest, Instant) */ public Optional> getFilteredQuery(AnomalyDetector detector) { - return Optional.of(negativeCache.get(detector.getDetectorId())); + return Optional.ofNullable(negativeCache.get(detector.getDetectorId())); } /** From c6a048df327cf67b8b21e85392783ed9409c5b71 Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Thu, 20 Feb 2020 09:21:16 -0800 Subject: [PATCH 07/10] Address feedback --- .../ad/feature/SearchFeatureDao.java | 3 ++- .../ad/util/ClientUtil.java | 8 ++++---- .../ad/util/Throttler.java | 19 +++++++++---------- .../ad/transport/ADStateManagerTests.java | 2 +- .../ad/util/ThrottlerTests.java | 8 ++++---- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java index 4bc5d891..09ccba25 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java @@ -116,7 +116,8 @@ public Optional getLatestDataTime(AnomalyDetector detector) { /** * Gets features for the given time period. * This function also adds given detector to negative cache before sending es request. - * Once we get response/exception within timeout, we treat this request as complete and clear the negative cache. + * Once response/exception is received within timeout, this request will be treated as complete + * and cleared from the negative cache. * Otherwise this detector entry remain in the negative to reject further request. * * @param detector info about indices, documents, feature query diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index 36ae5281..7dfce1cd 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -180,17 +180,17 @@ public Optional AnomalyDetector detector ) { try { - throttler.insertFilteredQuery(detector, request); + throttler.insertFilteredQuery(detector.getDetectorId(), request); AtomicReference respReference = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { // clear negative cache - throttler.clearFilteredQuery(detector); + throttler.clearFilteredQuery(detector.getDetectorId()); respReference.set(response); }, exception -> { // clear negative cache - throttler.clearFilteredQuery(detector); + throttler.clearFilteredQuery(detector.getDetectorId()); LOG.error("Cannot get response for request {}, error: {}", request, exception); }), latch)); @@ -210,6 +210,6 @@ public Optional * @return true if given detector has a running query else false */ public boolean hasRunningQuery(AnomalyDetector detector) { - return throttler.getFilteredQuery(detector).isPresent(); + return throttler.getFilteredQuery(detector.getDetectorId()).isPresent(); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java index 333516df..22508ae9 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -40,28 +40,27 @@ public Throttler(Clock clock) { /** * Get negative cache value(ActionRequest, Instant) for given detector - * @param detector AnomalyDetector + * @param detectorId AnomalyDetector ID * @return negative cache value(ActionRequest, Instant) */ - public Optional> getFilteredQuery(AnomalyDetector detector) { - return Optional.ofNullable(negativeCache.get(detector.getDetectorId())); + public Optional> getFilteredQuery(String detectorId) { + return Optional.ofNullable(negativeCache.get(detectorId)); } /** * Insert the negative cache entry for given detector - * @param detector AnomalyDetector + * @param detectorId AnomalyDetector ID * @param request ActionRequest */ - public void insertFilteredQuery(AnomalyDetector detector, ActionRequest request) { - negativeCache.put(detector.getDetectorId(), new AbstractMap.SimpleEntry<>(request, clock.instant())); + public synchronized void insertFilteredQuery(String detectorId, ActionRequest request) { + negativeCache.put(detectorId, new AbstractMap.SimpleEntry<>(request, clock.instant())); } /** * Clear the negative cache for given detector. - * If detectorId is null, do nothing - * @param detector AnomalyDetector + * @param detectorId AnomalyDetector ID */ - public void clearFilteredQuery(AnomalyDetector detector) { - negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId())); + public void clearFilteredQuery(String detectorId) { + negativeCache.remove(detectorId); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java index 1a3fa0ef..c38fddea 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java @@ -212,7 +212,7 @@ public void testHasRunningQuery() throws IOException { AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of(), null); SearchRequest dummySearchRequest = new SearchRequest(); assertFalse(stateManager.hasRunningQuery(detector)); - throttler.insertFilteredQuery(detector, dummySearchRequest); + throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest); assertTrue(stateManager.hasRunningQuery(detector)); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java index fb9fbe33..1bc9db42 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java @@ -44,11 +44,11 @@ public void setup() { public void test() throws IOException { AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of(), null); SearchRequest dummySearchRequest = new SearchRequest(); - throttler.insertFilteredQuery(detector, dummySearchRequest); - Optional> entry = throttler.getFilteredQuery(detector); + throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest); + Optional> entry = throttler.getFilteredQuery(detector.getDetectorId()); assertTrue(entry.isPresent()); - throttler.clearFilteredQuery(detector); - entry = throttler.getFilteredQuery(detector); + throttler.clearFilteredQuery(detector.getDetectorId()); + entry = throttler.getFilteredQuery(detector.getDetectorId()); assertFalse(entry.isPresent()); return; } From f7ad7c0fb5c812e67cee3e1c851d32282cb02b25 Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Thu, 20 Feb 2020 13:43:53 -0800 Subject: [PATCH 08/10] 1) Adding test case for throttler 2) Adding check when inserting new entry to negative cache. --- .../rest/RestDeleteAnomalyDetectorAction.java | 18 ++++---- .../ad/settings/AnomalyDetectorSettings.java | 24 +++++------ .../AnomalyResultTransportAction.java | 2 +- .../ad/util/ClientUtil.java | 9 ++-- .../ad/util/Throttler.java | 8 ++-- .../ad/util/ThrottlerTests.java | 43 +++++++++++++------ 6 files changed, 61 insertions(+), 43 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java index 7f7558c1..c6a08a06 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java @@ -17,15 +17,15 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorActionHandler; -import com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorAction; -import com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorRequest; +import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction; +import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorRequest; +import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorResponse; import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -89,8 +89,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli return channel -> { if (channel.request().method() == RestRequest.Method.POST) { logger.info("Stop anomaly detector {}", detectorId); - DeleteDetectorRequest deleteDetectorRequest = new DeleteDetectorRequest().adID(detectorId); - client.execute(DeleteDetectorAction.INSTANCE, deleteDetectorRequest, stopAdDetectorListener(channel, detectorId)); + StopDetectorRequest stopDetectorRequest = new StopDetectorRequest(detectorId); + client.execute(StopDetectorAction.INSTANCE, stopDetectorRequest, stopAdDetectorListener(channel, detectorId)); } else if (channel.request().method() == RestRequest.Method.DELETE) { logger.info("Delete anomaly detector {}", detectorId); handler @@ -117,11 +117,11 @@ private void deleteAnomalyDetectorDoc( client.delete(deleteRequest, new RestStatusToXContentListener<>(channel)); } - private ActionListener stopAdDetectorListener(RestChannel channel, String detectorId) { - return new ActionListener() { + private ActionListener stopAdDetectorListener(RestChannel channel, String detectorId) { + return new ActionListener() { @Override - public void onResponse(AcknowledgedResponse deleteDetectorResponse) { - if (deleteDetectorResponse.isAcknowledged()) { + public void onResponse(StopDetectorResponse stopDetectorResponse) { + if (stopDetectorResponse.success()) { logger.info("AD model deleted successfully for detector {}", detectorId); channel.sendResponse(new BytesRestResponse(RestStatus.OK, "AD model deleted successfully")); } else { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java index c1d8de57..3dbf0234 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java @@ -29,14 +29,14 @@ public final class AnomalyDetectorSettings { private AnomalyDetectorSettings() {} public static final Setting MAX_ANOMALY_DETECTORS = Setting - .intSetting("ml.anomaly_detectors.max_anomaly_detectors", 1000, Setting.Property.NodeScope, Setting.Property.Dynamic); + .intSetting("opendistro.anomaly_detection.max_anomaly_detectors", 1000, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final Setting MAX_ANOMALY_FEATURES = Setting - .intSetting("ml.anomaly_detectors.max_anomaly_features", 5, Setting.Property.NodeScope, Setting.Property.Dynamic); + .intSetting("opendistro.anomaly_detection.max_anomaly_features", 5, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final Setting REQUEST_TIMEOUT = Setting .positiveTimeSetting( - "ml.anomaly_detectors.request_timeout", + "opendistro.anomaly_detection.request_timeout", TimeValue.timeValueSeconds(10), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -44,7 +44,7 @@ private AnomalyDetectorSettings() {} public static final Setting DETECTION_INTERVAL = Setting .positiveTimeSetting( - "ml.anomaly_detectors.detection_interval", + "opendistro.anomaly_detection.detection_interval", TimeValue.timeValueMinutes(10), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -52,7 +52,7 @@ private AnomalyDetectorSettings() {} public static final Setting DETECTION_WINDOW_DELAY = Setting .timeSetting( - "ml.anomaly_detectors.detection_window_delay", + "opendistro.anomaly_detection.detection_window_delay", TimeValue.timeValueMinutes(0), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -60,7 +60,7 @@ private AnomalyDetectorSettings() {} public static final Setting AD_RESULT_ROLLOVER_PERIOD = Setting .positiveTimeSetting( - "ml.anomaly_detectors.ad_result_rollover_period", + "opendistro.anomaly_detection.ad_result_rollover_period", TimeValue.timeValueHours(12), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -68,7 +68,7 @@ private AnomalyDetectorSettings() {} public static final Setting AD_RESULT_HISTORY_ROLLOVER_PERIOD = Setting .positiveTimeSetting( - "ml.anomaly_detectors.ad_result_history_rollover_period", + "opendistro.anomaly_detection.ad_result_history_rollover_period", TimeValue.timeValueHours(12), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -76,7 +76,7 @@ private AnomalyDetectorSettings() {} public static final Setting AD_RESULT_HISTORY_INDEX_MAX_AGE = Setting .positiveTimeSetting( - "ml.anomaly_detectors.ad_result_history_max_age", + "opendistro.anomaly_detection.ad_result_history_max_age", TimeValue.timeValueHours(24), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -102,7 +102,7 @@ private AnomalyDetectorSettings() {} public static final Setting COOLDOWN_MINUTES = Setting .positiveTimeSetting( - "ml.anomaly_detectors.cooldown_minutes", + "opendistro.anomaly_detection.cooldown_minutes", TimeValue.timeValueMinutes(5), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -110,7 +110,7 @@ private AnomalyDetectorSettings() {} public static final Setting BACKOFF_MINUTES = Setting .positiveTimeSetting( - "ml.anomaly_detectors.backoff_minutes", + "opendistro.anomaly_detection.backoff_minutes", TimeValue.timeValueMinutes(15), Setting.Property.NodeScope, Setting.Property.Dynamic @@ -118,14 +118,14 @@ private AnomalyDetectorSettings() {} public static final Setting BACKOFF_INITIAL_DELAY = Setting .positiveTimeSetting( - "ml.anomaly_detectors.backoff_initial_delay", + "opendistro.anomaly_detection.backoff_initial_delay", TimeValue.timeValueMillis(1000), Setting.Property.NodeScope, Setting.Property.Dynamic ); public static final Setting MAX_RETRY_FOR_BACKOFF = Setting - .intSetting("ml.anomaly_detectors.max_retry_for_backoff", 3, 0, Setting.Property.NodeScope, Setting.Property.Dynamic); + .intSetting("opendistro.anomaly_detection.max_retry_for_backoff", 3, 0, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final String ANOMALY_DETECTORS_INDEX_MAPPING_FILE = "mappings/anomaly-detectors.json"; public static final String ANOMALY_RESULTS_INDEX_MAPPING_FILE = "mappings/anomaly-results.json"; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java index 4dc68cd8..b4cdfe2f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java @@ -250,7 +250,7 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< } AnomalyDetector anomalyDetector = detector.get(); if (stateManager.hasRunningQuery(anomalyDetector)) { - LOG.info("There is one query running for detectorId: {}", anomalyDetector.getDetectorId()); + LOG.error("There is one query running for detectorId: {}", anomalyDetector.getDetectorId()); listener.onFailure(new EndRunException(adID, "There is one query running on AnomalyDetector", true)); return; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index 7dfce1cd..36595ec0 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -17,8 +17,6 @@ import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT; -import java.time.Instant; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -26,6 +24,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; +import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import org.apache.logging.log4j.Logger; @@ -180,7 +179,11 @@ public Optional AnomalyDetector detector ) { try { - throttler.insertFilteredQuery(detector.getDetectorId(), request); + // if key already exist, reject the request and throws exception + if (!throttler.insertFilteredQuery(detector.getDetectorId(), request)) { + LOG.error("There is one query running for detectorId: {}", detector.getDetectorId()); + throw new EndRunException(detector.getDetectorId(), "There is one query running on AnomalyDetector", true); + } AtomicReference respReference = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java index 22508ae9..4c5ebaed 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -14,8 +14,6 @@ */ package com.amazon.opendistroforelasticsearch.ad.util; -import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; - import java.time.Clock; import java.time.Instant; import java.util.AbstractMap; @@ -49,11 +47,13 @@ public Optional> getFilteredQuery(String detec /** * Insert the negative cache entry for given detector + * If key already exists, return false. Otherwise true. * @param detectorId AnomalyDetector ID * @param request ActionRequest + * @return true if key doesn't exist otherwise false. */ - public synchronized void insertFilteredQuery(String detectorId, ActionRequest request) { - negativeCache.put(detectorId, new AbstractMap.SimpleEntry<>(request, clock.instant())); + public synchronized boolean insertFilteredQuery(String detectorId, ActionRequest request) { + return negativeCache.putIfAbsent(detectorId, new AbstractMap.SimpleEntry<>(request, clock.instant())) == null; } /** diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java index 1bc9db42..df91946e 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java @@ -15,21 +15,15 @@ package com.amazon.opendistroforelasticsearch.ad.util; -import com.amazon.opendistroforelasticsearch.ad.TestHelpers; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; -import com.google.common.collect.ImmutableMap; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.test.ESTestCase; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.time.Clock; -import java.time.Instant; -import java.util.Map; -import java.util.Optional; import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; public class ThrottlerTests extends ESTestCase { private Throttler throttler; @@ -41,15 +35,36 @@ public void setup() { } @Test - public void test() throws IOException { - AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of(), null); + public void testGetFilteredQuery() { + AnomalyDetector detector = mock(AnomalyDetector.class); + when(detector.getDetectorId()).thenReturn("test detector Id"); SearchRequest dummySearchRequest = new SearchRequest(); throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest); - Optional> entry = throttler.getFilteredQuery(detector.getDetectorId()); - assertTrue(entry.isPresent()); + // case 1: key exists + assertTrue(throttler.getFilteredQuery(detector.getDetectorId()).isPresent()); + // case 2: key doesn't exist + assertFalse(throttler.getFilteredQuery("different test detector Id").isPresent()); + } + + @Test + public void testInsertFilteredQuery() { + AnomalyDetector detector = mock(AnomalyDetector.class); + when(detector.getDetectorId()).thenReturn("test detector Id"); + SearchRequest dummySearchRequest = new SearchRequest(); + // first time: key doesn't exist + assertTrue(throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest)); + // second time: key exists + assertFalse(throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest)); + } + + @Test + public void testClearFilteredQuery() { + AnomalyDetector detector = mock(AnomalyDetector.class); + when(detector.getDetectorId()).thenReturn("test detector Id"); + SearchRequest dummySearchRequest = new SearchRequest(); + assertTrue(throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest)); throttler.clearFilteredQuery(detector.getDetectorId()); - entry = throttler.getFilteredQuery(detector.getDetectorId()); - assertFalse(entry.isPresent()); - return; + assertTrue(throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest)); } + } From 95f389255c0d487162bae63747dd89e987c46ff3 Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Mon, 24 Feb 2020 15:20:06 -0800 Subject: [PATCH 09/10] Add additional clean cache step. --- .../ad/util/ClientUtil.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index 91c89a46..deda8c06 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -187,15 +187,21 @@ public Optional AtomicReference respReference = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { - // clear negative cache + try { + consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { + // clear negative cache + throttler.clearFilteredQuery(detector.getDetectorId()); + respReference.set(response); + }, exception -> { + // clear negative cache + throttler.clearFilteredQuery(detector.getDetectorId()); + LOG.error("Cannot get response for request {}, error: {}", request, exception); + }), latch)); + } catch (Exception e) { + LOG.error("Failed to process the request. Clear negative cache"); throttler.clearFilteredQuery(detector.getDetectorId()); - respReference.set(response); - }, exception -> { - // clear negative cache - throttler.clearFilteredQuery(detector.getDetectorId()); - LOG.error("Cannot get response for request {}, error: {}", request, exception); - }), latch)); + throw e; + } if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) { throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString()); From 1d086d120dcc1f0fb02e9bd131cbfe9b9f434c29 Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Mon, 24 Feb 2020 15:37:20 -0800 Subject: [PATCH 10/10] Add javadoc for EndRunException --- .../amazon/opendistroforelasticsearch/ad/util/ClientUtil.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index deda8c06..136583fb 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -169,6 +169,7 @@ public Response * @param ActionResponse * @param detector Anomaly Detector * @return the response + * @throws EndRunException when there is already a query running * @throws ElasticsearchTimeoutException when we cannot get response within time. * @throws IllegalStateException when the waiting thread is interrupted */ @@ -198,7 +199,7 @@ public Optional LOG.error("Cannot get response for request {}, error: {}", request, exception); }), latch)); } catch (Exception e) { - LOG.error("Failed to process the request. Clear negative cache"); + LOG.error("Failed to process the request for detectorId: {}.", detector.getDetectorId()); throttler.clearFilteredQuery(detector.getDetectorId()); throw e; }