Skip to content

Commit

Permalink
[7.x][ML] Wait longer for DFA jobs in stop-and-restart tests to make … (
Browse files Browse the repository at this point in the history
#67663)

In tests where we start a DFA job and wait until some progress was made
before we stop and restart the job we need to wait a bit longer for
progress to be made. Also, we should be targetting different phases
to check we resume correctly regardless of the phase the job is
being stopped in.

Relates #67581

Backport of #67567
  • Loading branch information
dimitris-athanasiou authored Jan 18, 2021
1 parent b21e7d5 commit 06941b3
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,8 @@ public void testStopAndRestart() throws Exception {
NodeAcknowledgedResponse response = startAnalytics(jobId);
assertThat(response.getNode(), not(emptyString()));

// Wait until progress for first phase is over 1
assertBusy(() -> {
List<PhaseProgress> progress = getAnalyticsStats(jobId).getProgress();
assertThat(progress.get(0).getProgressPercent(), greaterThan(1));
});
String phaseToWait = randomFrom("reindexing", "loading_data", "feature_selection", "fine_tuning_parameters");
waitUntilSomeProgressHasBeenMadeForPhase(jobId, phaseToWait);
stopAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -69,6 +70,7 @@
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -272,6 +274,14 @@ protected List<TaskInfo> analyticsAssignedTaskList() {
return client().admin().cluster().prepareListTasks().setActions(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME + "[c]").get().getTasks();
}

protected void waitUntilSomeProgressHasBeenMadeForPhase(String jobId, String phase) throws Exception {
assertBusy(() -> {
List<PhaseProgress> progress = getAnalyticsStats(jobId).getProgress();
Optional<PhaseProgress> phaseProgress = progress.stream().filter(p -> phase.equals(p.getPhase())).findFirst();
assertThat("unexpected phase [" + phase + "]; progress was " + progress, phaseProgress.isPresent(), is(true));
assertThat(phaseProgress.get().getProgressPercent(), greaterThan(1));
}, 60, TimeUnit.SECONDS);
}
/**
* Asserts whether the audit messages fetched from index match provided prefixes.
* More specifically, in order to pass:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,8 @@ public void testStopAndRestart() throws Exception {
NodeAcknowledgedResponse response = startAnalytics(jobId);
assertThat(response.getNode(), not(emptyString()));

// Wait until progress for first phase is over 1
assertBusy(() -> {
List<PhaseProgress> progress = getAnalyticsStats(jobId).getProgress();
assertThat(progress.get(0).getProgressPercent(), greaterThan(1));
});
String phaseToWait = randomFrom("reindexing", "loading_data", "feature_selection", "fine_tuning_parameters");
waitUntilSomeProgressHasBeenMadeForPhase(jobId, phaseToWait);
stopAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,8 @@ public void testOutlierDetectionStopAndRestart() throws Exception {
NodeAcknowledgedResponse response = startAnalytics(id);
assertThat(response.getNode(), not(emptyString()));

// Wait until progress for first phase is over 1
assertBusy(() -> {
List<PhaseProgress> progress = getAnalyticsStats(id).getProgress();
assertThat(progress.get(0).getProgressPercent(), greaterThan(1));
});
String phaseToWait = randomFrom("reindexing", "loading_data", "computing_outliers");
waitUntilSomeProgressHasBeenMadeForPhase(id, phaseToWait);
stopAnalytics(id);
waitUntilAnalyticsIsStopped(id);

Expand Down

0 comments on commit 06941b3

Please sign in to comment.