diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java index d7fd41a960a64..d2d085a5fa6b9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -34,11 +36,13 @@ import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor; +import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.elasticsearch.ExceptionsHelper.rethrowAndSuppress; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; @@ -120,9 +124,20 @@ protected void masterOperation(SetUpgradeModeAction.Request request, ClusterStat .cluster() .prepareListTasks() .setActions(DATAFEED_TASK_NAME + "[c]", JOB_TASK_NAME + "[c]") + // There is a chance that we failed un-allocating a task due to allocation_id being changed + // This call will timeout in that case and return an error .setWaitForCompletion(true) .setTimeout(request.timeout()).execute(ActionListener.wrap( - r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), + r -> { + try { + // Handle potential node timeouts, + // these should be considered failures as tasks as still potentially executing + rethrowAndSuppress(r.getNodeFailures()); + wrappedListener.onResponse(new AcknowledgedResponse(true)); + } catch (ElasticsearchException ex) { + wrappedListener.onFailure(ex); + } + }, wrappedListener::onFailure)); }, wrappedListener::onFailure @@ -244,10 +259,19 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe .stream() .filter(persistentTask -> (persistentTask.getTaskName().equals(MlTasks.JOB_TASK_NAME) || persistentTask.getTaskName().equals(MlTasks.DATAFEED_TASK_NAME))) + // We want to always have the same ordering of which tasks we un-allocate first. + // However, the order in which the distributed tasks handle the un-allocation event is not guaranteed. + .sorted(Comparator.comparing(PersistentTask::getTaskName)) .collect(Collectors.toList()); TypedChainTaskExecutor> chainTaskExecutor = - new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true); + new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), + r -> true, + // Another process could modify tasks and thus we cannot find them via the allocation_id and name + // If the task was removed from the node, all is well + // We handle the case of allocation_id changing later in this transport class by timing out waiting for task completion + // Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise. + ex -> ex instanceof ResourceNotFoundException == false); for (PersistentTask task : datafeedAndJobTasks) { chainTaskExecutor.add(