From 5a728ae90fd0330863ed6ad471b416247da9e9b6 Mon Sep 17 00:00:00 2001 From: Lai <57818076+wnbts@users.noreply.github.com> Date: Fri, 10 Jan 2020 16:39:03 -0800 Subject: [PATCH] add windowdelay support (#24) --- .../transport/AnomalyResultTransportAction.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java index 20879b7c..2b90fb77 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java @@ -46,6 +46,7 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; import com.amazon.opendistroforelasticsearch.ad.model.FeatureData; +import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; import com.amazon.opendistroforelasticsearch.ad.stats.ADStats; import com.amazon.opendistroforelasticsearch.ad.stats.StatNames; @@ -233,6 +234,7 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< listener.onFailure(new EndRunException(adID, "AnomalyDetector is not available.", true)); return; } + AnomalyDetector anomalyDetector = detector.get(); String thresholdModelID = modelManager.getThresholdModelId(adID); Optional thresholdNode = hashRing.getOwningNode(thresholdModelID); @@ -247,8 +249,13 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< return; } - SinglePointFeatures featureOptional = featureManager.getCurrentFeatures(detector.get(), - request.getStart(), request.getEnd()); + long delayMillis = Optional.ofNullable((IntervalTimeConfiguration)anomalyDetector.getWindowDelay()) + .map(t -> t.toDuration().toMillis()).orElse(0L); + long startTime = request.getStart() - delayMillis; + long endTime = request.getEnd() - delayMillis; + + SinglePointFeatures featureOptional = featureManager.getCurrentFeatures(anomalyDetector, + startTime, endTime); List featureInResponse = null; @@ -354,8 +361,8 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< listener.onResponse(response); indexAnomalyResult(new AnomalyResult(adID, Double.valueOf(combinedScore), Double.valueOf(response.getAnomalyGrade()), Double.valueOf(confidence), - featureInResponse, Instant.ofEpochMilli(request.getStart()), - Instant.ofEpochMilli(request.getEnd()))); + featureInResponse, Instant.ofEpochMilli(startTime), + Instant.ofEpochMilli(endTime))); } else if (failure.get() != null) { listener.onFailure(failure.get()); } else {