From 8ce449345e43dd34c7891b9f7d5835c3f242f27f Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Fri, 16 Dec 2022 10:30:51 -0800 Subject: [PATCH] Speed up cold start (#753) (#763) If historical data is enough, a single stream detector takes 1 interval for cold start to be triggered + 1 interval for the state document to be updated. Similar to single stream detectors, HCAD cold start needs 2 intervals and one more interval to make sure an entity appears more than once. So HCAD needs three intervals to complete cold starts. Long initialization is the single most complained problem of AD. This PR reduces both single stream and HCAD detectors' initialization time to 1 minute by * delaying real time cache update by one minute when we receive ResourceNotFoundException in single stream detectors or when the init progress of HCAD real time cache is 0. Thus, we can finish the cold start and writing checkpoint one minute later and update the state document accordingly. This optimization saves one interval to wait for the state document update. * disable the door keeper by default so that we won't have to wait an extra interval in HCAD. * trigger cold start when starting a real time detector. This optimization saves one interval to wait for the cold start to be triggered. Testing done: * verified the cold start time is reduced to 1 minute. * added tests for new code. Signed-off-by: Kaituo Li --- build.gradle | 8 +- .../ad/AnomalyDetectorJobRunner.java | 373 +++++++++--------- .../opensearch/ad/AnomalyDetectorPlugin.java | 48 ++- .../ad/ExecuteADResultResponseRecorder.java | 288 ++++++++++++++ .../opensearch/ad/caching/PriorityCache.java | 49 +-- .../opensearch/ad/model/AnomalyDetector.java | 1 + .../ad/ratelimit/EntityColdStartWorker.java | 49 ++- .../RestExecuteAnomalyDetectorAction.java | 1 - .../IndexAnomalyDetectorJobActionHandler.java | 122 ++++-- .../ad/settings/EnabledSetting.java | 20 +- .../org/opensearch/ad/task/ADTaskManager.java | 11 +- .../AnomalyDetectorJobTransportAction.java | 10 +- .../AnomalyResultTransportAction.java | 2 +- .../ad/AnomalyDetectorJobRunnerTests.java | 84 +++- .../ad/caching/PriorityCacheTests.java | 36 +- .../ad/e2e/DetectionResultEvalutationIT.java | 25 +- .../ad/ml/EntityColdStarterTests.java | 1 - ...alyDetectorJobTransportActionWithUser.java | 10 +- .../ratelimit/EntityColdStartWorkerTests.java | 29 +- ...xAnomalyDetectorJobActionHandlerTests.java | 354 +++++++++++++++++ .../AnomalyDetectorJobActionTests.java | 4 +- 21 files changed, 1193 insertions(+), 332 deletions(-) create mode 100644 src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java create mode 100644 src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java diff --git a/build.gradle b/build.gradle index c66674c16..594fd7813 100644 --- a/build.gradle +++ b/build.gradle @@ -134,7 +134,7 @@ configurations.all { if (it.state != Configuration.State.UNRESOLVED) return resolutionStrategy { force "joda-time:joda-time:${versions.joda}" - force "com.fasterxml.jackson.core:jackson-core:2.14.0" + force "com.fasterxml.jackson.core:jackson-core:2.14.1" force "commons-logging:commons-logging:${versions.commonslogging}" force "org.apache.httpcomponents:httpcore:${versions.httpcore}" force "commons-codec:commons-codec:${versions.commonscodec}" @@ -677,9 +677,9 @@ dependencies { implementation 'software.amazon.randomcutforest:randomcutforest-core:3.0-rc3' // force Jackson version to avoid version conflict issue - implementation "com.fasterxml.jackson.core:jackson-core:2.14.0" - implementation "com.fasterxml.jackson.core:jackson-databind:2.14.0" - implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.0" + implementation "com.fasterxml.jackson.core:jackson-core:2.14.1" + implementation "com.fasterxml.jackson.core:jackson-databind:2.14.1" + implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.1" // used for serializing/deserializing rcf models. implementation group: 'io.protostuff', name: 'protostuff-core', version: '1.8.0' diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java index 655267dcf..aa0ce8075 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java @@ -14,21 +14,18 @@ import static org.opensearch.action.DocWriteResponse.Result.CREATED; import static org.opensearch.action.DocWriteResponse.Result.UPDATED; import static org.opensearch.ad.AnomalyDetectorPlugin.AD_THREAD_POOL_NAME; -import static org.opensearch.ad.constant.CommonErrorMessages.CAN_NOT_FIND_LATEST_TASK; import static org.opensearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import java.io.IOException; import java.time.Instant; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; @@ -37,15 +34,10 @@ import org.opensearch.ad.common.exception.AnomalyDetectionException; import org.opensearch.ad.common.exception.EndRunException; import org.opensearch.ad.common.exception.InternalFailure; -import org.opensearch.ad.common.exception.ResourceNotFoundException; -import org.opensearch.ad.indices.ADIndex; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.ADTaskState; +import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorJob; -import org.opensearch.ad.model.AnomalyResult; -import org.opensearch.ad.model.DetectorProfileName; -import org.opensearch.ad.model.FeatureData; -import org.opensearch.ad.model.IntervalTimeConfiguration; import org.opensearch.ad.rest.handler.AnomalyDetectorFunction; import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.ad.task.ADTaskManager; @@ -53,12 +45,7 @@ import org.opensearch.ad.transport.AnomalyResultRequest; import org.opensearch.ad.transport.AnomalyResultResponse; import org.opensearch.ad.transport.AnomalyResultTransportAction; -import org.opensearch.ad.transport.ProfileAction; -import org.opensearch.ad.transport.ProfileRequest; -import org.opensearch.ad.transport.handler.AnomalyIndexHandler; -import org.opensearch.ad.util.DiscoveryNodeFilterer; import org.opensearch.client.Client; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.NamedXContentRegistry; @@ -66,7 +53,6 @@ import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; import org.opensearch.commons.InjectSecurity; -import org.opensearch.commons.authuser.User; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; @@ -88,11 +74,11 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner { private int maxRetryForEndRunException; private Client client; private ThreadPool threadPool; - private AnomalyIndexHandler anomalyResultHandler; private ConcurrentHashMap detectorEndRunExceptionCount; private AnomalyDetectionIndices anomalyDetectionIndices; - private DiscoveryNodeFilterer nodeFilter; private ADTaskManager adTaskManager; + private NodeStateManager nodeStateManager; + private ExecuteADResultResponseRecorder recorder; public static AnomalyDetectorJobRunner getJobRunnerInstance() { if (INSTANCE != null) { @@ -120,10 +106,6 @@ public void setThreadPool(ThreadPool threadPool) { this.threadPool = threadPool; } - public void setAnomalyResultHandler(AnomalyIndexHandler anomalyResultHandler) { - this.anomalyResultHandler = anomalyResultHandler; - } - public void setSettings(Settings settings) { this.settings = settings; this.maxRetryForEndRunException = AnomalyDetectorSettings.MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings); @@ -137,8 +119,12 @@ public void setAnomalyDetectionIndices(AnomalyDetectionIndices anomalyDetectionI this.anomalyDetectionIndices = anomalyDetectionIndices; } - public void setNodeFilter(DiscoveryNodeFilterer nodeFilter) { - this.nodeFilter = nodeFilter; + public void setNodeStateManager(NodeStateManager nodeStateManager) { + this.nodeStateManager = nodeStateManager; + } + + public void setExecuteADResultResponseRecorder(ExecuteADResultResponseRecorder recorder) { + this.recorder = recorder; } @Override @@ -159,28 +145,49 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont final LockService lockService = context.getLockService(); Runnable runnable = () -> { - if (jobParameter.getLockDurationSeconds() != null) { - lockService - .acquireLock( - jobParameter, - context, - ActionListener - .wrap(lock -> runAdJob(jobParameter, lockService, lock, detectionStartTime, executionStartTime), exception -> { - indexAnomalyResultException( - jobParameter, - lockService, - null, - detectionStartTime, - executionStartTime, - exception, - false - ); - throw new IllegalStateException("Failed to acquire lock for AD job: " + detectorId); - }) - ); - } else { - log.warn("Can't get lock for AD job: " + detectorId); - } + nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> { + if (!detectorOptional.isPresent()) { + log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId)); + return; + } + AnomalyDetector detector = detectorOptional.get(); + + if (jobParameter.getLockDurationSeconds() != null) { + lockService + .acquireLock( + jobParameter, + context, + ActionListener + .wrap( + lock -> runAdJob( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + recorder, + detector + ), + exception -> { + indexAnomalyResultException( + jobParameter, + lockService, + null, + detectionStartTime, + executionStartTime, + exception, + false, + recorder, + detector + ); + throw new IllegalStateException("Failed to acquire lock for AD job: " + detectorId); + } + ) + ); + } else { + log.warn("Can't get lock for AD job: " + detectorId); + } + }, e -> log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId), e))); }; ExecutorService executor = threadPool.executor(AD_THREAD_POOL_NAME); @@ -195,13 +202,17 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont * @param lock lock to run job * @param detectionStartTime detection start time * @param executionStartTime detection end time + * @param recorder utility to record job execution result + * @param detector associated detector accessor */ protected void runAdJob( AnomalyDetectorJob jobParameter, LockService lockService, LockModel lock, Instant detectionStartTime, - Instant executionStartTime + Instant executionStartTime, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { String detectorId = jobParameter.getName(); if (lock == null) { @@ -212,7 +223,9 @@ protected void runAdJob( detectionStartTime, executionStartTime, "Can't run AD job due to null lock", - false + false, + recorder, + detector ); return; } @@ -242,16 +255,38 @@ protected void runAdJob( } String resultIndex = jobParameter.getResultIndex(); if (resultIndex == null) { - runAnomalyDetectionJob(jobParameter, lockService, lock, detectionStartTime, executionStartTime, detectorId, user, roles); + runAnomalyDetectionJob( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + detectorId, + user, + roles, + recorder, + detector + ); return; } ActionListener listener = ActionListener.wrap(r -> { log.debug("Custom index is valid"); }, e -> { Exception exception = new EndRunException(detectorId, e.getMessage(), true); - handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception); + handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception, recorder, detector); }); anomalyDetectionIndices.validateCustomIndexForBackendJob(resultIndex, detectorId, user, roles, () -> { listener.onResponse(true); - runAnomalyDetectionJob(jobParameter, lockService, lock, detectionStartTime, executionStartTime, detectorId, user, roles); + runAnomalyDetectionJob( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + detectorId, + user, + roles, + recorder, + detector + ); }, listener); } @@ -263,7 +298,9 @@ private void runAnomalyDetectionJob( Instant executionStartTime, String detectorId, String user, - List roles + List roles, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { try (InjectSecurity injectSecurity = new InjectSecurity(detectorId, settings, client.threadPool().getThreadContext())) { @@ -282,15 +319,43 @@ private void runAnomalyDetectionJob( ActionListener .wrap( response -> { - indexAnomalyResult(jobParameter, lockService, lock, detectionStartTime, executionStartTime, response); + indexAnomalyResult( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + response, + recorder, + detector + ); }, exception -> { - handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception); + handleAdException( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + exception, + recorder, + detector + ); } ) ); } catch (Exception e) { - indexAnomalyResultException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, e, true); + indexAnomalyResultException( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + e, + true, + recorder, + detector + ); log.error("Failed to execute AD job " + detectorId, e); } } @@ -331,6 +396,8 @@ private void runAnomalyDetectionJob( * @param detectionStartTime detection start time * @param executionStartTime detection end time * @param exception exception + * @param recorder utility to record job execution result + * @param detector associated detector accessor */ protected void handleAdException( AnomalyDetectorJob jobParameter, @@ -338,7 +405,9 @@ protected void handleAdException( LockModel lock, Instant detectionStartTime, Instant executionStartTime, - Exception exception + Exception exception, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { String detectorId = jobParameter.getName(); if (exception instanceof EndRunException) { @@ -353,7 +422,9 @@ protected void handleAdException( lock, detectionStartTime, executionStartTime, - (EndRunException) exception + (EndRunException) exception, + recorder, + detector ); } else { detectorEndRunExceptionCount.compute(detectorId, (k, v) -> { @@ -378,7 +449,9 @@ protected void handleAdException( lock, detectionStartTime, executionStartTime, - (EndRunException) exception + (EndRunException) exception, + recorder, + detector ); return; } @@ -389,7 +462,9 @@ protected void handleAdException( detectionStartTime, executionStartTime, exception.getMessage(), - true + true, + recorder, + detector ); } } else { @@ -399,7 +474,17 @@ protected void handleAdException( } else { log.error("Failed to execute anomaly result action for " + detectorId, exception); } - indexAnomalyResultException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception, true); + indexAnomalyResultException( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + exception, + true, + recorder, + detector + ); } } @@ -409,7 +494,9 @@ private void stopAdJobForEndRunException( LockModel lock, Instant detectionStartTime, Instant executionStartTime, - EndRunException exception + EndRunException exception, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { String detectorId = jobParameter.getName(); detectorEndRunExceptionCount.remove(detectorId); @@ -427,7 +514,9 @@ private void stopAdJobForEndRunException( executionStartTime, error, true, - ADTaskState.STOPPED.name() + ADTaskState.STOPPED.name(), + recorder, + detector ) ); } @@ -491,81 +580,22 @@ private void indexAnomalyResult( LockModel lock, Instant detectionStartTime, Instant executionStartTime, - AnomalyResultResponse response + AnomalyResultResponse response, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { String detectorId = jobParameter.getName(); detectorEndRunExceptionCount.remove(detectorId); try { - // skipping writing to the result index if not necessary - // For a single-entity detector, the result is not useful if error is null - // and rcf score (thus anomaly grade/confidence) is null. - // For a HCAD detector, we don't need to save on the detector level. - // We return 0 or Double.NaN rcf score if there is no error. - if ((response.getAnomalyScore() <= 0 || Double.isNaN(response.getAnomalyScore())) && response.getError() == null) { - updateRealtimeTask(response, detectorId); - return; - } - IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) jobParameter.getWindowDelay(); - Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); - Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); - User user = jobParameter.getUser(); - - if (response.getError() != null) { - log.info("Anomaly result action run successfully for {} with error {}", detectorId, response.getError()); - } - - AnomalyResult anomalyResult = response - .toAnomalyResult( - detectorId, - dataStartTime, - dataEndTime, - executionStartTime, - Instant.now(), - anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), - user, - response.getError() - ); - - String resultIndex = jobParameter.getResultIndex(); - anomalyResultHandler.index(anomalyResult, detectorId, resultIndex); - updateRealtimeTask(response, detectorId); + recorder.indexAnomalyResult(detectionStartTime, executionStartTime, response, detector); } catch (EndRunException e) { - handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, e); + handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, e, recorder, detector); } catch (Exception e) { log.error("Failed to index anomaly result for " + detectorId, e); } finally { releaseLock(jobParameter, lockService, lock); } - } - private void updateRealtimeTask(AnomalyResultResponse response, String detectorId) { - if (response.isHCDetector() != null - && response.isHCDetector() - && !adTaskManager.skipUpdateHCRealtimeTask(detectorId, response.getError())) { - DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes(); - Set profiles = new HashSet<>(); - profiles.add(DetectorProfileName.INIT_PROGRESS); - ProfileRequest profileRequest = new ProfileRequest(detectorId, profiles, true, dataNodes); - client.execute(ProfileAction.INSTANCE, profileRequest, ActionListener.wrap(r -> { - log.debug("Update latest realtime task for HC detector {}, total updates: {}", detectorId, r.getTotalUpdates()); - updateLatestRealtimeTask( - detectorId, - null, - r.getTotalUpdates(), - response.getDetectorIntervalInMinutes(), - response.getError() - ); - }, e -> { log.error("Failed to update latest realtime task for " + detectorId, e); })); - } else { - log.debug("Update latest realtime task for SINGLE detector {}, total updates: {}", detectorId, response.getRcfTotalUpdates()); - updateLatestRealtimeTask( - detectorId, - null, - response.getRcfTotalUpdates(), - response.getDetectorIntervalInMinutes(), - response.getError() - ); - } } private void indexAnomalyResultException( @@ -575,13 +605,25 @@ private void indexAnomalyResultException( Instant detectionStartTime, Instant executionStartTime, Exception exception, - boolean releaseLock + boolean releaseLock, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { try { String errorMessage = exception instanceof AnomalyDetectionException ? exception.getMessage() : Throwables.getStackTraceAsString(exception); - indexAnomalyResultException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, errorMessage, releaseLock); + indexAnomalyResultException( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + errorMessage, + releaseLock, + recorder, + detector + ); } catch (Exception e) { log.error("Failed to index anomaly result for " + jobParameter.getName(), e); } @@ -594,7 +636,9 @@ private void indexAnomalyResultException( Instant detectionStartTime, Instant executionStartTime, String errorMessage, - boolean releaseLock + boolean releaseLock, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { indexAnomalyResultException( jobParameter, @@ -604,7 +648,9 @@ private void indexAnomalyResultException( executionStartTime, errorMessage, releaseLock, - null + null, + recorder, + detector ); } @@ -616,40 +662,12 @@ private void indexAnomalyResultException( Instant executionStartTime, String errorMessage, boolean releaseLock, - String taskState + String taskState, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { - String detectorId = jobParameter.getName(); try { - IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) jobParameter.getWindowDelay(); - Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); - Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); - User user = jobParameter.getUser(); - - AnomalyResult anomalyResult = new AnomalyResult( - detectorId, - null, // no task id - new ArrayList(), - dataStartTime, - dataEndTime, - executionStartTime, - Instant.now(), - errorMessage, - null, // single-stream detectors have no entity - user, - anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), - null // no model id - ); - String resultIndex = jobParameter.getResultIndex(); - if (resultIndex != null && !anomalyDetectionIndices.doesIndexExist(resultIndex)) { - // Set result index as null, will write exception to default result index. - anomalyResultHandler.index(anomalyResult, detectorId, null); - } else { - anomalyResultHandler.index(anomalyResult, detectorId, resultIndex); - } - - updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage); - } catch (Exception e) { - log.error("Failed to index anomaly result for " + detectorId, e); + recorder.indexAnomalyResultException(detectionStartTime, executionStartTime, errorMessage, taskState, detector); } finally { if (releaseLock) { releaseLock(jobParameter, lockService, lock); @@ -657,37 +675,6 @@ private void indexAnomalyResultException( } } - private void updateLatestRealtimeTask( - String detectorId, - String taskState, - Long rcfTotalUpdates, - Long detectorIntervalInMinutes, - String error - ) { - // Don't need info as this will be printed repeatedly in each interval - adTaskManager - .updateLatestRealtimeTaskOnCoordinatingNode( - detectorId, - taskState, - rcfTotalUpdates, - detectorIntervalInMinutes, - error, - ActionListener.wrap(r -> { - if (r != null) { - log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState); - } - }, e -> { - if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) { - // Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction. - log.error("Can't find latest realtime task of detector " + detectorId); - adTaskManager.removeRealtimeTaskCache(detectorId); - } else { - log.error("Failed to update latest realtime task for detector " + detectorId, e); - } - }) - ); - } - private void releaseLock(AnomalyDetectorJob jobParameter, LockService lockService, LockModel lock) { lockService .release( diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java b/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java index b21d11cc5..cace64e3f 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java @@ -234,11 +234,12 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip private ClientUtil clientUtil; private DiscoveryNodeFilterer nodeFilter; private IndexUtils indexUtils; - private ADTaskCacheManager adTaskCacheManager; private ADTaskManager adTaskManager; private ADBatchTaskRunner adBatchTaskRunner; // package private for testing GenericObjectPool serializeRCFBufferPool; + private NodeStateManager stateManager; + private ExecuteADResultResponseRecorder adResultResponseRecorder; static { SpecialPermission.check(); @@ -259,25 +260,14 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - AnomalyIndexHandler anomalyResultHandler = new AnomalyIndexHandler( - client, - settings, - threadPool, - CommonName.ANOMALY_RESULT_INDEX_ALIAS, - anomalyDetectionIndices, - this.clientUtil, - this.indexUtils, - clusterService - ); - AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance(); jobRunner.setClient(client); jobRunner.setThreadPool(threadPool); - jobRunner.setAnomalyResultHandler(anomalyResultHandler); jobRunner.setSettings(settings); jobRunner.setAnomalyDetectionIndices(anomalyDetectionIndices); - jobRunner.setNodeFilter(nodeFilter); jobRunner.setAdTaskManager(adTaskManager); + jobRunner.setNodeStateManager(stateManager); + jobRunner.setExecuteADResultResponseRecorder(adResultResponseRecorder); RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(); RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService); @@ -383,7 +373,7 @@ public Collection createComponents( adCircuitBreakerService ); - NodeStateManager stateManager = new NodeStateManager( + stateManager = new NodeStateManager( client, xContentRegistry, settings, @@ -568,7 +558,8 @@ public PooledObject wrap(LinkedBuffer obj) { AnomalyDetectorSettings.QUEUE_MAINTENANCE, entityColdStarter, AnomalyDetectorSettings.HOURLY_MAINTENANCE, - stateManager + stateManager, + cacheProvider ); ModelManager modelManager = new ModelManager( @@ -714,7 +705,7 @@ public PooledObject wrap(LinkedBuffer obj) { anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager, AnomalyDetectorSettings.MAX_PREVIEW_RESULTS); - adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker); + ADTaskCacheManager adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker); adTaskManager = new ADTaskManager( settings, clusterService, @@ -754,6 +745,26 @@ public PooledObject wrap(LinkedBuffer obj) { ADSearchHandler adSearchHandler = new ADSearchHandler(settings, clusterService, client); + AnomalyIndexHandler anomalyResultHandler = new AnomalyIndexHandler( + client, + settings, + threadPool, + CommonName.ANOMALY_RESULT_INDEX_ALIAS, + anomalyDetectionIndices, + this.clientUtil, + this.indexUtils, + clusterService + ); + + adResultResponseRecorder = new ExecuteADResultResponseRecorder( + anomalyDetectionIndices, + anomalyResultHandler, + adTaskManager, + nodeFilter, + threadPool, + client + ); + // return objects used by Guice to inject dependencies for e.g., // transport action handler constructors return ImmutableList @@ -795,7 +806,8 @@ public PooledObject wrap(LinkedBuffer obj) { checkpointWriteQueue, coldEntityQueue, entityColdStarter, - adTaskCacheManager + adTaskCacheManager, + adResultResponseRecorder ); } diff --git a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java new file mode 100644 index 000000000..19710f0cb --- /dev/null +++ b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java @@ -0,0 +1,288 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.ad; + +import static org.opensearch.ad.constant.CommonErrorMessages.CAN_NOT_FIND_LATEST_TASK; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.ad.common.exception.EndRunException; +import org.opensearch.ad.common.exception.ResourceNotFoundException; +import org.opensearch.ad.constant.CommonErrorMessages; +import org.opensearch.ad.indices.ADIndex; +import org.opensearch.ad.indices.AnomalyDetectionIndices; +import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.model.AnomalyResult; +import org.opensearch.ad.model.DetectorProfileName; +import org.opensearch.ad.model.FeatureData; +import org.opensearch.ad.model.IntervalTimeConfiguration; +import org.opensearch.ad.task.ADTaskManager; +import org.opensearch.ad.transport.AnomalyResultResponse; +import org.opensearch.ad.transport.ProfileAction; +import org.opensearch.ad.transport.ProfileRequest; +import org.opensearch.ad.transport.RCFPollingAction; +import org.opensearch.ad.transport.RCFPollingRequest; +import org.opensearch.ad.transport.handler.AnomalyIndexHandler; +import org.opensearch.ad.util.DiscoveryNodeFilterer; +import org.opensearch.client.Client; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.commons.authuser.User; +import org.opensearch.threadpool.ThreadPool; + +public class ExecuteADResultResponseRecorder { + private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class); + + private AnomalyDetectionIndices anomalyDetectionIndices; + private AnomalyIndexHandler anomalyResultHandler; + private ADTaskManager adTaskManager; + private DiscoveryNodeFilterer nodeFilter; + private ThreadPool threadPool; + private Client client; + + public ExecuteADResultResponseRecorder( + AnomalyDetectionIndices anomalyDetectionIndices, + AnomalyIndexHandler anomalyResultHandler, + ADTaskManager adTaskManager, + DiscoveryNodeFilterer nodeFilter, + ThreadPool threadPool, + Client client + ) { + this.anomalyDetectionIndices = anomalyDetectionIndices; + this.anomalyResultHandler = anomalyResultHandler; + this.adTaskManager = adTaskManager; + this.nodeFilter = nodeFilter; + this.threadPool = threadPool; + this.client = client; + } + + public void indexAnomalyResult( + Instant detectionStartTime, + Instant executionStartTime, + AnomalyResultResponse response, + AnomalyDetector detector + ) { + String detectorId = detector.getDetectorId(); + try { + // skipping writing to the result index if not necessary + // For a single-entity detector, the result is not useful if error is null + // and rcf score (thus anomaly grade/confidence) is null. + // For a HCAD detector, we don't need to save on the detector level. + // We return 0 or Double.NaN rcf score if there is no error. + if ((response.getAnomalyScore() <= 0 || Double.isNaN(response.getAnomalyScore())) && response.getError() == null) { + updateRealtimeTask(response, detectorId); + return; + } + IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) detector.getWindowDelay(); + Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); + Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); + User user = detector.getUser(); + + if (response.getError() != null) { + log.info("Anomaly result action run successfully for {} with error {}", detectorId, response.getError()); + } + + AnomalyResult anomalyResult = response + .toAnomalyResult( + detectorId, + dataStartTime, + dataEndTime, + executionStartTime, + Instant.now(), + anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), + user, + response.getError() + ); + + String resultIndex = detector.getResultIndex(); + anomalyResultHandler.index(anomalyResult, detectorId, resultIndex); + updateRealtimeTask(response, detectorId); + } catch (EndRunException e) { + throw e; + } catch (Exception e) { + log.error("Failed to index anomaly result for " + detectorId, e); + } + } + + /** + * Update real time task (one document per detector in state index). If the real-time task has no changes compared with local cache, + * the task won't update. Task only updates when the state changed, or any error happened, or AD job stopped. Task is mainly consumed + * by the front-end to track detector status. For single-stream detectors, we embed model total updates in AnomalyResultResponse and + * update state accordingly. For HCAD, we won't wait for model finishing updating before returning a response to the job scheduler + * since it might be long before all entities finish execution. So we don't embed model total updates in AnomalyResultResponse. + * Instead, we issue a profile request to poll each model node and get the maximum total updates among all models. + * @param response response returned from executing AnomalyResultAction + * @param detectorId Detector Id + */ + private void updateRealtimeTask(AnomalyResultResponse response, String detectorId) { + if (response.isHCDetector() != null && response.isHCDetector()) { + if (adTaskManager.skipUpdateHCRealtimeTask(detectorId, response.getError())) { + return; + } + DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes(); + Set profiles = new HashSet<>(); + profiles.add(DetectorProfileName.INIT_PROGRESS); + ProfileRequest profileRequest = new ProfileRequest(detectorId, profiles, true, dataNodes); + Runnable profileHCInitProgress = () -> { + client.execute(ProfileAction.INSTANCE, profileRequest, ActionListener.wrap(r -> { + log.debug("Update latest realtime task for HC detector {}, total updates: {}", detectorId, r.getTotalUpdates()); + updateLatestRealtimeTask( + detectorId, + null, + r.getTotalUpdates(), + response.getDetectorIntervalInMinutes(), + response.getError() + ); + }, e -> { log.error("Failed to update latest realtime task for " + detectorId, e); })); + }; + if (!adTaskManager.isHCRealtimeTaskStartInitializing(detectorId)) { + // real time init progress is 0 may mean this is a newly started detector + // Delay real time cache update by one minute. If we are in init status, the delay may give the model training time to + // finish. We can change the detector running immediately instead of waiting for the next interval. + threadPool.schedule(profileHCInitProgress, new TimeValue(60, TimeUnit.SECONDS), AnomalyDetectorPlugin.AD_THREAD_POOL_NAME); + } else { + profileHCInitProgress.run(); + } + + } else { + log + .debug( + "Update latest realtime task for single stream detector {}, total updates: {}", + detectorId, + response.getRcfTotalUpdates() + ); + updateLatestRealtimeTask( + detectorId, + null, + response.getRcfTotalUpdates(), + response.getDetectorIntervalInMinutes(), + response.getError() + ); + } + } + + private void updateLatestRealtimeTask( + String detectorId, + String taskState, + Long rcfTotalUpdates, + Long detectorIntervalInMinutes, + String error + ) { + // Don't need info as this will be printed repeatedly in each interval + adTaskManager + .updateLatestRealtimeTaskOnCoordinatingNode( + detectorId, + taskState, + rcfTotalUpdates, + detectorIntervalInMinutes, + error, + ActionListener.wrap(r -> { + if (r != null) { + log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState); + } + }, e -> { + if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) { + // Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction. + log.error("Can't find latest realtime task of detector " + detectorId); + adTaskManager.removeRealtimeTaskCache(detectorId); + } else { + log.error("Failed to update latest realtime task for detector " + detectorId, e); + } + }) + ); + } + + /** + * The function is not only indexing the result with the exception, but also updating the task state after + * 60s if the exception is related to cold start (index not found exceptions) for a single stream detector. + * + * @param detectionStartTime execution start time + * @param executionStartTime execution end time + * @param errorMessage Error message to record + * @param taskState AD task state (e.g., stopped) + * @param detector Detector config accessor + */ + public void indexAnomalyResultException( + Instant detectionStartTime, + Instant executionStartTime, + String errorMessage, + String taskState, + AnomalyDetector detector + ) { + String detectorId = detector.getDetectorId(); + try { + IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) detector.getWindowDelay(); + Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); + Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); + User user = detector.getUser(); + + AnomalyResult anomalyResult = new AnomalyResult( + detectorId, + null, // no task id + new ArrayList(), + dataStartTime, + dataEndTime, + executionStartTime, + Instant.now(), + errorMessage, + null, // single-stream detectors have no entity + user, + anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), + null // no model id + ); + String resultIndex = detector.getResultIndex(); + if (resultIndex != null && !anomalyDetectionIndices.doesIndexExist(resultIndex)) { + // Set result index as null, will write exception to default result index. + anomalyResultHandler.index(anomalyResult, detectorId, null); + } else { + anomalyResultHandler.index(anomalyResult, detectorId, resultIndex); + } + + if (errorMessage.contains(CommonErrorMessages.NO_CHECKPOINT_ERR_MSG) && !detector.isMultiCategoryDetector()) { + // single stream detector raises ResourceNotFoundException containing CommonErrorMessages.NO_CHECKPOINT_ERR_MSG + // when there is no checkpoint. + // Delay real time cache update by one minute so we will have trained models by then and update the state + // document accordingly. + threadPool.schedule(() -> { + RCFPollingRequest request = new RCFPollingRequest(detectorId); + client.execute(RCFPollingAction.INSTANCE, request, ActionListener.wrap(rcfPollResponse -> { + long totalUpdates = rcfPollResponse.getTotalUpdates(); + // if there are updates, don't record failures + updateLatestRealtimeTask( + detectorId, + taskState, + totalUpdates, + detector.getDetectorIntervalInMinutes(), + totalUpdates > 0 ? "" : errorMessage + ); + }, e -> { + log.error("Fail to execute RCFRollingAction", e); + updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage); + })); + }, new TimeValue(60, TimeUnit.SECONDS), AnomalyDetectorPlugin.AD_THREAD_POOL_NAME); + } else { + updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage); + } + + } catch (Exception e) { + log.error("Failed to index anomaly result for " + detectorId, e); + } + } + +} diff --git a/src/main/java/org/opensearch/ad/caching/PriorityCache.java b/src/main/java/org/opensearch/ad/caching/PriorityCache.java index 0e94663d2..dfae6b931 100644 --- a/src/main/java/org/opensearch/ad/caching/PriorityCache.java +++ b/src/main/java/org/opensearch/ad/caching/PriorityCache.java @@ -55,6 +55,7 @@ import org.opensearch.ad.ratelimit.CheckpointMaintainWorker; import org.opensearch.ad.ratelimit.CheckpointWriteWorker; import org.opensearch.ad.settings.AnomalyDetectorSettings; +import org.opensearch.ad.settings.EnabledSetting; import org.opensearch.ad.util.DateUtils; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; @@ -161,29 +162,31 @@ public ModelState get(String modelId, AnomalyDetector detector) { // during maintenance period, stop putting new entries if (!maintenanceLock.isLocked() && modelState == null) { - DoorKeeper doorKeeper = doorKeepers - .computeIfAbsent( - detectorId, - id -> { - // reset every 60 intervals - return new DoorKeeper( - AnomalyDetectorSettings.DOOR_KEEPER_FOR_CACHE_MAX_INSERTION, - AnomalyDetectorSettings.DOOR_KEEPER_FAULSE_POSITIVE_RATE, - detector.getDetectionIntervalDuration().multipliedBy(AnomalyDetectorSettings.DOOR_KEEPER_MAINTENANCE_FREQ), - clock - ); - } - ); - - // first hit, ignore - // since door keeper may get reset during maintenance, it is possible - // the entity is still active even though door keeper has no record of - // this model Id. We have to call isActive method to make sure. Otherwise, - // the entity might miss an anomaly result every 60 intervals due to door keeper - // reset. - if (!doorKeeper.mightContain(modelId) && !isActive(detectorId, modelId)) { - doorKeeper.put(modelId); - return null; + if (EnabledSetting.isDoorKeeperInCacheEnabled()) { + DoorKeeper doorKeeper = doorKeepers + .computeIfAbsent( + detectorId, + id -> { + // reset every 60 intervals + return new DoorKeeper( + AnomalyDetectorSettings.DOOR_KEEPER_FOR_CACHE_MAX_INSERTION, + AnomalyDetectorSettings.DOOR_KEEPER_FAULSE_POSITIVE_RATE, + detector.getDetectionIntervalDuration().multipliedBy(AnomalyDetectorSettings.DOOR_KEEPER_MAINTENANCE_FREQ), + clock + ); + } + ); + + // first hit, ignore + // since door keeper may get reset during maintenance, it is possible + // the entity is still active even though door keeper has no record of + // this model Id. We have to call isActive method to make sure. Otherwise, + // the entity might miss an anomaly result every 60 intervals due to door keeper + // reset. + if (!doorKeeper.mightContain(modelId) && !isActive(detectorId, modelId)) { + doorKeeper.put(modelId); + return null; + } } try { diff --git a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java index da64b30ea..b7492300d 100644 --- a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java +++ b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java @@ -123,6 +123,7 @@ public class AnomalyDetector implements Writeable, ToXContentObject { private DetectionDateRange detectionDateRange; public static final int MAX_RESULT_INDEX_NAME_SIZE = 255; + // OS doesn’t allow uppercase: https://tinyurl.com/yse2xdbx public static final String RESULT_INDEX_NAME_PATTERN = "[a-z0-9_-]+"; /** diff --git a/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java b/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java index c1166d4d9..8702fafcc 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java +++ b/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java @@ -22,13 +22,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.ad.NodeStateManager; import org.opensearch.ad.breaker.ADCircuitBreakerService; +import org.opensearch.ad.caching.CacheProvider; import org.opensearch.ad.ml.EntityColdStarter; import org.opensearch.ad.ml.EntityModel; import org.opensearch.ad.ml.ModelManager.ModelType; import org.opensearch.ad.ml.ModelState; +import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.util.ExceptionUtil; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; @@ -49,6 +52,7 @@ public class EntityColdStartWorker extends SingleRequestWorker { public static final String WORKER_NAME = "cold-start"; private final EntityColdStarter entityColdStarter; + private final CacheProvider cacheProvider; public EntityColdStartWorker( long heapSizeInBytes, @@ -67,7 +71,8 @@ public EntityColdStartWorker( Duration executionTtl, EntityColdStarter entityColdStarter, Duration stateTtl, - NodeStateManager nodeStateManager + NodeStateManager nodeStateManager, + CacheProvider cacheProvider ) { super( WORKER_NAME, @@ -90,6 +95,7 @@ public EntityColdStartWorker( nodeStateManager ); this.entityColdStarter = entityColdStarter; + this.cacheProvider = cacheProvider; } @Override @@ -114,15 +120,42 @@ protected void executeRequest(EntityRequest coldStartRequest, ActionListener failureListener = ActionListener.delegateResponse(listener, (delegateListener, e) -> { - if (ExceptionUtil.isOverloaded(e)) { - LOG.error("OpenSearch is overloaded"); - setCoolDownStart(); + ActionListener coldStartListener = ActionListener.wrap(r -> { + nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> { + try { + if (!detectorOptional.isPresent()) { + LOG + .error( + new ParameterizedMessage( + "fail to load trained model [{}] to cache due to the detector not being found.", + modelState.getModelId() + ) + ); + return; + } + AnomalyDetector detector = detectorOptional.get(); + EntityModel model = modelState.getModel(); + // load to cache if cold start succeeds + if (model != null && model.getTrcf() != null) { + cacheProvider.get().hostIfPossible(detector, modelState); + } + } finally { + listener.onResponse(null); + } + }, listener::onFailure)); + + }, e -> { + try { + if (ExceptionUtil.isOverloaded(e)) { + LOG.error("OpenSearch is overloaded"); + setCoolDownStart(); + } + nodeStateManager.setException(detectorId, e); + } finally { + listener.onFailure(e); } - nodeStateManager.setException(detectorId, e); - delegateListener.onFailure(e); }); - entityColdStarter.trainModel(coldStartRequest.getEntity(), detectorId, modelState, failureListener); + entityColdStarter.trainModel(coldStartRequest.getEntity(), detectorId, modelState, coldStartListener); } } diff --git a/src/main/java/org/opensearch/ad/rest/RestExecuteAnomalyDetectorAction.java b/src/main/java/org/opensearch/ad/rest/RestExecuteAnomalyDetectorAction.java index 6e2ac9131..363199d58 100644 --- a/src/main/java/org/opensearch/ad/rest/RestExecuteAnomalyDetectorAction.java +++ b/src/main/java/org/opensearch/ad/rest/RestExecuteAnomalyDetectorAction.java @@ -70,7 +70,6 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli } AnomalyDetectorExecutionInput input = getAnomalyDetectorExecutionInput(request); return channel -> { - String rawPath = request.rawPath(); String error = validateAdExecutionInput(input); if (StringUtils.isNotBlank(error)) { channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, error)); diff --git a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index b4c7eba3e..31256f3ed 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -31,6 +31,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; +import org.opensearch.ad.ExecuteADResultResponseRecorder; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.ADTaskState; import org.opensearch.ad.model.AnomalyDetector; @@ -38,6 +39,8 @@ import org.opensearch.ad.model.IntervalTimeConfiguration; import org.opensearch.ad.task.ADTaskManager; import org.opensearch.ad.transport.AnomalyDetectorJobResponse; +import org.opensearch.ad.transport.AnomalyResultAction; +import org.opensearch.ad.transport.AnomalyResultRequest; import org.opensearch.ad.transport.StopDetectorAction; import org.opensearch.ad.transport.StopDetectorRequest; import org.opensearch.ad.transport.StopDetectorResponse; @@ -52,6 +55,8 @@ import org.opensearch.rest.RestStatus; import org.opensearch.transport.TransportService; +import com.google.common.base.Throwables; + /** * Anomaly detector job REST action handler to process POST/PUT request. */ @@ -62,19 +67,18 @@ public class IndexAnomalyDetectorJobActionHandler { private final Long seqNo; private final Long primaryTerm; private final Client client; - private final ActionListener listener; private final NamedXContentRegistry xContentRegistry; private final TransportService transportService; private final ADTaskManager adTaskManager; private final Logger logger = LogManager.getLogger(IndexAnomalyDetectorJobActionHandler.class); private final TimeValue requestTimeout; + private final ExecuteADResultResponseRecorder recorder; /** * Constructor function. * * @param client ES node client that executes actions on the local node - * @param listener Listener to send responses * @param anomalyDetectionIndices anomaly detector index manager * @param detectorId detector identifier * @param seqNo sequence number of last modification @@ -83,10 +87,10 @@ public class IndexAnomalyDetectorJobActionHandler { * @param xContentRegistry Registry which is used for XContentParser * @param transportService transport service * @param adTaskManager AD task manager + * @param recorder Utility to record AnomalyResultAction execution result */ public IndexAnomalyDetectorJobActionHandler( Client client, - ActionListener listener, AnomalyDetectionIndices anomalyDetectionIndices, String detectorId, Long seqNo, @@ -94,10 +98,10 @@ public IndexAnomalyDetectorJobActionHandler( TimeValue requestTimeout, NamedXContentRegistry xContentRegistry, TransportService transportService, - ADTaskManager adTaskManager + ADTaskManager adTaskManager, + ExecuteADResultResponseRecorder recorder ) { this.client = client; - this.listener = listener; this.anomalyDetectionIndices = anomalyDetectionIndices; this.detectorId = detectorId; this.seqNo = seqNo; @@ -106,6 +110,7 @@ public IndexAnomalyDetectorJobActionHandler( this.xContentRegistry = xContentRegistry; this.transportService = transportService; this.adTaskManager = adTaskManager; + this.recorder = recorder; } /** @@ -113,16 +118,56 @@ public IndexAnomalyDetectorJobActionHandler( * 1. If job doesn't exist, create new job. * 2. If job exists: a). if job enabled, return error message; b). if job disabled, enable job. * @param detector anomaly detector + * @param listener Listener to send responses */ - public void startAnomalyDetectorJob(AnomalyDetector detector) { + public void startAnomalyDetectorJob(AnomalyDetector detector, ActionListener listener) { + // this start listener is created & injected throughout the job handler so that whenever the job response is received, + // there's the extra step of trying to index results and update detector state with a 60s delay. + ActionListener startListener = ActionListener.wrap(r -> { + try { + Instant executionEndTime = Instant.now(); + IntervalTimeConfiguration schedule = (IntervalTimeConfiguration) detector.getDetectionInterval(); + Instant executionStartTime = executionEndTime.minus(schedule.getInterval(), schedule.getUnit()); + AnomalyResultRequest getRequest = new AnomalyResultRequest( + detector.getDetectorId(), + executionStartTime.toEpochMilli(), + executionEndTime.toEpochMilli() + ); + client + .execute( + AnomalyResultAction.INSTANCE, + getRequest, + ActionListener + .wrap( + response -> recorder.indexAnomalyResult(executionStartTime, executionEndTime, response, detector), + exception -> { + + recorder + .indexAnomalyResultException( + executionStartTime, + executionEndTime, + Throwables.getStackTraceAsString(exception), + null, + detector + ); + } + ) + ); + } catch (Exception ex) { + listener.onFailure(ex); + return; + } + listener.onResponse(r); + + }, listener::onFailure); if (!anomalyDetectionIndices.doesAnomalyDetectorJobIndexExist()) { anomalyDetectionIndices.initAnomalyDetectorJobIndex(ActionListener.wrap(response -> { if (response.isAcknowledged()) { logger.info("Created {} with mappings.", ANOMALY_DETECTORS_INDEX); - createJob(detector); + createJob(detector, startListener); } else { logger.warn("Created {} with mappings call not acknowledged.", ANOMALY_DETECTORS_INDEX); - listener + startListener .onFailure( new OpenSearchStatusException( "Created " + ANOMALY_DETECTORS_INDEX + " with mappings call not acknowledged.", @@ -130,13 +175,13 @@ public void startAnomalyDetectorJob(AnomalyDetector detector) { ) ); } - }, exception -> listener.onFailure(exception))); + }, exception -> startListener.onFailure(exception))); } else { - createJob(detector); + createJob(detector, startListener); } } - private void createJob(AnomalyDetector detector) { + private void createJob(AnomalyDetector detector, ActionListener listener) { try { IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getDetectionInterval(); Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit()); @@ -155,7 +200,7 @@ private void createJob(AnomalyDetector detector) { detector.getResultIndex() ); - getAnomalyDetectorJobForWrite(detector, job); + getAnomalyDetectorJobForWrite(detector, job, listener); } catch (Exception e) { String message = "Failed to parse anomaly detector job " + detectorId; logger.error(message, e); @@ -163,19 +208,30 @@ private void createJob(AnomalyDetector detector) { } } - private void getAnomalyDetectorJobForWrite(AnomalyDetector detector, AnomalyDetectorJob job) { + private void getAnomalyDetectorJobForWrite( + AnomalyDetector detector, + AnomalyDetectorJob job, + ActionListener listener + ) { GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId); client .get( getRequest, ActionListener - .wrap(response -> onGetAnomalyDetectorJobForWrite(response, detector, job), exception -> listener.onFailure(exception)) + .wrap( + response -> onGetAnomalyDetectorJobForWrite(response, detector, job, listener), + exception -> listener.onFailure(exception) + ) ); } - private void onGetAnomalyDetectorJobForWrite(GetResponse response, AnomalyDetector detector, AnomalyDetectorJob job) - throws IOException { + private void onGetAnomalyDetectorJobForWrite( + GetResponse response, + AnomalyDetector detector, + AnomalyDetectorJob job, + ActionListener listener + ) throws IOException { if (response.isExists()) { try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); @@ -207,7 +263,7 @@ private void onGetAnomalyDetectorJobForWrite(GetResponse response, AnomalyDetect transportService, ActionListener .wrap( - r -> { indexAnomalyDetectorJob(newJob, null); }, + r -> { indexAnomalyDetectorJob(newJob, null, listener); }, e -> { // Have logged error message in ADTaskManager#startDetector listener.onFailure(e); @@ -227,12 +283,16 @@ private void onGetAnomalyDetectorJobForWrite(GetResponse response, AnomalyDetect null, job.getUser(), transportService, - ActionListener.wrap(r -> { indexAnomalyDetectorJob(job, null); }, e -> listener.onFailure(e)) + ActionListener.wrap(r -> { indexAnomalyDetectorJob(job, null, listener); }, e -> listener.onFailure(e)) ); } } - private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunction function) throws IOException { + private void indexAnomalyDetectorJob( + AnomalyDetectorJob job, + AnomalyDetectorFunction function, + ActionListener listener + ) throws IOException { IndexRequest indexRequest = new IndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(job.toXContent(XContentFactory.jsonBuilder(), RestHandlerUtils.XCONTENT_WITH_TYPE)) @@ -244,11 +304,18 @@ private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunc .index( indexRequest, ActionListener - .wrap(response -> onIndexAnomalyDetectorJobResponse(response, function), exception -> listener.onFailure(exception)) + .wrap( + response -> onIndexAnomalyDetectorJobResponse(response, function, listener), + exception -> listener.onFailure(exception) + ) ); } - private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) { + private void onIndexAnomalyDetectorJobResponse( + IndexResponse response, + AnomalyDetectorFunction function, + ActionListener listener + ) { if (response == null || (response.getResult() != CREATED && response.getResult() != UPDATED)) { String errorMsg = getShardsFailure(response); listener.onFailure(new OpenSearchStatusException(errorMsg, response.status())); @@ -274,8 +341,9 @@ private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDe * 2.If job exists: a).if job state is disabled, return error message; b).if job state is enabled, disable job. * * @param detectorId detector identifier + * @param listener Listener to send responses */ - public void stopAnomalyDetectorJob(String detectorId) { + public void stopAnomalyDetectorJob(String detectorId, ActionListener listener) { GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId); client.get(getRequest, ActionListener.wrap(response -> { @@ -304,8 +372,9 @@ public void stopAnomalyDetectorJob(String detectorId) { .execute( StopDetectorAction.INSTANCE, new StopDetectorRequest(detectorId), - stopAdDetectorListener(detectorId) - ) + stopAdDetectorListener(detectorId, listener) + ), + listener ); } } catch (IOException e) { @@ -319,7 +388,10 @@ public void stopAnomalyDetectorJob(String detectorId) { }, exception -> listener.onFailure(exception))); } - private ActionListener stopAdDetectorListener(String detectorId) { + private ActionListener stopAdDetectorListener( + String detectorId, + ActionListener listener + ) { return new ActionListener() { @Override public void onResponse(StopDetectorResponse stopDetectorResponse) { diff --git a/src/main/java/org/opensearch/ad/settings/EnabledSetting.java b/src/main/java/org/opensearch/ad/settings/EnabledSetting.java index 6a22b769b..797df2a2b 100644 --- a/src/main/java/org/opensearch/ad/settings/EnabledSetting.java +++ b/src/main/java/org/opensearch/ad/settings/EnabledSetting.java @@ -39,8 +39,9 @@ public class EnabledSetting extends AbstractSetting { public static final String LEGACY_OPENDISTRO_AD_BREAKER_ENABLED = "opendistro.anomaly_detection.breaker.enabled"; - public static final String INTERPOLATION_IN_HCAD_COLD_START_ENABLED = - "plugins.anomaly_detection.hcad_cold_start_interpolation.enabled";; + public static final String INTERPOLATION_IN_HCAD_COLD_START_ENABLED = "plugins.anomaly_detection.hcad_cold_start_interpolation.enabled"; + + public static final String DOOR_KEEPER_IN_CACHE_ENABLED = "plugins.anomaly_detection.door_keeper_in_cache.enabled";; public static final Map> settings = unmodifiableMap(new HashMap>() { { @@ -75,6 +76,13 @@ public class EnabledSetting extends AbstractSetting { INTERPOLATION_IN_HCAD_COLD_START_ENABLED, Setting.boolSetting(INTERPOLATION_IN_HCAD_COLD_START_ENABLED, false, NodeScope, Dynamic) ); + + /** + * We have a bloom filter placed in front of inactive entity cache to + * filter out unpopular items that are not likely to appear more + * than once. Whether this bloom filter is enabled or not. + */ + put(DOOR_KEEPER_IN_CACHE_ENABLED, Setting.boolSetting(DOOR_KEEPER_IN_CACHE_ENABLED, false, NodeScope, Dynamic)); } }); @@ -112,4 +120,12 @@ public static boolean isADBreakerEnabled() { public static boolean isInterpolationInColdStartEnabled() { return EnabledSetting.getInstance().getSettingValue(EnabledSetting.INTERPOLATION_IN_HCAD_COLD_START_ENABLED); } + + /** + * If enabled, we filter out unpopular items that are not likely to appear more than once + * @return wWhether door keeper in cache is enabled or not. + */ + public static boolean isDoorKeeperInCacheEnabled() { + return EnabledSetting.getInstance().getSettingValue(EnabledSetting.DOOR_KEEPER_IN_CACHE_ENABLED); + } } diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java index 3d8f0ea95..1b3a775c8 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java @@ -334,7 +334,7 @@ private void startRealtimeOrHistoricalDetection( try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { if (detectionDateRange == null) { // start realtime job - handler.startAnomalyDetectorJob(detector.get()); + handler.startAnomalyDetectorJob(detector.get(), listener); } else { // start historical analysis task forwardApplyForTaskSlotsRequestToLeadNode(detector.get(), detectionDateRange, user, transportService, listener); @@ -852,7 +852,7 @@ public void stopDetector( ); } else { // stop realtime detector job - handler.stopAnomalyDetectorJob(detectorId); + handler.stopAnomalyDetectorJob(detectorId, listener); } }, listener); } @@ -2820,6 +2820,13 @@ public boolean skipUpdateHCRealtimeTask(String detectorId, String error) { && Objects.equals(error, realtimeTaskCache.getError()); } + public boolean isHCRealtimeTaskStartInitializing(String detectorId) { + ADRealtimeTaskCache realtimeTaskCache = adTaskCacheManager.getRealtimeTaskCache(detectorId); + return realtimeTaskCache != null + && realtimeTaskCache.getInitProgress() != null + && realtimeTaskCache.getInitProgress().floatValue() > 0; + } + public String convertEntityToString(ADTask adTask) { if (adTask == null || !adTask.isEntityTask()) { return null; diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java index c7f2beddf..181dfc797 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java @@ -24,6 +24,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.ad.ExecuteADResultResponseRecorder; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.DetectionDateRange; import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler; @@ -51,6 +52,7 @@ public class AnomalyDetectorJobTransportAction extends HandledTransportAction filterByEnabled = it); + this.recorder = recorder; } @Override @@ -131,7 +135,6 @@ private void executeDetector( ) { IndexAnomalyDetectorJobActionHandler handler = new IndexAnomalyDetectorJobActionHandler( client, - listener, anomalyDetectionIndices, detectorId, seqNo, @@ -139,7 +142,8 @@ private void executeDetector( requestTimeout, xContentRegistry, transportService, - adTaskManager + adTaskManager, + recorder ); if (rawPath.endsWith(RestHandlerUtils.START_JOB)) { adTaskManager.startDetector(detectorId, detectionDateRange, handler, user, transportService, context, listener); diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java index 0aa359a8f..921a5dc55 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java @@ -696,7 +696,7 @@ private Exception coldStartIfNoModel(AtomicReference failure, Anomaly } LOG.info("Trigger cold start for {}", detector.getDetectorId()); coldStart(detector); - return previousException.orElse(new InternalFailure(adID, NO_MODEL_ERR_MSG)); + return previousException.orElse(exp); } private void findException(Throwable cause, String adID, AtomicReference failure, String nodeId) { diff --git a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java index a56addd72..0c3d35037 100644 --- a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java +++ b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Locale; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; @@ -49,12 +50,14 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.ad.common.exception.EndRunException; import org.opensearch.ad.indices.AnomalyDetectionIndices; +import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorJob; import org.opensearch.ad.model.AnomalyResult; import org.opensearch.ad.model.IntervalTimeConfiguration; import org.opensearch.ad.task.ADTaskManager; import org.opensearch.ad.transport.handler.AnomalyIndexHandler; import org.opensearch.ad.util.ClientUtil; +import org.opensearch.ad.util.DiscoveryNodeFilterer; import org.opensearch.ad.util.IndexUtils; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; @@ -114,6 +117,13 @@ public class AnomalyDetectorJobRunnerTests extends AbstractADTest { @Mock private AnomalyDetectionIndices indexUtil; + private ExecuteADResultResponseRecorder recorder; + + @Mock + private DiscoveryNodeFilterer nodeFilter; + + private AnomalyDetector detector; + @BeforeClass public static void setUpBeforeClass() { setUpThreadPool(AnomalyDetectorJobRunnerTests.class.getSimpleName()); @@ -138,7 +148,6 @@ public void setup() throws Exception { Mockito.doReturn(threadContext).when(mockedThreadPool).getThreadContext(); runner.setThreadPool(mockedThreadPool); runner.setClient(client); - runner.setAnomalyResultHandler(anomalyResultHandler); runner.setAdTaskManager(adTaskManager); Settings settings = Settings @@ -154,7 +163,6 @@ public void setup() throws Exception { AnomalyDetectionIndices anomalyDetectionIndices = mock(AnomalyDetectionIndices.class); IndexNameExpressionResolver indexNameResolver = mock(IndexNameExpressionResolver.class); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameResolver); - NodeStateManager stateManager = mock(NodeStateManager.class); runner.setAnomalyDetectionIndices(indexUtil); @@ -195,6 +203,18 @@ public void setup() throws Exception { return null; }).when(client).index(any(), any()); + + recorder = new ExecuteADResultResponseRecorder(indexUtil, anomalyResultHandler, adTaskManager, nodeFilter, threadPool, client); + runner.setExecuteADResultResponseRecorder(recorder); + detector = TestHelpers.randomAnomalyDetectorWithEmptyFeature(); + + NodeStateManager stateManager = mock(NodeStateManager.class); + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onResponse(Optional.of(detector)); + return null; + }).when(stateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + runner.setNodeStateManager(stateManager); } @Rule @@ -222,7 +242,7 @@ public void testRunJobWithNullLockDuration() throws InterruptedException { when(jobParameter.getLockDurationSeconds()).thenReturn(null); when(jobParameter.getSchedule()).thenReturn(new IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES)); runner.runJob(jobParameter, context); - Thread.sleep(1000); + Thread.sleep(2000); assertTrue(testAppender.containsMessage("Can't get lock for AD job")); } @@ -239,7 +259,7 @@ public void testRunJobWithLockDuration() throws InterruptedException { @Test public void testRunAdJobWithNullLock() { LockModel lock = null; - runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now()); + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), recorder, detector); verify(client, never()).execute(any(), any(), any()); } @@ -247,7 +267,7 @@ public void testRunAdJobWithNullLock() { public void testRunAdJobWithLock() { LockModel lock = new LockModel("indexName", "jobId", Instant.now(), 10, false); - runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now()); + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), recorder, detector); verify(client, times(1)).execute(any(), any(), any()); } @@ -257,7 +277,7 @@ public void testRunAdJobWithExecuteException() { doThrow(RuntimeException.class).when(client).execute(any(), any(), any()); - runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now()); + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), recorder, detector); verify(client, times(1)).execute(any(), any(), any()); assertTrue(testAppender.containsMessage("Failed to execute AD job")); } @@ -266,7 +286,17 @@ public void testRunAdJobWithExecuteException() { public void testRunAdJobWithEndRunExceptionNow() { LockModel lock = new LockModel("indexName", "jobId", Instant.now(), 10, false); Exception exception = new EndRunException(jobParameter.getName(), randomAlphaOfLength(5), true); - runner.handleAdException(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), exception); + runner + .handleAdException( + jobParameter, + lockService, + lock, + Instant.now().minusMillis(1000 * 60), + Instant.now(), + exception, + recorder, + detector + ); verify(anomalyResultHandler).index(any(), any(), any()); } @@ -366,7 +396,17 @@ private void testRunAdJobWithEndRunExceptionNowAndStopAdJob(boolean jobExists, b return null; }).when(client).index(any(IndexRequest.class), any()); - runner.handleAdException(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), exception); + runner + .handleAdException( + jobParameter, + lockService, + lock, + Instant.now().minusMillis(1000 * 60), + Instant.now(), + exception, + recorder, + detector + ); } @Test @@ -380,7 +420,17 @@ public void testRunAdJobWithEndRunExceptionNowAndGetJobException() { return null; }).when(client).get(any(GetRequest.class), any()); - runner.handleAdException(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), exception); + runner + .handleAdException( + jobParameter, + lockService, + lock, + Instant.now().minusMillis(1000 * 60), + Instant.now(), + exception, + recorder, + detector + ); assertTrue(testAppender.containsMessage("JobRunner will stop AD job due to EndRunException for")); assertTrue(testAppender.containsMessage("JobRunner failed to get detector job")); verify(anomalyResultHandler).index(any(), any(), any()); @@ -404,7 +454,17 @@ public void testRunAdJobWithEndRunExceptionNowAndFailToGetJob() { return null; }).when(client).get(any(), any()); - runner.handleAdException(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), exception); + runner + .handleAdException( + jobParameter, + lockService, + lock, + Instant.now().minusMillis(1000 * 60), + Instant.now(), + exception, + recorder, + detector + ); verify(anomalyResultHandler).index(any(), any(), any()); assertEquals(1, testAppender.countMessage("JobRunner failed to get detector job")); } @@ -425,10 +485,10 @@ public void testRunAdJobWithEndRunExceptionNotNowAndRetryUntilStop() throws Inte }).when(client).execute(any(), any(), any()); for (int i = 0; i < 3; i++) { - runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime); + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); assertEquals(i + 1, testAppender.countMessage("EndRunException happened for")); } - runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime); + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); assertEquals(1, testAppender.countMessage("JobRunner will stop AD job due to EndRunException retry exceeds upper limit")); } diff --git a/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java b/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java index a4e0ccc13..d939884bf 100644 --- a/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java +++ b/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java @@ -56,6 +56,7 @@ import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.Entity; import org.opensearch.ad.settings.AnomalyDetectorSettings; +import org.opensearch.ad.settings.EnabledSetting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -650,21 +651,26 @@ public void testSelectEmpty() { // test that detector interval is more than 1 hour that maintenance is called before // the next get method public void testLongDetectorInterval() { - when(clock.instant()).thenReturn(Instant.ofEpochSecond(1000)); - when(detector.getDetectionIntervalDuration()).thenReturn(Duration.ofHours(12)); - String modelId = entity1.getModelId(detectorId).get(); - // record last access time 1000 - entityCache.get(modelId, detector); - assertEquals(-1, entityCache.getLastActiveMs(detectorId, modelId)); - // 2 hour = 7200 seconds have passed - long currentTimeEpoch = 8200; - when(clock.instant()).thenReturn(Instant.ofEpochSecond(currentTimeEpoch)); - // door keeper should not be expired since we reclaim space every 60 intervals - entityCache.maintenance(); - // door keeper still has the record and won't blocks entity state being created - entityCache.get(modelId, detector); - // * 1000 to convert to milliseconds - assertEquals(currentTimeEpoch * 1000, entityCache.getLastActiveMs(detectorId, modelId)); + try { + EnabledSetting.getInstance().setSettingValue(EnabledSetting.DOOR_KEEPER_IN_CACHE_ENABLED, true); + when(clock.instant()).thenReturn(Instant.ofEpochSecond(1000)); + when(detector.getDetectionIntervalDuration()).thenReturn(Duration.ofHours(12)); + String modelId = entity1.getModelId(detectorId).get(); + // record last access time 1000 + assertTrue(null == entityCache.get(modelId, detector)); + assertEquals(-1, entityCache.getLastActiveMs(detectorId, modelId)); + // 2 hour = 7200 seconds have passed + long currentTimeEpoch = 8200; + when(clock.instant()).thenReturn(Instant.ofEpochSecond(currentTimeEpoch)); + // door keeper should not be expired since we reclaim space every 60 intervals + entityCache.maintenance(); + // door keeper still has the record and won't blocks entity state being created + entityCache.get(modelId, detector); + // * 1000 to convert to milliseconds + assertEquals(currentTimeEpoch * 1000, entityCache.getLastActiveMs(detectorId, modelId)); + } finally { + EnabledSetting.getInstance().setSettingValue(EnabledSetting.DOOR_KEEPER_IN_CACHE_ENABLED, false); + } } public void testGetNoPriorityUpdate() { diff --git a/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java b/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java index 0c2cce832..074b15552 100644 --- a/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java +++ b/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java @@ -16,8 +16,6 @@ import java.text.SimpleDateFormat; import java.time.Clock; import java.time.Instant; -import java.time.format.DateTimeFormatter; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; @@ -45,7 +43,7 @@ public class DetectionResultEvalutationIT extends AbstractSyntheticDataTest { protected static final Logger LOG = (Logger) LogManager.getLogger(DetectionResultEvalutationIT.class); /** - * Simulate starting the given HCAD detector. + * Wait for HCAD cold start to finish. * @param detectorId Detector Id * @param data Data in Json format * @param trainTestSplit Training data size @@ -54,7 +52,7 @@ public class DetectionResultEvalutationIT extends AbstractSyntheticDataTest { * @param client OpenSearch Client * @throws Exception when failing to query/indexing from/to OpenSearch */ - private void simulateHCADStartDetector( + private void waitForHCADStartDetector( String detectorId, List data, int trainTestSplit, @@ -63,18 +61,6 @@ private void simulateHCADStartDetector( RestClient client ) throws Exception { - Instant trainTime = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(data.get(trainTestSplit - 1).get("timestamp").getAsString())); - - Instant begin = null; - Instant end = null; - for (int i = 0; i < shingleSize; i++) { - begin = trainTime.minus(intervalMinutes * (shingleSize - 1 - i), ChronoUnit.MINUTES); - end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); - try { - getDetectionResult(detectorId, begin, end, client); - } catch (Exception e) {} - } - // It takes time to wait for model initialization long startTime = System.currentTimeMillis(); long duration = 0; do { @@ -94,7 +80,7 @@ private void simulateHCADStartDetector( break; } try { - getDetectionResult(detectorId, begin, end, client); + profileDetectorInitProgress(detectorId, client); } catch (Exception e) {} duration = System.currentTimeMillis() - startTime; } while (duration <= 60_000); @@ -280,14 +266,15 @@ private void verifyRestart(String datasetName, int intervalMinutes, int shingleS String detectorId = createDetector(datasetName, intervalMinutes, client, categoricalField, 0); // cannot stop without actually starting detector because ad complains no ad job index startDetector(detectorId, client); + profileDetectorInitProgress(detectorId, client); // it would be long if we wait for the job actually run the work periodically; speed it up by using simulateHCADStartDetector - simulateHCADStartDetector(detectorId, data, trainTestSplit, shingleSize, intervalMinutes, client); + waitForHCADStartDetector(detectorId, data, trainTestSplit, shingleSize, intervalMinutes, client); String initProgress = profileDetectorInitProgress(detectorId, client); assertEquals("init progress is " + initProgress, "100%", initProgress); stopDetector(detectorId, client); // restart detector startDetector(detectorId, client); - simulateHCADStartDetector(detectorId, data, trainTestSplit, shingleSize, intervalMinutes, client); + waitForHCADStartDetector(detectorId, data, trainTestSplit, shingleSize, intervalMinutes, client); initProgress = profileDetectorInitProgress(detectorId, client); assertEquals("init progress is " + initProgress, "100%", initProgress); } diff --git a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java index 1236c6217..4f4ca09c4 100644 --- a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java +++ b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java @@ -114,7 +114,6 @@ public void testTrainUsingSamples() throws InterruptedException { public void testColdStart() throws InterruptedException, IOException { Queue samples = MLUtil.createQueueSamples(1); - double[] savedSample = samples.peek(); EntityModel model = new EntityModel(entity, samples, null); modelState = new ModelState<>(model, modelId, detectorId, ModelType.ENTITY.getName(), clock, priority); diff --git a/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java b/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java index 4b7b8b00d..caa36cdaa 100644 --- a/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java +++ b/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java @@ -20,6 +20,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.ad.ExecuteADResultResponseRecorder; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.DetectionDateRange; import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler; @@ -52,6 +53,7 @@ public class MockAnomalyDetectorJobTransportActionWithUser extends private ThreadContext.StoredContext context; private final ADTaskManager adTaskManager; private final TransportService transportService; + private final ExecuteADResultResponseRecorder recorder; @Inject public MockAnomalyDetectorJobTransportActionWithUser( @@ -62,7 +64,8 @@ public MockAnomalyDetectorJobTransportActionWithUser( Settings settings, AnomalyDetectionIndices anomalyDetectionIndices, NamedXContentRegistry xContentRegistry, - ADTaskManager adTaskManager + ADTaskManager adTaskManager, + ExecuteADResultResponseRecorder recorder ) { super(MockAnomalyDetectorJobAction.NAME, transportService, actionFilters, AnomalyDetectorJobRequest::new); this.transportService = transportService; @@ -77,6 +80,7 @@ public MockAnomalyDetectorJobTransportActionWithUser( ThreadContext threadContext = new ThreadContext(settings); context = threadContext.stashContext(); + this.recorder = recorder; } @Override @@ -131,7 +135,6 @@ private void executeDetector( ) { IndexAnomalyDetectorJobActionHandler handler = new IndexAnomalyDetectorJobActionHandler( client, - listener, anomalyDetectionIndices, detectorId, seqNo, @@ -139,7 +142,8 @@ private void executeDetector( requestTimeout, xContentRegistry, transportService, - adTaskManager + adTaskManager, + recorder ); if (rawPath.endsWith(RestHandlerUtils.START_JOB)) { adTaskManager.startDetector(detectorId, detectionDateRange, handler, user, transportService, context, listener); diff --git a/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java b/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java index 77dc78015..0bc23d60c 100644 --- a/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java +++ b/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java @@ -30,7 +30,10 @@ import org.opensearch.OpenSearchStatusException; import org.opensearch.action.ActionListener; import org.opensearch.ad.breaker.ADCircuitBreakerService; +import org.opensearch.ad.caching.CacheProvider; import org.opensearch.ad.ml.EntityColdStarter; +import org.opensearch.ad.ml.EntityModel; +import org.opensearch.ad.ml.ModelState; import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; @@ -38,10 +41,13 @@ import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.rest.RestStatus; +import test.org.opensearch.ad.util.MLUtil; + public class EntityColdStartWorkerTests extends AbstractRateLimitingTest { ClusterService clusterService; EntityColdStartWorker worker; EntityColdStarter entityColdStarter; + CacheProvider cacheProvider; @Override public void setUp() throws Exception { @@ -64,6 +70,8 @@ public void setUp() throws Exception { entityColdStarter = mock(EntityColdStarter.class); + cacheProvider = mock(CacheProvider.class); + // Integer.MAX_VALUE makes a huge heap worker = new EntityColdStartWorker( Integer.MAX_VALUE, @@ -82,7 +90,8 @@ public void setUp() throws Exception { AnomalyDetectorSettings.QUEUE_MAINTENANCE, entityColdStarter, AnomalyDetectorSettings.HOURLY_MAINTENANCE, - nodeStateManager + nodeStateManager, + cacheProvider ); } @@ -135,4 +144,22 @@ public void testException() { verify(entityColdStarter, times(2)).trainModel(any(), anyString(), any(), any()); verify(nodeStateManager, times(2)).setException(eq(detectorId), any(OpenSearchStatusException.class)); } + + public void testModelHosted() { + EntityRequest request = new EntityRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, entity); + + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(3); + + ModelState state = invocation.getArgument(2); + state.setModel(MLUtil.createNonEmptyModel(detectorId)); + listener.onResponse(null); + + return null; + }).when(entityColdStarter).trainModel(any(), anyString(), any(), any()); + + worker.put(request); + + verify(cacheProvider, times(1)).get(); + } } diff --git a/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java b/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java new file mode 100644 index 000000000..c0366e95a --- /dev/null +++ b/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java @@ -0,0 +1,354 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.ad.rest.handler; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.action.DocWriteResponse.Result.CREATED; +import static org.opensearch.ad.constant.CommonErrorMessages.CAN_NOT_FIND_LATEST_TASK; + +import java.io.IOException; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.opensearch.action.ActionListener; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.ad.ExecuteADResultResponseRecorder; +import org.opensearch.ad.TestHelpers; +import org.opensearch.ad.common.exception.ResourceNotFoundException; +import org.opensearch.ad.constant.CommonErrorMessages; +import org.opensearch.ad.constant.CommonName; +import org.opensearch.ad.indices.AnomalyDetectionIndices; +import org.opensearch.ad.mock.model.MockSimpleLog; +import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.model.AnomalyResult; +import org.opensearch.ad.model.Feature; +import org.opensearch.ad.task.ADTaskManager; +import org.opensearch.ad.transport.AnomalyDetectorJobResponse; +import org.opensearch.ad.transport.AnomalyResultAction; +import org.opensearch.ad.transport.AnomalyResultResponse; +import org.opensearch.ad.transport.ProfileAction; +import org.opensearch.ad.transport.ProfileResponse; +import org.opensearch.ad.transport.handler.AnomalyIndexHandler; +import org.opensearch.ad.util.DiscoveryNodeFilterer; +import org.opensearch.client.Client; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import com.google.common.collect.ImmutableList; + +public class IndexAnomalyDetectorJobActionHandlerTests extends OpenSearchTestCase { + + private static AnomalyDetectionIndices anomalyDetectionIndices; + private static String detectorId; + private static Long seqNo; + private static Long primaryTerm; + + private static NamedXContentRegistry xContentRegistry; + private static TransportService transportService; + private static TimeValue requestTimeout; + private static DiscoveryNodeFilterer nodeFilter; + private static AnomalyDetector detector; + + private ADTaskManager adTaskManager; + + private ThreadPool threadPool; + + private ExecuteADResultResponseRecorder recorder; + private Client client; + private IndexAnomalyDetectorJobActionHandler handler; + private AnomalyIndexHandler anomalyResultHandler; + + @BeforeClass + public static void setOnce() throws IOException { + detectorId = "123"; + seqNo = 1L; + primaryTerm = 2L; + anomalyDetectionIndices = mock(AnomalyDetectionIndices.class); + xContentRegistry = NamedXContentRegistry.EMPTY; + transportService = mock(TransportService.class); + + requestTimeout = TimeValue.timeValueMinutes(60); + when(anomalyDetectionIndices.doesAnomalyDetectorJobIndexExist()).thenReturn(true); + + nodeFilter = mock(DiscoveryNodeFilterer.class); + detector = TestHelpers.randomAnomalyDetectorUsingCategoryFields(detectorId, Arrays.asList("a")); + } + + @SuppressWarnings("unchecked") + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + client = mock(Client.class); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[1]; + + GetResponse response = mock(GetResponse.class); + when(response.isExists()).thenReturn(false); + listener.onResponse(response); + + return null; + }).when(client).get(any(GetRequest.class), any()); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[1]; + + IndexResponse response = mock(IndexResponse.class); + when(response.getResult()).thenReturn(CREATED); + listener.onResponse(response); + + return null; + }).when(client).index(any(IndexRequest.class), any()); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + AnomalyResultResponse response = new AnomalyResultResponse(null, "", 0L, 10L, true); + listener.onResponse(response); + + return null; + }).when(client).execute(any(AnomalyResultAction.class), any(), any()); + + adTaskManager = mock(ADTaskManager.class); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[4]; + + AnomalyDetectorJobResponse response = mock(AnomalyDetectorJobResponse.class); + listener.onResponse(response); + + return null; + }).when(adTaskManager).startDetector(any(), any(), any(), any(), any()); + + threadPool = mock(ThreadPool.class); + + anomalyResultHandler = mock(AnomalyIndexHandler.class); + + recorder = new ExecuteADResultResponseRecorder( + anomalyDetectionIndices, + anomalyResultHandler, + adTaskManager, + nodeFilter, + threadPool, + client + ); + + handler = new IndexAnomalyDetectorJobActionHandler( + client, + anomalyDetectionIndices, + detectorId, + seqNo, + primaryTerm, + requestTimeout, + xContentRegistry, + transportService, + adTaskManager, + recorder + ); + } + + @SuppressWarnings("unchecked") + public void testDelayHCProfile() { + when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(false); + + ActionListener listener = mock(ActionListener.class); + + handler.startAnomalyDetectorJob(detector, listener); + + verify(client, times(1)).get(any(), any()); + verify(client, times(1)).execute(any(), any(), any()); + verify(adTaskManager, times(1)).startDetector(any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); + verify(threadPool, times(1)).schedule(any(), any(), any()); + verify(listener, times(1)).onResponse(any()); + } + + @SuppressWarnings("unchecked") + public void testNoDelayHCProfile() { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + ProfileResponse response = mock(ProfileResponse.class); + when(response.getTotalUpdates()).thenReturn(3L); + listener.onResponse(response); + + return null; + }).when(client).execute(any(ProfileAction.class), any(), any()); + + when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(true); + + ActionListener listener = mock(ActionListener.class); + + handler.startAnomalyDetectorJob(detector, listener); + + verify(client, times(1)).get(any(), any()); + verify(client, times(2)).execute(any(), any(), any()); + verify(adTaskManager, times(1)).startDetector(any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); + verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + verify(threadPool, never()).schedule(any(), any(), any()); + verify(listener, times(1)).onResponse(any()); + } + + @SuppressWarnings("unchecked") + public void testHCProfileException() { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + listener.onFailure(new RuntimeException()); + + return null; + }).when(client).execute(any(ProfileAction.class), any(), any()); + + when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(true); + + ActionListener listener = mock(ActionListener.class); + + handler.startAnomalyDetectorJob(detector, listener); + + verify(client, times(1)).get(any(), any()); + verify(client, times(2)).execute(any(), any(), any()); + verify(adTaskManager, times(1)).startDetector(any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); + verify(adTaskManager, never()).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + verify(threadPool, never()).schedule(any(), any(), any()); + verify(listener, times(1)).onResponse(any()); + } + + @SuppressWarnings("unchecked") + public void testUpdateLatestRealtimeTaskOnCoordinatingNodeResourceNotFoundException() { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + ProfileResponse response = mock(ProfileResponse.class); + when(response.getTotalUpdates()).thenReturn(3L); + listener.onResponse(response); + + return null; + }).when(client).execute(any(ProfileAction.class), any(), any()); + + when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(true); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[5]; + + listener.onFailure(new ResourceNotFoundException(CAN_NOT_FIND_LATEST_TASK)); + + return null; + }).when(adTaskManager).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + + ActionListener listener = mock(ActionListener.class); + + handler.startAnomalyDetectorJob(detector, listener); + + verify(client, times(1)).get(any(), any()); + verify(client, times(2)).execute(any(), any(), any()); + verify(adTaskManager, times(1)).startDetector(any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); + verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).removeRealtimeTaskCache(anyString()); + verify(threadPool, never()).schedule(any(), any(), any()); + verify(listener, times(1)).onResponse(any()); + } + + @SuppressWarnings("unchecked") + public void testUpdateLatestRealtimeTaskOnCoordinatingException() { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + ProfileResponse response = mock(ProfileResponse.class); + when(response.getTotalUpdates()).thenReturn(3L); + listener.onResponse(response); + + return null; + }).when(client).execute(any(ProfileAction.class), any(), any()); + + when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(true); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[5]; + + listener.onFailure(new RuntimeException()); + + return null; + }).when(adTaskManager).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + + ActionListener listener = mock(ActionListener.class); + + handler.startAnomalyDetectorJob(detector, listener); + + verify(client, times(1)).get(any(), any()); + verify(client, times(2)).execute(any(), any(), any()); + verify(adTaskManager, times(1)).startDetector(any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); + verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + verify(adTaskManager, never()).removeRealtimeTaskCache(anyString()); + verify(threadPool, never()).schedule(any(), any(), any()); + verify(listener, times(1)).onResponse(any()); + } + + @SuppressWarnings("unchecked") + public void testIndexException() throws IOException { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + listener.onFailure(new ResourceNotFoundException(detectorId, CommonErrorMessages.NO_CHECKPOINT_ERR_MSG)); + + return null; + }).when(client).execute(any(AnomalyResultAction.class), any(), any()); + + ActionListener listener = mock(ActionListener.class); + AggregationBuilder aggregationBuilder = TestHelpers + .parseAggregation("{\"test\":{\"max\":{\"field\":\"" + MockSimpleLog.VALUE_FIELD + "\"}}}"); + Feature feature = new Feature(randomAlphaOfLength(5), randomAlphaOfLength(10), true, aggregationBuilder); + detector = TestHelpers + .randomDetector( + ImmutableList.of(feature), + "test", + 10, + MockSimpleLog.TIME_FIELD, + null, + CommonName.CUSTOM_RESULT_INDEX_PREFIX + "index" + ); + when(anomalyDetectionIndices.doesIndexExist(anyString())).thenReturn(false); + handler.startAnomalyDetectorJob(detector, listener); + verify(anomalyResultHandler, times(1)).index(any(), any(), eq(null)); + verify(threadPool, times(1)).schedule(any(), any(), any()); + } +} diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java index cefb64f05..860347a21 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java @@ -25,6 +25,7 @@ import org.junit.Test; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; +import org.opensearch.ad.ExecuteADResultResponseRecorder; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.DetectionDateRange; import org.opensearch.ad.settings.AnomalyDetectorSettings; @@ -76,7 +77,8 @@ public void setUp() throws Exception { indexSettings(), mock(AnomalyDetectionIndices.class), xContentRegistry(), - mock(ADTaskManager.class) + mock(ADTaskManager.class), + mock(ExecuteADResultResponseRecorder.class) ); task = mock(Task.class); request = new AnomalyDetectorJobRequest("1234", 4567, 7890, "_start");