diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java index aa0ce8075..a7af702a7 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java @@ -560,14 +560,12 @@ private void stopAdJob(String detectorId, AnomalyDetectorFunction function) { }, exception -> { log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception); })); } else { log.info("AD Job was disabled for " + detectorId); - // function.execute(); } } catch (IOException e) { log.error("JobRunner failed to stop detector job " + detectorId, e); } } else { log.info("AD Job was not found for " + detectorId); - // function.execute(); } }, exception -> log.error("JobRunner failed to get detector job " + detectorId, exception)); diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java b/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java index cace64e3f..1ec27253a 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java @@ -762,7 +762,10 @@ public PooledObject wrap(LinkedBuffer obj) { adTaskManager, nodeFilter, threadPool, - client + client, + stateManager, + adTaskCacheManager, + AnomalyDetectorSettings.NUM_MIN_SAMPLES ); // return objects used by Guice to inject dependencies for e.g., diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java index d47494e5a..0bd413ec3 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java @@ -40,7 +40,6 @@ import org.opensearch.ad.model.ADTaskType; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorJob; -import org.opensearch.ad.model.AnomalyResult; import org.opensearch.ad.model.DetectorProfile; import org.opensearch.ad.model.DetectorProfileName; import org.opensearch.ad.model.DetectorState; @@ -64,8 +63,6 @@ import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.index.query.BoolQueryBuilder; -import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHits; import org.opensearch.search.aggregations.Aggregation; import org.opensearch.search.aggregations.AggregationBuilder; @@ -451,14 +448,15 @@ private void profileMultiEntityDetectorStateRelated( if (profileResponse.getTotalUpdates() < requiredSamples) { // need to double check since what ProfileResponse returns is the highest priority entity currently in memory, but // another entity might have already been initialized and sit somewhere else (in memory or on disk). - confirmMultiEntityDetectorInitStatus( - detector, - job.getEnabledTime().toEpochMilli(), - profileBuilder, - profilesToCollect, - profileResponse.getTotalUpdates(), - listener - ); + long enabledTime = job.getEnabledTime().toEpochMilli(); + long totalUpdates = profileResponse.getTotalUpdates(); + ProfileUtil + .confirmDetectorInitStatus( + detector, + enabledTime, + client, + onInittedEver(enabledTime, profileBuilder, profilesToCollect, detector, totalUpdates, listener) + ); } else { createRunningStateAndInitProgress(profilesToCollect, profileBuilder); listener.onResponse(profileBuilder.build()); @@ -471,18 +469,6 @@ private void profileMultiEntityDetectorStateRelated( } } - private void confirmMultiEntityDetectorInitStatus( - AnomalyDetector detector, - long enabledTime, - DetectorProfile.Builder profile, - Set profilesToCollect, - long totalUpdates, - MultiResponsesDelegateActionListener listener - ) { - SearchRequest searchLatestResult = createInittedEverRequest(detector.getDetectorId(), enabledTime, detector.getResultIndex()); - client.search(searchLatestResult, onInittedEver(enabledTime, profile, profilesToCollect, detector, totalUpdates, listener)); - } - private ActionListener onInittedEver( long lastUpdateTimeMs, DetectorProfile.Builder profileBuilder, @@ -602,26 +588,4 @@ private void processInitResponse( listener.onResponse(builder.build()); } - - /** - * Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time - * @param detectorId detector id - * @param enabledTime the time when AD job is enabled in milliseconds - * @return the search request - */ - private SearchRequest createInittedEverRequest(String detectorId, long enabledTime, String resultIndex) { - BoolQueryBuilder filterQuery = new BoolQueryBuilder(); - filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId)); - filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime)); - filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0)); - - SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1); - - SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS); - request.source(source); - if (resultIndex != null) { - request.indices(resultIndex); - } - return request; - } } diff --git a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java index 19710f0cb..c662ea119 100644 --- a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java +++ b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java @@ -22,6 +22,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionListener; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.ad.common.exception.AnomalyDetectionException; import org.opensearch.ad.common.exception.EndRunException; import org.opensearch.ad.common.exception.ResourceNotFoundException; import org.opensearch.ad.constant.CommonErrorMessages; @@ -32,6 +34,7 @@ import org.opensearch.ad.model.DetectorProfileName; import org.opensearch.ad.model.FeatureData; import org.opensearch.ad.model.IntervalTimeConfiguration; +import org.opensearch.ad.task.ADTaskCacheManager; import org.opensearch.ad.task.ADTaskManager; import org.opensearch.ad.transport.AnomalyResultResponse; import org.opensearch.ad.transport.ProfileAction; @@ -40,10 +43,12 @@ import org.opensearch.ad.transport.RCFPollingRequest; import org.opensearch.ad.transport.handler.AnomalyIndexHandler; import org.opensearch.ad.util.DiscoveryNodeFilterer; +import org.opensearch.ad.util.ExceptionUtil; import org.opensearch.client.Client; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.unit.TimeValue; import org.opensearch.commons.authuser.User; +import org.opensearch.search.SearchHits; import org.opensearch.threadpool.ThreadPool; public class ExecuteADResultResponseRecorder { @@ -55,6 +60,9 @@ public class ExecuteADResultResponseRecorder { private DiscoveryNodeFilterer nodeFilter; private ThreadPool threadPool; private Client client; + private NodeStateManager nodeStateManager; + private ADTaskCacheManager adTaskCacheManager; + private int rcfMinSamples; public ExecuteADResultResponseRecorder( AnomalyDetectionIndices anomalyDetectionIndices, @@ -62,7 +70,10 @@ public ExecuteADResultResponseRecorder( ADTaskManager adTaskManager, DiscoveryNodeFilterer nodeFilter, ThreadPool threadPool, - Client client + Client client, + NodeStateManager nodeStateManager, + ADTaskCacheManager adTaskCacheManager, + int rcfMinSamples ) { this.anomalyDetectionIndices = anomalyDetectionIndices; this.anomalyResultHandler = anomalyResultHandler; @@ -70,6 +81,9 @@ public ExecuteADResultResponseRecorder( this.nodeFilter = nodeFilter; this.threadPool = threadPool; this.client = client; + this.nodeStateManager = nodeStateManager; + this.adTaskCacheManager = adTaskCacheManager; + this.rcfMinSamples = rcfMinSamples; } public void indexAnomalyResult( @@ -185,27 +199,66 @@ private void updateLatestRealtimeTask( String error ) { // Don't need info as this will be printed repeatedly in each interval - adTaskManager - .updateLatestRealtimeTaskOnCoordinatingNode( + ActionListener listener = ActionListener.wrap(r -> { + if (r != null) { + log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState); + } + }, e -> { + if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) { + // Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction. + log.error("Can't find latest realtime task of detector " + detectorId); + adTaskManager.removeRealtimeTaskCache(detectorId); + } else { + log.error("Failed to update latest realtime task for detector " + detectorId, e); + } + }); + + // rcfTotalUpdates is null when we save exception messages + if (!adTaskCacheManager.hasQueriedResultIndex(detectorId) && rcfTotalUpdates != null && rcfTotalUpdates < rcfMinSamples) { + // confirm the total updates number since it is possible that we have already had results after job enabling time + // If yes, total updates should be at least rcfMinSamples so that the init progress reaches 100%. + confirmTotalRCFUpdatesFound( detectorId, taskState, rcfTotalUpdates, detectorIntervalInMinutes, error, - ActionListener.wrap(r -> { - if (r != null) { - log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState); - } - }, e -> { - if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) { - // Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction. - log.error("Can't find latest realtime task of detector " + detectorId); - adTaskManager.removeRealtimeTaskCache(detectorId); - } else { - log.error("Failed to update latest realtime task for detector " + detectorId, e); - } - }) + ActionListener + .wrap( + r -> adTaskManager + .updateLatestRealtimeTaskOnCoordinatingNode( + detectorId, + taskState, + r, + detectorIntervalInMinutes, + error, + listener + ), + e -> { + log.error("Fail to confirm rcf update", e); + adTaskManager + .updateLatestRealtimeTaskOnCoordinatingNode( + detectorId, + taskState, + rcfTotalUpdates, + detectorIntervalInMinutes, + error, + listener + ); + } + ) ); + } else { + adTaskManager + .updateLatestRealtimeTaskOnCoordinatingNode( + detectorId, + taskState, + rcfTotalUpdates, + detectorIntervalInMinutes, + error, + listener + ); + } } /** @@ -285,4 +338,51 @@ public void indexAnomalyResultException( } } + private void confirmTotalRCFUpdatesFound( + String detectorId, + String taskState, + Long rcfTotalUpdates, + Long detectorIntervalInMinutes, + String error, + ActionListener listener + ) { + nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> { + if (!detectorOptional.isPresent()) { + listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get detector")); + return; + } + nodeStateManager.getAnomalyDetectorJob(detectorId, ActionListener.wrap(jobOptional -> { + if (!jobOptional.isPresent()) { + listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get job")); + return; + } + adTaskCacheManager.markResultIndexQueried(detectorId); + ProfileUtil + .confirmDetectorInitStatus( + detectorOptional.get(), + jobOptional.get().getEnabledTime().toEpochMilli(), + client, + ActionListener.wrap(searchResponse -> { + ActionListener.completeWith(listener, () -> { + SearchHits hits = searchResponse.getHits(); + Long correctedTotalUpdates = rcfTotalUpdates; + if (hits.getTotalHits().value > 0L) { + // correct the number if we have already had results after job enabling time + // so that the detector won't stay initialized + correctedTotalUpdates = Long.valueOf(rcfMinSamples); + } + return correctedTotalUpdates; + }); + }, exception -> { + if (ExceptionUtil.isIndexNotAvailable(exception)) { + // anomaly result index is not created yet + listener.onResponse(0L); + } else { + listener.onFailure(exception); + } + }) + ); + }, e -> listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get job")))); + }, e -> listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get detector")))); + } } diff --git a/src/main/java/org/opensearch/ad/ProfileUtil.java b/src/main/java/org/opensearch/ad/ProfileUtil.java new file mode 100644 index 000000000..2531a97ec --- /dev/null +++ b/src/main/java/org/opensearch/ad/ProfileUtil.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.ad; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.ad.constant.CommonName; +import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.model.AnomalyResult; +import org.opensearch.client.Client; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; + +public class ProfileUtil { + /** + * Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time + * @param detectorId detector id + * @param enabledTime the time when AD job is enabled in milliseconds + * @return the search request + */ + private static SearchRequest createInittedEverRequest(String detectorId, long enabledTime, String resultIndex) { + BoolQueryBuilder filterQuery = new BoolQueryBuilder(); + filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId)); + filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime)); + filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0)); + + SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1); + + SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS); + request.source(source); + if (resultIndex != null) { + request.indices(resultIndex); + } + return request; + } + + public static void confirmDetectorInitStatus( + AnomalyDetector detector, + long enabledTime, + Client client, + ActionListener listener + ) { + SearchRequest searchLatestResult = createInittedEverRequest(detector.getDetectorId(), enabledTime, detector.getResultIndex()); + client.search(searchLatestResult, listener); + } +} diff --git a/src/main/java/org/opensearch/ad/task/ADRealtimeTaskCache.java b/src/main/java/org/opensearch/ad/task/ADRealtimeTaskCache.java index b7399e4c7..bf8cbb860 100644 --- a/src/main/java/org/opensearch/ad/task/ADRealtimeTaskCache.java +++ b/src/main/java/org/opensearch/ad/task/ADRealtimeTaskCache.java @@ -38,12 +38,17 @@ public class ADRealtimeTaskCache { // detector interval in milliseconds. private long detectorIntervalInMillis; + // we query result index to check if there are any result generated for detector to tell whether it passed initialization of not. + // To avoid repeated query when there is no data, record whether we have done that or not. + private boolean queriedResultIndex; + public ADRealtimeTaskCache(String state, Float initProgress, String error, long detectorIntervalInMillis) { this.state = state; this.initProgress = initProgress; this.error = error; this.lastJobRunTime = Instant.now().toEpochMilli(); this.detectorIntervalInMillis = detectorIntervalInMillis; + this.queriedResultIndex = false; } public String getState() { @@ -74,6 +79,14 @@ public void setLastJobRunTime(long lastJobRunTime) { this.lastJobRunTime = lastJobRunTime; } + public boolean hasQueriedResultIndex() { + return queriedResultIndex; + } + + public void setQueriedResultIndex(boolean queriedResultIndex) { + this.queriedResultIndex = queriedResultIndex; + } + public boolean expired() { return lastJobRunTime + 2 * detectorIntervalInMillis < Instant.now().toEpochMilli(); } diff --git a/src/main/java/org/opensearch/ad/task/ADTaskCacheManager.java b/src/main/java/org/opensearch/ad/task/ADTaskCacheManager.java index 40fa8e2c4..61319f3e2 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskCacheManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskCacheManager.java @@ -1013,33 +1013,38 @@ public void clearPendingEntities(String detectorId) { } /** - * Check if realtime task field value changed or not by comparing with cache. + * Check if realtime task field value change needed or not by comparing with cache. * 1. If new field value is null, will consider this field as not changed. - * 2. If any field value changed, will consider the realtime task changed. - * 3. If realtime task cache not found, will consider the realtime task changed. + * 2. If init progress is larger or the old init progress is null, will consider the realtime task + * change needed; for other fields, as long as field values changed, will consider the realtime + * task changed. We did this so that the init progress won't go backwards. + * 3. If realtime task cache not found, will consider the realtime task change needed. * * @param detectorId detector id * @param newState new task state * @param newInitProgress new init progress * @param newError new error - * @return true if realtime task changed comparing with realtime task cache. + * @return true if realtime task change needed. */ - public boolean isRealtimeTaskChanged(String detectorId, String newState, Float newInitProgress, String newError) { + public boolean isRealtimeTaskChangeNeeded(String detectorId, String newState, Float newInitProgress, String newError) { if (realtimeTaskCaches.containsKey(detectorId)) { ADRealtimeTaskCache realtimeTaskCache = realtimeTaskCaches.get(detectorId); boolean stateChanged = false; if (newState != null && !newState.equals(realtimeTaskCache.getState())) { stateChanged = true; } - boolean initProgressChanged = false; - if (newInitProgress != null && !newInitProgress.equals(realtimeTaskCache.getInitProgress())) { - initProgressChanged = true; + boolean initProgressChangeNeeded = false; + Float existingProgress = realtimeTaskCache.getInitProgress(); + if (newInitProgress != null + && !newInitProgress.equals(existingProgress) + && (existingProgress == null || newInitProgress > existingProgress)) { + initProgressChangeNeeded = true; } boolean errorChanged = false; if (newError != null && !newError.equals(realtimeTaskCache.getError())) { errorChanged = true; } - if (stateChanged || initProgressChanged || errorChanged) { + if (stateChanged || initProgressChangeNeeded || errorChanged) { return true; } return false; @@ -1351,4 +1356,33 @@ public void cleanExpiredHCBatchTaskRunStates() { cleanExpiredHCBatchTaskRunStatesSemaphore.release(); } } + + /** + * We query result index to check if there are any result generated for detector to tell whether it passed initialization of not. + * To avoid repeated query when there is no data, record whether we have done that or not. + * @param id detector id + */ + public void markResultIndexQueried(String id) { + ADRealtimeTaskCache realtimeTaskCache = realtimeTaskCaches.get(id); + // we initialize a real time cache at the beginning of AnomalyResultTransportAction if it + // cannot be found. If the cache is empty, we will return early and wait it for it to be + // initialized. + if (realtimeTaskCache != null) { + realtimeTaskCache.setQueriedResultIndex(true); + } + } + + /** + * We query result index to check if there are any result generated for detector to tell whether it passed initialization of not. + * + * @param id detector id + * @return whether we have queried result index or not. + */ + public boolean hasQueriedResultIndex(String id) { + ADRealtimeTaskCache realtimeTaskCache = realtimeTaskCaches.get(id); + if (realtimeTaskCache != null) { + return realtimeTaskCache.hasQueriedResultIndex(); + } + return false; + } } diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java index 1b3a775c8..c1e61c536 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java @@ -2057,7 +2057,7 @@ public void updateLatestRealtimeTaskOnCoordinatingNode( } error = Optional.ofNullable(error).orElse(""); - if (!adTaskCacheManager.isRealtimeTaskChanged(detectorId, newState, initProgress, error)) { + if (!adTaskCacheManager.isRealtimeTaskChangeNeeded(detectorId, newState, initProgress, error)) { // If task not changed, no need to update, just return listener.onResponse(null); return; @@ -3091,5 +3091,4 @@ public void maintainRunningRealtimeTasks() { } } } - } diff --git a/src/test/java/org/opensearch/ad/AbstractADTest.java b/src/test/java/org/opensearch/ad/AbstractADTest.java index 42a0e6f1c..3d27d89fd 100644 --- a/src/test/java/org/opensearch/ad/AbstractADTest.java +++ b/src/test/java/org/opensearch/ad/AbstractADTest.java @@ -24,17 +24,20 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.Logger; import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; import org.apache.logging.log4j.core.layout.PatternLayout; import org.apache.logging.log4j.util.StackLocatorUtil; import org.opensearch.Version; @@ -79,8 +82,25 @@ public class AbstractADTest extends OpenSearchTestCase { * */ protected class TestAppender extends AbstractAppender { + private static final String EXCEPTION_CLASS = "exception_class"; + private static final String EXCEPTION_MSG = "exception_message"; + private static final String EXCEPTION_STACK_TRACE = "stacktrace"; + + Map, Map> exceptions; + // whether record exception and its stack trace or not. + // If you log(msg, exception), by default we won't record exception and its stack trace. + boolean recordExceptions; + protected TestAppender(String name) { - super(name, null, PatternLayout.createDefaultLayout(), true); + this(name, false); + } + + protected TestAppender(String name, boolean recordExceptions) { + super(name, null, PatternLayout.createDefaultLayout(), true, Property.EMPTY_ARRAY); + this.recordExceptions = recordExceptions; + if (recordExceptions) { + exceptions = new HashMap, Map>(); + } } public List messages = new ArrayList(); @@ -134,9 +154,47 @@ public int countMessage(String msg) { return countMessage(msg, false); } + public Boolean containExceptionClass(Class throwable, String className) { + Map throwableInformation = exceptions.get(throwable); + return Optional.ofNullable(throwableInformation).map(m -> m.get(EXCEPTION_CLASS)).map(s -> s.equals(className)).orElse(false); + } + + public Boolean containExceptionMsg(Class throwable, String msg) { + Map throwableInformation = exceptions.get(throwable); + return Optional + .ofNullable(throwableInformation) + .map(m -> m.get(EXCEPTION_MSG)) + .map(s -> ((String) s).contains(msg)) + .orElse(false); + } + + public Boolean containExceptionTrace(Class throwable, String traceElement) { + Map throwableInformation = exceptions.get(throwable); + return Optional + .ofNullable(throwableInformation) + .map(m -> m.get(EXCEPTION_STACK_TRACE)) + .map(s -> ((String) s).contains(traceElement)) + .orElse(false); + } + @Override public void append(LogEvent event) { messages.add(event.getMessage().getFormattedMessage()); + if (recordExceptions && event.getThrown() != null) { + Map throwableInformation = new HashMap(); + final Throwable throwable = event.getThrown(); + if (throwable.getClass().getCanonicalName() != null) { + throwableInformation.put(EXCEPTION_CLASS, throwable.getClass().getCanonicalName()); + } + if (throwable.getMessage() != null) { + throwableInformation.put(EXCEPTION_MSG, throwable.getMessage()); + } + if (throwable.getMessage() != null) { + StringBuilder stackTrace = new StringBuilder(ExceptionUtils.getStackTrace(throwable)); + throwableInformation.put(EXCEPTION_STACK_TRACE, stackTrace.toString()); + } + exceptions.put(throwable.getClass(), throwableInformation); + } } /** @@ -160,15 +218,19 @@ private String convertToRegex(String formattedStr) { /** * Set up test with junit that a warning was logged with log4j */ - protected void setUpLog4jForJUnit(Class cls) { + protected void setUpLog4jForJUnit(Class cls, boolean recordExceptions) { String loggerName = toLoggerName(callerClass(cls)); logger = (Logger) LogManager.getLogger(loggerName); Loggers.setLevel(logger, Level.DEBUG); - testAppender = new TestAppender(loggerName); + testAppender = new TestAppender(loggerName, recordExceptions); testAppender.start(); logger.addAppender(testAppender); } + protected void setUpLog4jForJUnit(Class cls) { + setUpLog4jForJUnit(cls, false); + } + private static String toLoggerName(final Class cls) { String canonicalName = cls.getCanonicalName(); return canonicalName != null ? canonicalName : cls.getName(); diff --git a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java index 0c3d35037..e9462e7a1 100644 --- a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java +++ b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java @@ -13,6 +13,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -22,8 +23,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; +import static org.opensearch.ad.settings.AnomalyDetectorSettings.NUM_MIN_SAMPLES; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Arrays; @@ -40,6 +43,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -48,19 +52,25 @@ import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.ad.common.exception.AnomalyDetectionException; import org.opensearch.ad.common.exception.EndRunException; +import org.opensearch.ad.constant.CommonName; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorJob; import org.opensearch.ad.model.AnomalyResult; +import org.opensearch.ad.model.FeatureData; import org.opensearch.ad.model.IntervalTimeConfiguration; +import org.opensearch.ad.settings.AnomalyDetectorSettings; +import org.opensearch.ad.task.ADTaskCacheManager; import org.opensearch.ad.task.ADTaskManager; +import org.opensearch.ad.transport.AnomalyResultAction; +import org.opensearch.ad.transport.AnomalyResultResponse; import org.opensearch.ad.transport.handler.AnomalyIndexHandler; import org.opensearch.ad.util.ClientUtil; import org.opensearch.ad.util.DiscoveryNodeFilterer; -import org.opensearch.ad.util.IndexUtils; import org.opensearch.client.Client; -import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.settings.Settings; @@ -79,6 +89,8 @@ import org.opensearch.jobscheduler.spi.utils.LockService; import org.opensearch.threadpool.ThreadPool; +import com.google.common.collect.ImmutableList; + public class AnomalyDetectorJobRunnerTests extends AbstractADTest { @Mock @@ -114,9 +126,6 @@ public class AnomalyDetectorJobRunnerTests extends AbstractADTest { @Mock private ADTaskManager adTaskManager; - @Mock - private AnomalyDetectionIndices indexUtil; - private ExecuteADResultResponseRecorder recorder; @Mock @@ -124,6 +133,12 @@ public class AnomalyDetectorJobRunnerTests extends AbstractADTest { private AnomalyDetector detector; + @Mock + private ADTaskCacheManager adTaskCacheManager; + + @Mock + private NodeStateManager nodeStateManager; + @BeforeClass public static void setUpBeforeClass() { setUpThreadPool(AnomalyDetectorJobRunnerTests.class.getSimpleName()); @@ -161,10 +176,8 @@ public void setup() throws Exception { runner.setSettings(settings); AnomalyDetectionIndices anomalyDetectionIndices = mock(AnomalyDetectionIndices.class); - IndexNameExpressionResolver indexNameResolver = mock(IndexNameExpressionResolver.class); - IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameResolver); - runner.setAnomalyDetectionIndices(indexUtil); + runner.setAnomalyDetectionIndices(anomalyDetectionIndices); lockService = new LockService(client, clusterService); doReturn(lockService).when(context).getLockService(); @@ -204,17 +217,28 @@ public void setup() throws Exception { return null; }).when(client).index(any(), any()); - recorder = new ExecuteADResultResponseRecorder(indexUtil, anomalyResultHandler, adTaskManager, nodeFilter, threadPool, client); - runner.setExecuteADResultResponseRecorder(recorder); - detector = TestHelpers.randomAnomalyDetectorWithEmptyFeature(); + when(adTaskCacheManager.hasQueriedResultIndex(anyString())).thenReturn(false); - NodeStateManager stateManager = mock(NodeStateManager.class); + detector = TestHelpers.randomAnomalyDetectorWithEmptyFeature(); doAnswer(invocation -> { ActionListener> listener = invocation.getArgument(1); listener.onResponse(Optional.of(detector)); return null; - }).when(stateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); - runner.setNodeStateManager(stateManager); + }).when(nodeStateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + runner.setNodeStateManager(nodeStateManager); + + recorder = new ExecuteADResultResponseRecorder( + anomalyDetectionIndices, + anomalyResultHandler, + adTaskManager, + nodeFilter, + threadPool, + client, + nodeStateManager, + adTaskCacheManager, + 32 + ); + runner.setExecuteADResultResponseRecorder(recorder); } @Rule @@ -499,4 +523,202 @@ private void setUpJobParameter() { when(jobParameter.getWindowDelay()).thenReturn(new IntervalTimeConfiguration(10, ChronoUnit.SECONDS)); } + /** + * Test updateLatestRealtimeTask.confirmTotalRCFUpdatesFound + * @throws InterruptedException + */ + public Instant confirmInitializedSetup() { + // clear the appender created in setUp before creating another association; otherwise + // we will have unexpected error (e.g., some appender does not record messages even + // though we have configured to do so). + super.tearDownLog4jForJUnit(); + setUpLog4jForJUnit(ExecuteADResultResponseRecorder.class, true); + Schedule schedule = mock(IntervalSchedule.class); + when(jobParameter.getSchedule()).thenReturn(schedule); + Instant executionStartTime = Instant.now(); + when(schedule.getNextExecutionTime(executionStartTime)).thenReturn(executionStartTime.plusSeconds(5)); + + AnomalyResultResponse response = new AnomalyResultResponse( + 4d, + 0.993, + 1.01, + Collections.singletonList(new FeatureData("123", "abc", 0d)), + randomAlphaOfLength(4), + // not fully initialized + Long.valueOf(AnomalyDetectorSettings.NUM_MIN_SAMPLES - 1), + randomLong(), + // not an HC detector + false, + randomInt(), + new double[] { randomDoubleBetween(0, 1.0, true), randomDoubleBetween(0, 1.0, true) }, + new double[] { randomDouble(), randomDouble() }, + new double[][] { new double[] { randomDouble(), randomDouble() } }, + new double[] { randomDouble() }, + randomDoubleBetween(1.1, 10.0, true) + ); + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(2); + listener.onResponse(response); + return null; + }).when(client).execute(eq(AnomalyResultAction.INSTANCE), any(), any()); + return executionStartTime; + } + + @SuppressWarnings("unchecked") + public void testFailtoFindDetector() { + Instant executionStartTime = confirmInitializedSetup(); + + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onFailure(new RuntimeException()); + return null; + }).when(nodeStateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + + LockModel lock = new LockModel(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + + verify(client, times(1)).execute(eq(AnomalyResultAction.INSTANCE), any(), any()); + verify(adTaskCacheManager, times(1)).hasQueriedResultIndex(anyString()); + verify(nodeStateManager, times(1)).getAnomalyDetector(any(String.class), any(ActionListener.class)); + verify(nodeStateManager, times(0)).getAnomalyDetectorJob(any(String.class), any(ActionListener.class)); + verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + assertEquals(1, testAppender.countMessage("Fail to confirm rcf update")); + assertTrue(testAppender.containExceptionMsg(AnomalyDetectionException.class, "fail to get detector")); + } + + @SuppressWarnings("unchecked") + public void testFailtoFindJob() { + Instant executionStartTime = confirmInitializedSetup(); + + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onResponse(Optional.of(detector)); + return null; + }).when(nodeStateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onFailure(new RuntimeException()); + return null; + }).when(nodeStateManager).getAnomalyDetectorJob(any(String.class), any(ActionListener.class)); + + LockModel lock = new LockModel(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + + verify(client, times(1)).execute(eq(AnomalyResultAction.INSTANCE), any(), any()); + verify(adTaskCacheManager, times(1)).hasQueriedResultIndex(anyString()); + verify(nodeStateManager, times(1)).getAnomalyDetector(any(String.class), any(ActionListener.class)); + verify(nodeStateManager, times(1)).getAnomalyDetectorJob(any(String.class), any(ActionListener.class)); + verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + assertEquals(1, testAppender.countMessage("Fail to confirm rcf update")); + assertTrue(testAppender.containExceptionMsg(AnomalyDetectionException.class, "fail to get job")); + } + + @SuppressWarnings("unchecked") + public void testEmptyDetector() { + Instant executionStartTime = confirmInitializedSetup(); + + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onResponse(Optional.empty()); + return null; + }).when(nodeStateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + + LockModel lock = new LockModel(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + + verify(client, times(1)).execute(eq(AnomalyResultAction.INSTANCE), any(), any()); + verify(adTaskCacheManager, times(1)).hasQueriedResultIndex(anyString()); + verify(nodeStateManager, times(1)).getAnomalyDetector(any(String.class), any(ActionListener.class)); + verify(nodeStateManager, times(0)).getAnomalyDetectorJob(any(String.class), any(ActionListener.class)); + verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + assertEquals(1, testAppender.countMessage("Fail to confirm rcf update")); + assertTrue(testAppender.containExceptionMsg(AnomalyDetectionException.class, "fail to get detector")); + } + + @SuppressWarnings("unchecked") + public void testEmptyJob() { + Instant executionStartTime = confirmInitializedSetup(); + + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onResponse(Optional.of(detector)); + return null; + }).when(nodeStateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onResponse(Optional.empty()); + return null; + }).when(nodeStateManager).getAnomalyDetectorJob(any(String.class), any(ActionListener.class)); + + LockModel lock = new LockModel(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + + verify(client, times(1)).execute(eq(AnomalyResultAction.INSTANCE), any(), any()); + verify(adTaskCacheManager, times(1)).hasQueriedResultIndex(anyString()); + verify(nodeStateManager, times(1)).getAnomalyDetector(any(String.class), any(ActionListener.class)); + verify(nodeStateManager, times(1)).getAnomalyDetectorJob(any(String.class), any(ActionListener.class)); + verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + assertEquals(1, testAppender.countMessage("Fail to confirm rcf update")); + assertTrue(testAppender.containExceptionMsg(AnomalyDetectionException.class, "fail to get job")); + } + + @SuppressWarnings("unchecked") + public void testMarkResultIndexQueried() throws IOException { + detector = TestHelpers.AnomalyDetectorBuilder + .newInstance() + .setDetectionInterval(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES)) + .setCategoryFields(ImmutableList.of(randomAlphaOfLength(5))) + .setResultIndex(CommonName.CUSTOM_RESULT_INDEX_PREFIX + "index") + .build(); + Instant executionStartTime = confirmInitializedSetup(); + + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onResponse(Optional.of(detector)); + return null; + }).when(nodeStateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onResponse(Optional.of(TestHelpers.randomAnomalyDetectorJob(true, Instant.ofEpochMilli(1602401500000L), null))); + return null; + }).when(nodeStateManager).getAnomalyDetectorJob(any(String.class), any(ActionListener.class)); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + + ActionListener listener = (ActionListener) args[1]; + + SearchResponse mockResponse = mock(SearchResponse.class); + int totalHits = 1001; + when(mockResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); + + listener.onResponse(mockResponse); + + return null; + }).when(client).search(any(), any(ActionListener.class)); + + LockModel lock = new LockModel(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + + verify(client, times(1)).execute(eq(AnomalyResultAction.INSTANCE), any(), any()); + verify(client, times(1)).search(any(), any()); + verify(adTaskCacheManager, times(1)).hasQueriedResultIndex(anyString()); + verify(nodeStateManager, times(1)).getAnomalyDetector(any(String.class), any(ActionListener.class)); + verify(nodeStateManager, times(1)).getAnomalyDetectorJob(any(String.class), any(ActionListener.class)); + + ArgumentCaptor totalUpdates = ArgumentCaptor.forClass(Long.class); + verify(adTaskManager, times(1)) + .updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), totalUpdates.capture(), any(), any(), any()); + assertEquals(NUM_MIN_SAMPLES, totalUpdates.getValue().longValue()); + + verify(adTaskCacheManager, times(1)).markResultIndexQueried(anyString()); + } } diff --git a/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java b/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java index c0366e95a..d2846940c 100644 --- a/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java +++ b/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java @@ -35,6 +35,7 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.action.update.UpdateResponse; import org.opensearch.ad.ExecuteADResultResponseRecorder; +import org.opensearch.ad.NodeStateManager; import org.opensearch.ad.TestHelpers; import org.opensearch.ad.common.exception.ResourceNotFoundException; import org.opensearch.ad.constant.CommonErrorMessages; @@ -44,6 +45,7 @@ import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyResult; import org.opensearch.ad.model.Feature; +import org.opensearch.ad.task.ADTaskCacheManager; import org.opensearch.ad.task.ADTaskManager; import org.opensearch.ad.transport.AnomalyDetectorJobResponse; import org.opensearch.ad.transport.AnomalyResultAction; @@ -83,6 +85,8 @@ public class IndexAnomalyDetectorJobActionHandlerTests extends OpenSearchTestCas private Client client; private IndexAnomalyDetectorJobActionHandler handler; private AnomalyIndexHandler anomalyResultHandler; + private NodeStateManager nodeStateManager; + private ADTaskCacheManager adTaskCacheManager; @BeforeClass public static void setOnce() throws IOException { @@ -153,13 +157,21 @@ public void setUp() throws Exception { anomalyResultHandler = mock(AnomalyIndexHandler.class); + nodeStateManager = mock(NodeStateManager.class); + + adTaskCacheManager = mock(ADTaskCacheManager.class); + when(adTaskCacheManager.hasQueriedResultIndex(anyString())).thenReturn(true); + recorder = new ExecuteADResultResponseRecorder( anomalyDetectionIndices, anomalyResultHandler, adTaskManager, nodeFilter, threadPool, - client + client, + nodeStateManager, + adTaskCacheManager, + 32 ); handler = new IndexAnomalyDetectorJobActionHandler( @@ -318,6 +330,7 @@ public void testUpdateLatestRealtimeTaskOnCoordinatingException() { verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); verify(adTaskManager, never()).removeRealtimeTaskCache(anyString()); + verify(adTaskManager, times(1)).skipUpdateHCRealtimeTask(anyString(), anyString()); verify(threadPool, never()).schedule(any(), any(), any()); verify(listener, times(1)).onResponse(any()); } diff --git a/src/test/java/org/opensearch/ad/task/ADTaskCacheManagerTests.java b/src/test/java/org/opensearch/ad/task/ADTaskCacheManagerTests.java index c99314608..6f5111566 100644 --- a/src/test/java/org/opensearch/ad/task/ADTaskCacheManagerTests.java +++ b/src/test/java/org/opensearch/ad/task/ADTaskCacheManagerTests.java @@ -313,12 +313,12 @@ public void testRealtimeTaskCache() { String newState = ADTaskState.INIT.name(); Float newInitProgress = 0.0f; String newError = randomAlphaOfLength(5); - assertTrue(adTaskCacheManager.isRealtimeTaskChanged(detectorId1, newState, newInitProgress, newError)); + assertTrue(adTaskCacheManager.isRealtimeTaskChangeNeeded(detectorId1, newState, newInitProgress, newError)); // Init realtime task cache. adTaskCacheManager.initRealtimeTaskCache(detectorId1, 60_000); adTaskCacheManager.updateRealtimeTaskCache(detectorId1, newState, newInitProgress, newError); - assertFalse(adTaskCacheManager.isRealtimeTaskChanged(detectorId1, newState, newInitProgress, newError)); + assertFalse(adTaskCacheManager.isRealtimeTaskChangeNeeded(detectorId1, newState, newInitProgress, newError)); assertArrayEquals(new String[] { detectorId1 }, adTaskCacheManager.getDetectorIdsInRealtimeTaskCache()); String detectorId2 = randomAlphaOfLength(10); @@ -331,7 +331,7 @@ public void testRealtimeTaskCache() { newState = ADTaskState.RUNNING.name(); newInitProgress = 1.0f; newError = "test error"; - assertTrue(adTaskCacheManager.isRealtimeTaskChanged(detectorId1, newState, newInitProgress, newError)); + assertTrue(adTaskCacheManager.isRealtimeTaskChangeNeeded(detectorId1, newState, newInitProgress, newError)); adTaskCacheManager.updateRealtimeTaskCache(detectorId1, newState, newInitProgress, newError); assertEquals(newInitProgress, adTaskCacheManager.getRealtimeTaskCache(detectorId1).getInitProgress()); assertEquals(newState, adTaskCacheManager.getRealtimeTaskCache(detectorId1).getState()); diff --git a/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java b/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java index 7f67b8155..3643c7b4f 100644 --- a/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java +++ b/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java @@ -720,7 +720,7 @@ public void testUpdateLatestRealtimeTaskOnCoordinatingNode() { String error = randomAlphaOfLength(5); ActionListener actionListener = mock(ActionListener.class); doReturn(node1).when(clusterService).localNode(); - when(adTaskCacheManager.isRealtimeTaskChanged(anyString(), anyString(), anyFloat(), anyString())).thenReturn(true); + when(adTaskCacheManager.isRealtimeTaskChangeNeeded(anyString(), anyString(), anyFloat(), anyString())).thenReturn(true); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(3); listener.onResponse(new UpdateResponse(ShardId.fromString("[test][1]"), "1", 0L, 1L, 1L, DocWriteResponse.Result.UPDATED));