diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index b6f9164b..3824f6e6 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -124,10 +124,12 @@ import java.security.PrivilegedAction; import java.time.Clock; import java.util.Arrays; +import java.util.Calendar; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.TimeZone; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -186,7 +188,14 @@ public List getRestHandlers( jobRunner.setAnomalyResultHandler(anomalyResultHandler); jobRunner.setSettings(settings); - AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry, this.nodeFilter); + AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner( + client, + this.xContentRegistry, + this.nodeFilter, + indexNameExpressionResolver, + clusterService, + Calendar.getInstance(TimeZone.getTimeZone("UTC")) + ); RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(restController, profileRunner); RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction( settings, diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java index b254eb3a..943a1a4f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java @@ -20,7 +20,16 @@ import static 
org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -29,8 +38,11 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParseException; @@ -39,6 +51,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -62,13 +75,26 @@ public class AnomalyDetectorProfileRunner { private Client client; private NamedXContentRegistry xContentRegistry; private DiscoveryNodeFilterer nodeFilter; + private final IndexNameExpressionResolver indexNameExpressionResolver; static String FAIL_TO_FIND_DETECTOR_MSG = "Fail to find detector with id: "; static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector "; - - public AnomalyDetectorProfileRunner(Client client, NamedXContentRegistry xContentRegistry, 
DiscoveryNodeFilterer nodeFilter) { + private final ClusterService clusterService; + private Calendar calendar; + + public AnomalyDetectorProfileRunner( + Client client, + NamedXContentRegistry xContentRegistry, + DiscoveryNodeFilterer nodeFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService, + Calendar calendar + ) { this.client = client; this.xContentRegistry = xContentRegistry; this.nodeFilter = nodeFilter; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.clusterService = clusterService; + this.calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); /* NOTE(review): the calendar constructor argument is not assigned here; confirm whether that is intended */ } public void profile(String detectorId, ActionListener listener, Set profiles) { @@ -127,7 +153,7 @@ private void prepareProfile( profileState(detectorId, enabledTimeMs, listener, job.isEnabled()); } if (profiles.contains(ProfileName.ERROR)) { - profileError(detectorId, enabledTimeMs, listener); + profileError(detectorId, enabledTimeMs, job.getDisabledTime(), listener); } if (profiles.contains(ProfileName.COORDINATING_NODE) @@ -208,7 +234,7 @@ private ActionListener onInittedEver( return ActionListener.wrap(searchResponse -> { SearchHits hits = searchResponse.getHits(); DetectorProfile profile = new DetectorProfile(); - if (hits.getTotalHits().value == 0L) { + if (hits.getHits().length == 0L) { profile.setState(DetectorState.INIT); } else { profile.setState(DetectorState.RUNNING); @@ -234,20 +260,106 @@ private ActionListener onInittedEver( } /** + * Precondition: + * 1. Indices are rotated with name pattern ".opendistro-anomaly-results-history-{now/d}-1" where now is evaluated in UTC. + * 2. Latest entry with error is recorded within enabled and disabled time. Note disabled time can be null. + * * Error is populated if error of the latest anomaly result is not empty. 
+ * + * Two optimizations avoid scanning all anomaly result indices to get a detector's most recent error: + * + * First, when a detector is running, we only need to scan the current index, not all of the rolled over ones + * since we are interested in the latest error. + * Second, when a detector is disabled, we only need to scan the latest anomaly result indices created before the + * detector's disabled time. + * * @param detectorId detector id - * @param enabledTime the time when AD job is enabled in milliseconds + * @param enabledTimeMillis the time when AD job is enabled in milliseconds + * @param disabledTime the time when AD job is disabled; may be null * @param listener listener to process the returned error or exception */ - private void profileError(String detectorId, long enabledTime, MultiResponsesDelegateActionListener listener) { - SearchRequest searchLatestResult = createLatestAnomalyResultRequest(detectorId, enabledTime); + private void profileError( + String detectorId, + long enabledTimeMillis, + Instant disabledTime, + MultiResponsesDelegateActionListener listener + ) { + String[] latestIndex = null; + + long disabledTimeMillis = 0; + if (disabledTime != null) { + disabledTimeMillis = disabledTime.toEpochMilli(); + } + if (enabledTimeMillis > disabledTimeMillis) { + // detector is still running + latestIndex = new String[1]; + latestIndex[0] = AnomalyResult.ANOMALY_RESULT_INDEX; + } else { + String[] concreteIndices = indexNameExpressionResolver + .concreteIndexNames( + clusterService.state(), + IndicesOptions.lenientExpandOpen(), + AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN + ); + + // find the latest from result indices such as .opendistro-anomaly-results-history-2020.04.06-1 and + // .opendistro-anomaly-results-history-2020.04.07-000002 + long maxTimestamp = -1; + TreeMap<Long, List<String>> candidateIndices = new TreeMap<>(); + // compile the rotated-index name pattern once instead of once per index name + Pattern historyIndexPattern = Pattern.compile("\\.opendistro-anomaly-results-history-(\\d{4})\\.(\\d{2})\\.(\\d{2})-\\d+"); + for (String indexName : concreteIndices) { + Matcher m = historyIndexPattern.matcher(indexName); + if (m.matches()) { + 
int year = Integer.parseInt(m.group(1)); + int month = Integer.parseInt(m.group(2)); + int date = Integer.parseInt(m.group(3)); + // month starts with 0. NOTE(review): calendar is a mutable instance field reused by every call; confirm profileError never runs concurrently + calendar.clear(); + calendar.set(year, month - 1, date); + // 2020.05.08 is translated to 1588896000000 + long timestamp = calendar.getTimeInMillis(); + + // a candidate index can be created before or after enabled time, but the index is definitely created before disabled + // time. Keep every such index regardless of the order in which index names are returned, so the look-back below still works + if (timestamp <= disabledTimeMillis) { + maxTimestamp = Math.max(maxTimestamp, timestamp); + // we can have two rotations on the same day and we don't know which one has our data, so we keep all + List<String> indexList = candidateIndices.computeIfAbsent(timestamp, k -> new ArrayList<>()); + indexList.add(indexName); + } + } + } + List<String> candidates = new ArrayList<>(); + List<String> latestCandidate = candidateIndices.get(maxTimestamp); + + if (latestCandidate != null) { + candidates.addAll(latestCandidate); + } + + // look back one more index for an edge case: + // Suppose detector interval is 1 minute. Detector last run is at 2020-05-07, 11:59:50 PM, + // then AD result indices rolled over as .opendistro-anomaly-results-history-2020.05.07-001 + // Detector next run will be 2020-05-08, 00:00:50 AM. If a user stops the detector at + // 2020-05-08 00:00:10 AM, detector will not have AD result on 2020-05-08. + // We check AD result indices one day earlier to make sure we can always get AD result. + Map.Entry<Long, List<String>> earlierCandidate = candidateIndices.lowerEntry(maxTimestamp); + if (earlierCandidate != null) { + candidates.addAll(earlierCandidate.getValue()); + } + latestIndex = candidates.toArray(new String[0]); + } + + if (latestIndex == null || latestIndex.length == 0) { + // no result index found: either no anomaly result has been created yet or the result indices for the detector have been deleted. 
+ listener.onResponse(new DetectorProfile()); + return; + } + SearchRequest searchLatestResult = createLatestAnomalyResultRequest(detectorId, enabledTimeMillis, disabledTimeMillis, latestIndex); client.search(searchLatestResult, onGetLatestAnomalyResult(listener, detectorId)); } private ActionListener onGetLatestAnomalyResult(ActionListener listener, String detectorId) { return ActionListener.wrap(searchResponse -> { SearchHits hits = searchResponse.getHits(); - if (hits.getTotalHits().value == 0L) { + if (hits.getHits().length == 0L) { listener.onResponse(new DetectorProfile()); } else { SearchHit hit = hits.getAt(0); @@ -259,12 +371,12 @@ private ActionListener onGetLatestAnomalyResult(ActionListener= enabledTime) { + rangeBuilder.lte(disabledTime); + } + filterQuery.filter(rangeBuilder); FieldSortBuilder sortQuery = new FieldSortBuilder(AnomalyResult.EXECUTION_END_TIME_FIELD).order(SortOrder.DESC); - SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1).sort(sortQuery); + // I am only looking for last 1 occurrence and have no interest in the total number of documents that match the query. + // ES will not try to count the number of documents and will be able to terminate the query as soon as 1 document + // have been collected per segment. 
+ SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1).sort(sortQuery).trackTotalHits(false); - SearchRequest request = new SearchRequest(AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN); + SearchRequest request = new SearchRequest(index); request.source(source); return request; } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java index 158d1c92..99f7c189 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java @@ -22,10 +22,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.time.Instant; import java.util.Arrays; +import java.util.Calendar; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -46,7 +48,10 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -57,6 +62,7 @@ import org.junit.BeforeClass; import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData; +import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import 
com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; @@ -74,10 +80,18 @@ public class AnomalyDetectorProfileRunnerTests extends ESTestCase { private Client client; private DiscoveryNodeFilterer nodeFilter; private AnomalyDetector detector; + private IndexNameExpressionResolver resolver; + private ClusterService clusterService; + private static Set stateOnly; private static Set stateNError; private static Set modelProfile; - private static String error = "No full shingle in current detection window"; + private static String noFullShingleError = "No full shingle in current detection window"; + private static String stoppedError = "Stopped detector as job failed consecutively for more than 3 times: Having trouble querying data." + + " Maybe all of your features have been disabled."; + private Calendar calendar; + private String indexWithRequiredError1 = ".opendistro-anomaly-results-history-2020.04.06-1"; + private String indexWithRequiredError2 = ".opendistro-anomaly-results-history-2020.04.07-000002"; // profile model related String node1; @@ -120,13 +134,25 @@ public void setUp() throws Exception { super.setUp(); client = mock(Client.class); nodeFilter = mock(DiscoveryNodeFilterer.class); - runner = new AnomalyDetectorProfileRunner(client, xContentRegistry(), nodeFilter); + calendar = mock(Calendar.class); + resolver = mock(IndexNameExpressionResolver.class); + clusterService = mock(ClusterService.class); + when(resolver.concreteIndexNames(any(), any(), any())) + .thenReturn( + new String[] { indexWithRequiredError1, indexWithRequiredError2, ".opendistro-anomaly-results-history-2020.04.08-000003" } + ); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test cluster")).build()); + + runner = new AnomalyDetectorProfileRunner(client, xContentRegistry(), nodeFilter, resolver, clusterService, calendar); } enum JobStatus { INDEX_NOT_EXIT, DISABLED, - ENABLED + 
ENABLED, + DISABLED_ROTATED_1, + DISABLED_ROTATED_2, + DISABLED_ROTATED_3 } enum InittedEverResultStatus { @@ -139,7 +165,9 @@ enum InittedEverResultStatus { enum ErrorResultStatus { INDEX_NOT_EXIT, NO_ERROR, - ERROR + SHINGLE_ERROR, + STOPPED_ERROR_1, + STOPPED_ERROR_2 } @SuppressWarnings("unchecked") @@ -170,6 +198,27 @@ private void setUpClientGet(boolean detectorExists, JobStatus jobStatus) throws job = TestHelpers.randomAnomalyDetectorJob(true); listener.onResponse(TestHelpers.createGetResponse(job, detector.getDetectorId())); break; + case DISABLED_ROTATED_1: + // enabled time is smaller than 1586217600000, while disabled time is larger than 1586217600000 + // which is April 7, 2020 12:00:00 AM UTC. + job = TestHelpers + .randomAnomalyDetectorJob(false, Instant.ofEpochMilli(1586217500000L), Instant.ofEpochMilli(1586227600000L)); + listener.onResponse(TestHelpers.createGetResponse(job, detector.getDetectorId())); + break; + case DISABLED_ROTATED_2: + // both enabled and disabled time are larger than 1586217600000, + // which is April 7, 2020 12:00:00 AM UTC. + job = TestHelpers + .randomAnomalyDetectorJob(false, Instant.ofEpochMilli(1586217700000L), Instant.ofEpochMilli(1586227600000L)); + listener.onResponse(TestHelpers.createGetResponse(job, detector.getDetectorId())); + break; + case DISABLED_ROTATED_3: + // both enabled and disabled time are larger than 1586131200000, + // which is April 6, 2020 12:00:00 AM. 
+ job = TestHelpers + .randomAnomalyDetectorJob(false, Instant.ofEpochMilli(1586131300000L), Instant.ofEpochMilli(1586131400000L)); + listener.onResponse(TestHelpers.createGetResponse(job, detector.getDetectorId())); + break; default: assertTrue("should not reach here", false); break; @@ -214,10 +263,32 @@ private void setUpClientSearch(InittedEverResultStatus inittedEverResultStatus, result = TestHelpers.randomAnomalyDetectResult(null); listener.onResponse(TestHelpers.createSearchResponse(result)); break; - case ERROR: - result = TestHelpers.randomAnomalyDetectResult(error); + case SHINGLE_ERROR: + result = TestHelpers.randomAnomalyDetectResult(noFullShingleError); listener.onResponse(TestHelpers.createSearchResponse(result)); break; + case STOPPED_ERROR_2: + if (request.indices().length == 2) { + for (int i = 0; i < 2; i++) { + assertTrue( + request.indices()[i].equals(indexWithRequiredError1) + || request.indices()[i].equals(indexWithRequiredError2) + ); + } + result = TestHelpers.randomAnomalyDetectResult(stoppedError); + listener.onResponse(TestHelpers.createSearchResponse(result)); + } else { + assertTrue("should not reach here", false); + } + break; + case STOPPED_ERROR_1: + if (request.indices().length == 1 && request.indices()[0].equals(indexWithRequiredError1)) { + result = TestHelpers.randomAnomalyDetectResult(stoppedError); + listener.onResponse(TestHelpers.createSearchResponse(result)); + } else { + assertTrue("should not reach here", false); + } + break; default: assertTrue("should not reach here", false); break; @@ -326,11 +397,16 @@ public void testRunningNoError() throws IOException, InterruptedException { } public void testRunningWithError() throws IOException, InterruptedException { - testErrorStateTemplate(InittedEverResultStatus.GREATER_THAN_ZERO, ErrorResultStatus.ERROR, DetectorState.RUNNING, error); + testErrorStateTemplate( + InittedEverResultStatus.GREATER_THAN_ZERO, + ErrorResultStatus.SHINGLE_ERROR, + DetectorState.RUNNING, + 
noFullShingleError + ); } public void testInitWithError() throws IOException, InterruptedException { - testErrorStateTemplate(InittedEverResultStatus.EMPTY, ErrorResultStatus.ERROR, DetectorState.INIT, error); + testErrorStateTemplate(InittedEverResultStatus.EMPTY, ErrorResultStatus.SHINGLE_ERROR, DetectorState.INIT, noFullShingleError); } public void testExceptionOnStateFetching() throws IOException, InterruptedException { @@ -439,4 +515,60 @@ public void testProfileModels() throws InterruptedException, IOException { }), modelProfile); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); } + + /** + * A detector's error message can be on a rotated index. This test makes sure we get error info + * from a .opendistro-anomaly-results index that has been rolled over. + * @param state expected detector state + * @param jobStatus job status to configure in the test case + * @throws IOException when profile API throws it + * @throws InterruptedException when our CountDownLatch has been interrupted + */ + private void stoppedDetectorErrorTemplate(DetectorState state, JobStatus jobStatus, ErrorResultStatus errorStatus) throws IOException, + InterruptedException { + setUpClientGet(true, jobStatus); + setUpClientSearch(InittedEverResultStatus.GREATER_THAN_ZERO, errorStatus); + DetectorProfile expectedProfile = new DetectorProfile(); + expectedProfile.setState(state); + expectedProfile.setError(stoppedError); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertEquals(expectedProfile, response); + inProgressLatch.countDown(); + }, exception -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }), stateNError); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + /** + * Job enabled time is earlier than and disabled time is later than index 2 creation date; we expect to search 2 indices + */ + public void 
testDetectorStoppedEnabledTimeLtIndex2Date() throws IOException, InterruptedException { + stoppedDetectorErrorTemplate(DetectorState.DISABLED, JobStatus.DISABLED_ROTATED_1, ErrorResultStatus.STOPPED_ERROR_2); + } + + /** + * Both job enabled and disabled time are later than index 2 creation date; we expect to search 2 indices + */ + public void testDetectorStoppedEnabledTimeGtIndex2Date() throws IOException, InterruptedException { + stoppedDetectorErrorTemplate(DetectorState.DISABLED, JobStatus.DISABLED_ROTATED_2, ErrorResultStatus.STOPPED_ERROR_2); + } + + /** + * Both job enabled and disabled time are earlier than index 2 creation date; we expect to search 1 index + */ + public void testDetectorStoppedEnabledTimeGtIndex1Date() throws IOException, InterruptedException { + stoppedDetectorErrorTemplate(DetectorState.DISABLED, JobStatus.DISABLED_ROTATED_3, ErrorResultStatus.STOPPED_ERROR_1); + } + + public void testAssumption() { + assertEquals( + "profileError depends on this assumption.", + ".opendistro-anomaly-results*", + AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN + ); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index 3af20295..8cb94b1d 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -343,19 +343,27 @@ public static AnomalyDetectorJob randomAnomalyDetectorJob() { return randomAnomalyDetectorJob(true); } - public static AnomalyDetectorJob randomAnomalyDetectorJob(boolean enabled) { + public static AnomalyDetectorJob randomAnomalyDetectorJob(boolean enabled, Instant enabledTime, Instant disabledTime) { return new AnomalyDetectorJob( randomAlphaOfLength(10), randomIntervalSchedule(), randomIntervalTimeConfiguration(), enabled, - Instant.now().truncatedTo(ChronoUnit.SECONDS), - 
Instant.now().truncatedTo(ChronoUnit.SECONDS), + enabledTime, + disabledTime, Instant.now().truncatedTo(ChronoUnit.SECONDS), 60L ); } + public static AnomalyDetectorJob randomAnomalyDetectorJob(boolean enabled) { + return randomAnomalyDetectorJob( + enabled, + Instant.now().truncatedTo(ChronoUnit.SECONDS), + Instant.now().truncatedTo(ChronoUnit.SECONDS) + ); + } + public static AnomalyDetectorExecutionInput randomAnomalyDetectorExecutionInput() throws IOException { return new AnomalyDetectorExecutionInput( randomAlphaOfLength(5),