Skip to content

Commit

Permalink
[ML] Wait longer for DFA jobs in stop-and-restart tests to make progr…
Browse files Browse the repository at this point in the history
…ess (#67657)

In tests where we start a DFA job and wait until some progress was made
before we stop and restart the job we need to wait a bit longer for
progress to be made. Also, we should be targetting different phases
to check we resume correctly regardless of the phase the job is
being stopped in.

Relates #67581
  • Loading branch information
dimitris-athanasiou authored Jan 18, 2021
1 parent 0dac6dc commit 84b2536
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,8 @@ public void testStopAndRestart() throws Exception {
NodeAcknowledgedResponse response = startAnalytics(jobId);
assertThat(response.getNode(), not(emptyString()));

// Wait until progress for first phase is over 1
assertBusy(() -> {
List<PhaseProgress> progress = getAnalyticsStats(jobId).getProgress();
assertThat(progress.get(0).getProgressPercent(), greaterThan(1));
});
String phaseToWait = randomFrom("reindexing", "loading_data", "feature_selection", "fine_tuning_parameters");
waitUntilSomeProgressHasBeenMadeForPhase(jobId, phaseToWait);
stopAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -69,6 +70,7 @@
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -272,6 +274,14 @@ protected List<TaskInfo> analyticsAssignedTaskList() {
return client().admin().cluster().prepareListTasks().setActions(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME + "[c]").get().getTasks();
}

protected void waitUntilSomeProgressHasBeenMadeForPhase(String jobId, String phase) throws Exception {
assertBusy(() -> {
List<PhaseProgress> progress = getAnalyticsStats(jobId).getProgress();
Optional<PhaseProgress> phaseProgress = progress.stream().filter(p -> phase.equals(p.getPhase())).findFirst();
assertThat("unexpected phase [" + phase + "]; progress was " + progress, phaseProgress.isEmpty(), is(false));
assertThat(phaseProgress.get().getProgressPercent(), greaterThan(1));
}, 60, TimeUnit.SECONDS);
}
/**
* Asserts whether the audit messages fetched from index match provided prefixes.
* More specifically, in order to pass:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,8 @@ public void testStopAndRestart() throws Exception {
NodeAcknowledgedResponse response = startAnalytics(jobId);
assertThat(response.getNode(), not(emptyString()));

// Wait until progress for first phase is over 1
assertBusy(() -> {
List<PhaseProgress> progress = getAnalyticsStats(jobId).getProgress();
assertThat(progress.get(0).getProgressPercent(), greaterThan(1));
});
String phaseToWait = randomFrom("reindexing", "loading_data", "feature_selection", "fine_tuning_parameters");
waitUntilSomeProgressHasBeenMadeForPhase(jobId, phaseToWait);
stopAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,11 +611,8 @@ public void testOutlierDetectionStopAndRestart() throws Exception {
NodeAcknowledgedResponse response = startAnalytics(id);
assertThat(response.getNode(), not(emptyString()));

// Wait until progress for first phase is over 1
assertBusy(() -> {
List<PhaseProgress> progress = getAnalyticsStats(id).getProgress();
assertThat(progress.get(0).getProgressPercent(), greaterThan(1));
});
String phaseToWait = randomFrom("reindexing", "loading_data", "computing_outliers");
waitUntilSomeProgressHasBeenMadeForPhase(id, phaseToWait);
stopAnalytics(id);
waitUntilAnalyticsIsStopped(id);

Expand Down

0 comments on commit 84b2536

Please sign in to comment.