From 78abf95ec2cb95b82469255cac95fc33188bca67 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Tue, 11 Aug 2020 21:22:43 -0700 Subject: [PATCH 1/3] Change profile API to not return estimated minutes remaining until cold start is finished Currently, the progress bar on AD Kibana will show the estimated time remaining to initialize a detector. This can be confusing if this message is displayed before cold start is finished, where the actual initialization time may be much shorter if sufficient historical data exists. This PR changes the profile api to not return any estimated time left until the cold start is finished to prevent this. Testing done: 1. Manually verified the problem is fixed. 2. added unit test for the issue. --- .../ad/AnomalyDetectorProfileRunner.java | 49 ++++++++++++------- .../ad/AnomalyDetectorProfileRunnerTests.java | 22 +++++++++ 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java index aa92d855..5a064928 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java @@ -287,14 +287,7 @@ private ActionListener onGetDetectorForInitProgress( ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); AnomalyDetector detector = AnomalyDetector.parse(parser, detectorId); long intervalMins = ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMinutes(); - float percent = (100.0f * totalUpdates) / requiredSamples; - int neededPoints = (int) (requiredSamples - totalUpdates); - InitProgressProfile initProgress = new InitProgressProfile( - // rounding: 93.456 => 93%, 93.556 => 94% - String.format("%.0f%%", percent), - intervalMins * neededPoints, - neededPoints - ); + InitProgressProfile initProgress = computeInitProgressProfile(totalUpdates, intervalMins); listener.onResponse(new DetectorProfile.Builder().initProgress(initProgress).build()); } catch (Exception t) { @@ -308,6 +301,17 @@ private ActionListener onGetDetectorForInitProgress( }, exception -> { listener.failImmediately(FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception); }); } + private InitProgressProfile computeInitProgressProfile(long totalUpdates, long intervalMins) { + float percent = (100.0f * totalUpdates) / requiredSamples; + int neededPoints = (int) (requiredSamples - totalUpdates); + return new InitProgressProfile( + // rounding: 93.456 => 93%, 93.556 => 94% + String.format("%.0f%%", percent), + intervalMins * neededPoints, + neededPoints + ); + } + private void profileModels( String detectorId, Set profiles, @@ -357,7 +361,7 @@ private ActionListener onPollRCFUpdates( return ActionListener.wrap(rcfPollResponse -> { long totalUpdates = rcfPollResponse.getTotalUpdates(); if (totalUpdates < requiredSamples) { - processInitResponse(detectorId, profilesToCollect, listener, totalUpdates); + processInitResponse(detectorId, profilesToCollect, listener, totalUpdates, false); } else { if (profilesToCollect.contains(ProfileName.STATE)) { listener.onResponse(new DetectorProfile.Builder().state(DetectorState.RUNNING).build()); @@ -379,7 +383,11 @@ private ActionListener onPollRCFUpdates( || (causeException instanceof IndexNotFoundException && causeException.getMessage().contains(CommonName.CHECKPOINT_INDEX_NAME))) { // cannot find checkpoint - processInitResponse(detectorId, profilesToCollect, listener, 0L); + // We don't want to show the estimated time remaining to initialize + // a detector before cold start finishes, where the actual + // initialization time may be much shorter if sufficient historical + // data exists. + processInitResponse(detectorId, profilesToCollect, listener, 0L, true); } else { logger.error(new ParameterizedMessage("Fail to get init progress through messaging for {}", detectorId), exception); listener.failImmediately(FAIL_TO_GET_PROFILE_MSG + detectorId, exception); @@ -391,19 +399,26 @@ private void processInitResponse( String detectorId, Set profilesToCollect, MultiResponsesDelegateActionListener listener, - long totalUpdates + long totalUpdates, + boolean hideMinutesLeft ) { if (profilesToCollect.contains(ProfileName.STATE)) { listener.onResponse(new DetectorProfile.Builder().state(DetectorState.INIT).build()); } if (profilesToCollect.contains(ProfileName.INIT_PROGRESS)) { - GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId); - client - .get( - getDetectorRequest, - onGetDetectorForInitProgress(listener, detectorId, profilesToCollect, totalUpdates, requiredSamples) - ); + if (hideMinutesLeft) { + InitProgressProfile initProgress = computeInitProgressProfile(totalUpdates, 0); + listener.onResponse(new DetectorProfile.Builder().initProgress(initProgress).build()); + } else { + GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId); + client + .get( + getDetectorRequest, + onGetDetectorForInitProgress(listener, detectorId, profilesToCollect, totalUpdates, requiredSamples) + ); + } + } } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java index 73142fc4..cccf4b4f 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java @@ -663,4 +663,26 @@ public void testInitNoUpdateNoIndex() throws IOException, InterruptedException { }), stateInitProgress); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); } + + public void testInitNoIndex() throws IOException, InterruptedException { + setUpClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, RCFPollingStatus.INDEX_NOT_FOUND, ErrorResultStatus.NO_ERROR); + DetectorProfile expectedProfile = new DetectorProfile.Builder() + .state(DetectorState.INIT) + .initProgress(new InitProgressProfile("0%", 0, requiredSamples)) + .build(); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertEquals(expectedProfile, response); + inProgressLatch.countDown(); + }, exception -> { + logger.error(exception); + for (StackTraceElement ste : exception.getStackTrace()) { + logger.info(ste); + } + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }), stateInitProgress); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } } From db10243041fb930a77840e57416647caacab8e37 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Thu, 13 Aug 2020 12:56:11 -0700 Subject: [PATCH 2/3] throw exception if passed in requiredSamples is <= 0 --- .../ad/AnomalyDetectorProfileRunner.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java index 5a064928..7cbe84ba 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java @@ -76,6 +76,9 @@ public AnomalyDetectorProfileRunner( this.client = client; this.xContentRegistry = xContentRegistry; this.nodeFilter = nodeFilter; + if (requiredSamples <= 0) { + throw new IllegalArgumentException("required samples should be a positive number, but was " + requiredSamples); + } this.requiredSamples = requiredSamples; } From 84f7cd17d27e39141d7b7e20f9afcc83e56e5b70 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Mon, 17 Aug 2020 12:19:32 -0700 Subject: [PATCH 3/3] Added test case for invalid required samples --- .../ad/AnomalyDetectorProfileRunnerTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java index cccf4b4f..165ac6ed 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java @@ -685,4 +685,8 @@ public void testInitNoIndex() throws IOException, InterruptedException { }), stateInitProgress); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); } + + public void testInvalidRequiredSamples() { + expectThrows(IllegalArgumentException.class, () -> new AnomalyDetectorProfileRunner(client, xContentRegistry(), nodeFilter, 0)); + } }