Skip to content

Commit

Permalink
Unassign DFA tasks in SetUpgradeModeAction (#54523)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Apr 14, 2020
1 parent a862dff commit 7375d85
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import java.util.Set;

import static java.util.stream.Collectors.toList;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand All @@ -50,6 +52,8 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
Expand Down Expand Up @@ -497,6 +501,59 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
}

public void testSetUpgradeMode_ExistingTaskGetsUnassigned() throws Exception {
initialize("classification_set_upgrade_mode");
indexData(sourceIndex, 300, 0, KEYWORD_FIELD);

assertThat(upgradeMode(), is(false));

DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
assertThat(analyticsTaskList(), hasSize(1));
assertThat(analyticsAssignedTaskList(), hasSize(1));

setUpgradeModeTo(true);
assertThat(analyticsTaskList(), hasSize(1));
assertThat(analyticsAssignedTaskList(), is(empty()));

GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId);
assertThat(analyticsStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(analyticsStats.getNode(), is(nullValue()));

setUpgradeModeTo(false);
assertThat(analyticsTaskList(), hasSize(1));
assertBusy(() -> assertThat(analyticsAssignedTaskList(), hasSize(1)));

analyticsStats = getAnalyticsStats(jobId);
assertThat(analyticsStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));

waitUntilAnalyticsIsStopped(jobId);
}

public void testSetUpgradeMode_NewTaskDoesNotStart() throws Exception {
initialize("classification_set_upgrade_mode_task_should_not_start");
indexData(sourceIndex, 100, 0, KEYWORD_FIELD);

assertThat(upgradeMode(), is(false));

DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);

setUpgradeModeTo(true);

ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> startAnalytics(config.getId()));
assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS)));
assertThat(
e.getMessage(),
is(equalTo("Cannot perform cluster:admin/xpack/ml/data_frame/analytics/start action while upgrade mode is enabled")));

assertThat(analyticsTaskList(), is(empty()));
assertThat(analyticsAssignedTaskList(), is(empty()));
}

public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
initialize("classification_delete_expired_data");
indexData(sourceIndex, 100, 0, KEYWORD_FIELD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
Expand All @@ -48,6 +52,8 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -60,6 +66,7 @@
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

Expand Down Expand Up @@ -152,7 +159,7 @@ protected GetDataFrameAnalyticsStatsAction.Response.Stats getAnalyticsStats(Stri
GetDataFrameAnalyticsStatsAction.Response response = client().execute(GetDataFrameAnalyticsStatsAction.INSTANCE, request)
.actionGet();
List<GetDataFrameAnalyticsStatsAction.Response.Stats> stats = response.getResponse().results();
assertThat("Got: " + stats.toString(), stats.size(), equalTo(1));
assertThat("Got: " + stats.toString(), stats, hasSize(1));
return stats.get(0);
}

Expand Down Expand Up @@ -196,7 +203,7 @@ protected void assertProgress(String id, int reindexing, int loadingData, int an
GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id);
assertThat(stats.getId(), equalTo(id));
List<PhaseProgress> progress = stats.getProgress();
assertThat(progress.size(), equalTo(4));
assertThat(progress, hasSize(4));
assertThat(progress.get(0).getPhase(), equalTo("reindexing"));
assertThat(progress.get(1).getPhase(), equalTo("loading_data"));
assertThat(progress.get(2).getPhase(), equalTo("analyzing"));
Expand All @@ -221,6 +228,18 @@ protected void assertInferenceModelPersisted(String jobId) {
assertThat("Hits were: " + Strings.toString(searchResponse.getHits()), searchResponse.getHits().getHits(), arrayWithSize(1));
}

protected Collection<PersistentTasksCustomMetadata.PersistentTask<?>> analyticsTaskList() {
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
PersistentTasksCustomMetadata persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
return persistentTasks != null
? persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> true)
: Collections.emptyList();
}

protected List<TaskInfo> analyticsAssignedTaskList() {
return client().admin().cluster().prepareListTasks().setActions(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME + "[c]").get().getTasks();
}

/**
* Asserts whether the audit messages fetched from index match provided prefixes.
* More specifically, in order to pass:
Expand Down Expand Up @@ -284,7 +303,7 @@ protected static void assertModelStatePersisted(String stateDocId) {
SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
.setQuery(QueryBuilders.idsQuery().addIds(stateDocId))
.get();
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
assertThat("Hits were: " + Strings.toString(searchResponse.getHits()), searchResponse.getHits().getHits(), is(arrayWithSize(1)));
}

protected static void assertMlResultsFieldMappings(String index, String predictedClassField, String expectedType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.elasticsearch.xpack.core.ml.MlTasks.DATAFEED_TASK_NAME;
import static org.elasticsearch.xpack.core.ml.MlTasks.JOB_TASK_NAME;
import static org.elasticsearch.xpack.core.ml.MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME;

public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<SetUpgradeModeAction.Request, AcknowledgedResponse> {

private static final Set<String> ML_TASK_NAMES = Set.of(JOB_TASK_NAME, DATAFEED_TASK_NAME, DATA_FRAME_ANALYTICS_TASK_NAME);

private static final Logger logger = LogManager.getLogger(TransportSetUpgradeModeAction.class);
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final PersistentTasksClusterService persistentTasksClusterService;
Expand Down Expand Up @@ -131,12 +134,12 @@ protected void masterOperation(Task task, SetUpgradeModeAction.Request request,

// <4> We have unassigned the tasks, respond to the listener.
ActionListener<List<PersistentTask<?>>> unassignPersistentTasksListener = ActionListener.wrap(
unassigndPersistentTasks -> {
unassignedPersistentTasks -> {
// Wait for our tasks to all stop
client.admin()
.cluster()
.prepareListTasks()
.setActions(DATAFEED_TASK_NAME + "[c]", JOB_TASK_NAME + "[c]")
.setActions(ML_TASK_NAMES.stream().map(taskName -> taskName + "[c]").toArray(String[]::new))
// There is a chance that we failed un-allocating a task due to allocation_id being changed
// This call will timeout in that case and return an error
.setWaitForCompletion(true)
Expand Down Expand Up @@ -176,8 +179,8 @@ protected void masterOperation(Task task, SetUpgradeModeAction.Request request,
If we are enabling the option, we need to isolate the datafeeds so we can unassign the ML Jobs
</.1>
<.2>
If we are disabling the option, we need to wait to make sure all the job and datafeed tasks no longer have the upgrade mode
assignment
If we are disabling the option, we need to wait to make sure all the job, datafeed and analytics tasks no longer have the
upgrade mode assignment
We make no guarantees around which tasks will be running again once upgrade_mode is disabled.
Expand Down Expand Up @@ -217,16 +220,10 @@ protected void masterOperation(Task task, SetUpgradeModeAction.Request request,
} else {
logger.info("Disabling upgrade mode, must wait for tasks to not have AWAITING_UPGRADE assignment");
persistentTasksService.waitForPersistentTasksCondition(
(persistentTasksCustomMetadata) ->
// Wait for jobs to not be "Awaiting upgrade"
persistentTasksCustomMetadata.findTasks(JOB_TASK_NAME,
(t) -> t.getAssignment().equals(AWAITING_UPGRADE))
.isEmpty() &&

// Wait for datafeeds to not be "Awaiting upgrade"
persistentTasksCustomMetadata.findTasks(DATAFEED_TASK_NAME,
(t) -> t.getAssignment().equals(AWAITING_UPGRADE))
.isEmpty(),
// Wait for jobs, datafeeds and analytics not to be "Awaiting upgrade"
persistentTasksCustomMetadata ->
persistentTasksCustomMetadata.tasks().stream()
.noneMatch(t -> ML_TASK_NAMES.contains(t.getTaskName()) && t.getAssignment().equals(AWAITING_UPGRADE)),
request.timeout(),
ActionListener.wrap(r -> {
logger.info("Done waiting for tasks to be out of AWAITING_UPGRADE");
Expand Down Expand Up @@ -266,9 +263,9 @@ protected ClusterBlockException checkBlock(SetUpgradeModeAction.Request request,
}

/**
* Unassigns all Job and Datafeed tasks.
* Unassigns all Job, Datafeed and Data Frame Analytics tasks.
* <p>
* The reason for unassigning both types is that we want the Datafeed to attempt re-assignment once `upgrade_mode` is
* The reason for unassigning both Job and Datafeed is that we want the Datafeed to attempt re-assignment once `upgrade_mode` is
* disabled.
* <p>
* If we do not force an allocation change for the Datafeed tasks, they will never start again, since they were isolated.
Expand All @@ -280,18 +277,17 @@ protected ClusterBlockException checkBlock(SetUpgradeModeAction.Request request,
*/
private void unassignPersistentTasks(PersistentTasksCustomMetadata tasksCustomMetadata,
ActionListener<List<PersistentTask<?>>> listener) {
List<PersistentTask<?>> datafeedAndJobTasks = tasksCustomMetadata
List<PersistentTask<?>> mlTasks = tasksCustomMetadata
.tasks()
.stream()
.filter(persistentTask -> (persistentTask.getTaskName().equals(MlTasks.JOB_TASK_NAME) ||
persistentTask.getTaskName().equals(MlTasks.DATAFEED_TASK_NAME)))
.filter(persistentTask -> ML_TASK_NAMES.contains(persistentTask.getTaskName()))
// We want to always have the same ordering of which tasks we un-allocate first.
// However, the order in which the distributed tasks handle the un-allocation event is not guaranteed.
.sorted(Comparator.comparing(PersistentTask::getTaskName))
.collect(Collectors.toList());

logger.info("Un-assigning persistent tasks : " +
datafeedAndJobTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]")));
mlTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]")));

TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()),
Expand All @@ -302,7 +298,7 @@ private void unassignPersistentTasks(PersistentTasksCustomMetadata tasksCustomMe
// Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise.
ex -> ExceptionsHelper.unwrapCause(ex) instanceof ResourceNotFoundException == false);

for (PersistentTask<?> task : datafeedAndJobTasks) {
for (PersistentTask<?> task : mlTasks) {
chainTaskExecutor.add(
chainedTask -> persistentTasksClusterService.unassignPersistentTask(task.getId(),
task.getAllocationId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@

/**
* Starts the persistent task for running data frame analytics.
*
* TODO Add to the upgrade mode action
*/
public class TransportStartDataFrameAnalyticsAction
extends TransportMasterNodeAction<StartDataFrameAnalyticsAction.Request, AcknowledgedResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@

/**
* Stops the persistent task for running data frame analytics.
*
* TODO Add to the upgrade mode action
*/
public class TransportStopDataFrameAnalyticsAction
extends TransportTasksAction<DataFrameAnalyticsTask, StopDataFrameAnalyticsAction.Request,
Expand Down

0 comments on commit 7375d85

Please sign in to comment.