Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
not return estimated minutes remaining until cold start is finished (#…
Browse files Browse the repository at this point in the history
…210)

* Change profile API to not return estimated minutes remaining until cold start is finished

Currently, the progress bar on AD Kibana will show the estimated time remaining to initialize a detector. This can be confusing if this message is displayed before cold start is finished, where the actual initialization time may be much shorter if sufficient historical data exists. This PR changes the profile api to not return any estimated time left until the cold start is finished to prevent this.

Testing done:
1. Manually verified the problem is fixed.
2. added unit test for the issue.
  • Loading branch information
kaituo authored and yizheliu-amazon committed Aug 28, 2020
1 parent 4ec5462 commit 46c8574
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public AnomalyDetectorProfileRunner(
this.client = client;
this.xContentRegistry = xContentRegistry;
this.nodeFilter = nodeFilter;
if (requiredSamples <= 0) {
throw new IllegalArgumentException("required samples should be a positive number, but was " + requiredSamples);
}
this.requiredSamples = requiredSamples;
}

Expand Down Expand Up @@ -287,14 +290,7 @@ private ActionListener<GetResponse> onGetDetectorForInitProgress(
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetector detector = AnomalyDetector.parse(parser, detectorId);
long intervalMins = ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMinutes();
float percent = (100.0f * totalUpdates) / requiredSamples;
int neededPoints = (int) (requiredSamples - totalUpdates);
InitProgressProfile initProgress = new InitProgressProfile(
// rounding: 93.456 => 93%, 93.556 => 94%
String.format("%.0f%%", percent),
intervalMins * neededPoints,
neededPoints
);
InitProgressProfile initProgress = computeInitProgressProfile(totalUpdates, intervalMins);

listener.onResponse(new DetectorProfile.Builder().initProgress(initProgress).build());
} catch (Exception t) {
Expand All @@ -308,6 +304,17 @@ private ActionListener<GetResponse> onGetDetectorForInitProgress(
}, exception -> { listener.failImmediately(FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception); });
}

private InitProgressProfile computeInitProgressProfile(long totalUpdates, long intervalMins) {
float percent = (100.0f * totalUpdates) / requiredSamples;
int neededPoints = (int) (requiredSamples - totalUpdates);
return new InitProgressProfile(
// rounding: 93.456 => 93%, 93.556 => 94%
String.format("%.0f%%", percent),
intervalMins * neededPoints,
neededPoints
);
}

private void profileModels(
String detectorId,
Set<ProfileName> profiles,
Expand Down Expand Up @@ -357,7 +364,7 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(
return ActionListener.wrap(rcfPollResponse -> {
long totalUpdates = rcfPollResponse.getTotalUpdates();
if (totalUpdates < requiredSamples) {
processInitResponse(detectorId, profilesToCollect, listener, totalUpdates);
processInitResponse(detectorId, profilesToCollect, listener, totalUpdates, false);
} else {
if (profilesToCollect.contains(ProfileName.STATE)) {
listener.onResponse(new DetectorProfile.Builder().state(DetectorState.RUNNING).build());
Expand All @@ -379,7 +386,11 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(
|| (causeException instanceof IndexNotFoundException
&& causeException.getMessage().contains(CommonName.CHECKPOINT_INDEX_NAME))) {
// cannot find checkpoint
processInitResponse(detectorId, profilesToCollect, listener, 0L);
// We don't want to show the estimated time remaining to initialize
// a detector before cold start finishes, where the actual
// initialization time may be much shorter if sufficient historical
// data exists.
processInitResponse(detectorId, profilesToCollect, listener, 0L, true);
} else {
logger.error(new ParameterizedMessage("Fail to get init progress through messaging for {}", detectorId), exception);
listener.failImmediately(FAIL_TO_GET_PROFILE_MSG + detectorId, exception);
Expand All @@ -391,19 +402,26 @@ private void processInitResponse(
String detectorId,
Set<ProfileName> profilesToCollect,
MultiResponsesDelegateActionListener<DetectorProfile> listener,
long totalUpdates
long totalUpdates,
boolean hideMinutesLeft
) {
if (profilesToCollect.contains(ProfileName.STATE)) {
listener.onResponse(new DetectorProfile.Builder().state(DetectorState.INIT).build());
}

if (profilesToCollect.contains(ProfileName.INIT_PROGRESS)) {
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
client
.get(
getDetectorRequest,
onGetDetectorForInitProgress(listener, detectorId, profilesToCollect, totalUpdates, requiredSamples)
);
if (hideMinutesLeft) {
InitProgressProfile initProgress = computeInitProgressProfile(totalUpdates, 0);
listener.onResponse(new DetectorProfile.Builder().initProgress(initProgress).build());
} else {
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
client
.get(
getDetectorRequest,
onGetDetectorForInitProgress(listener, detectorId, profilesToCollect, totalUpdates, requiredSamples)
);
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -663,4 +663,30 @@ public void testInitNoUpdateNoIndex() throws IOException, InterruptedException {
}), stateInitProgress);
assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS));
}

public void testInitNoIndex() throws IOException, InterruptedException {
setUpClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, RCFPollingStatus.INDEX_NOT_FOUND, ErrorResultStatus.NO_ERROR);
DetectorProfile expectedProfile = new DetectorProfile.Builder()
.state(DetectorState.INIT)
.initProgress(new InitProgressProfile("0%", 0, requiredSamples))
.build();
final CountDownLatch inProgressLatch = new CountDownLatch(1);

runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> {
assertEquals(expectedProfile, response);
inProgressLatch.countDown();
}, exception -> {
logger.error(exception);
for (StackTraceElement ste : exception.getStackTrace()) {
logger.info(ste);
}
assertTrue("Should not reach here ", false);
inProgressLatch.countDown();
}), stateInitProgress);
assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS));
}

public void testInvalidRequiredSamples() {
expectThrows(IllegalArgumentException.class, () -> new AnomalyDetectorProfileRunner(client, xContentRegistry(), nodeFilter, 0));
}
}

0 comments on commit 46c8574

Please sign in to comment.