Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ML: better handle task state race condition #38040

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
*/
package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -33,11 +35,13 @@
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;

import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.elasticsearch.ExceptionsHelper.rethrowAndSuppress;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
Expand Down Expand Up @@ -119,9 +123,20 @@ protected void masterOperation(SetUpgradeModeAction.Request request, ClusterStat
.cluster()
.prepareListTasks()
.setActions(DATAFEED_TASK_NAME + "[c]", JOB_TASK_NAME + "[c]")
// There is a chance that we failed un-allocating a task due to allocation_id being changed
// This call will timeout in that case and return an error
.setWaitForCompletion(true)
.setTimeout(request.timeout()).execute(ActionListener.wrap(
r -> wrappedListener.onResponse(new AcknowledgedResponse(true)),
r -> {
try {
// Handle potential node timeouts,
// these should be considered failures as tasks as still potentially executing
rethrowAndSuppress(r.getNodeFailures());
wrappedListener.onResponse(new AcknowledgedResponse(true));
} catch (ElasticsearchException ex) {
wrappedListener.onFailure(ex);
}
},
wrappedListener::onFailure));
},
wrappedListener::onFailure
Expand Down Expand Up @@ -243,10 +258,19 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe
.stream()
.filter(persistentTask -> (persistentTask.getTaskName().equals(MlTasks.JOB_TASK_NAME) ||
persistentTask.getTaskName().equals(MlTasks.DATAFEED_TASK_NAME)))
// We want to always have the same ordering of which tasks we un-allocate first.
// However, the order in which the distributed tasks handle the un-allocation event is not guaranteed.
.sorted(Comparator.comparing(PersistentTask::getTaskName))
.collect(Collectors.toList());

TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true);
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()),
r -> true,
// Another process could modify tasks and thus we cannot find them via the allocation_id and name
// If the task was removed from the node, all is well
// We handle the case of allocation_id changing later in this transport class by timing out waiting for task completion
// Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise.
ex -> ex instanceof ResourceNotFoundException == false);

for (PersistentTask<?> task : datafeedAndJobTasks) {
chainTaskExecutor.add(
Expand Down