Avoid implicit ML/transform master node timeouts (#113536) (#113559)
Today the ML and Transform plugins use `null` for timeouts related to
persistent tasks, which means the implicit default timeout of 30s is used.
As per #107984 we want to eliminate all such uses of the implicit default
timeout. This commit either moves to using the timeout from the associated
transport request, where one is available, or else makes it explicit that
we're using a hard-coded 30s timeout.
DaveCTurner authored Sep 25, 2024
1 parent c644dbb commit d06f7f2
Showing 14 changed files with 249 additions and 186 deletions.
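
For orientation, the pattern applied throughout the diff below can be sketched as follows. This is illustrative only: `taskId` and `listener` stand in for the per-call-site arguments, and the snippet is not compilable on its own.

```java
// Before this commit: passing null meant "fall back to the implicit 30s
// default master-node timeout".
persistentTasksService.sendRemoveRequest(taskId, null, listener);

// After this commit the timeout argument is always explicit. Where no
// user-controlled timeout is in scope, the new hard-coded 30s constant is
// passed; where the originating transport request is available (the open-job
// and start-data-frame-analytics paths below), request.masterNodeTimeout()
// is passed instead.
persistentTasksService.sendRemoveRequest(
    taskId,
    MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
    listener
);
```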
@@ -498,6 +498,14 @@ public class MachineLearning extends Plugin
 
     public static final String TRAINED_MODEL_CIRCUIT_BREAKER_NAME = "model_inference";
 
+    /**
+     * Hard-coded timeout used for {@link org.elasticsearch.action.support.master.MasterNodeRequest#masterNodeTimeout()} for requests to
+     * the master node from ML code. Wherever possible, prefer to use a user-controlled timeout instead of this.
+     *
+     * @see <a href="https://github.com/elastic/elasticsearch/issues/107984">#107984</a>
+     */
+    public static final TimeValue HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT = TimeValue.THIRTY_SECONDS;
+
     private static final long DEFAULT_MODEL_CIRCUIT_BREAKER_LIMIT = (long) ((0.50) * JvmInfo.jvmInfo().getMem().getHeapMax().getBytes());
     private static final double DEFAULT_MODEL_CIRCUIT_BREAKER_OVERHEAD = 1.0D;
 
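The Javadoc above asks callers to prefer a user-controlled timeout whenever one is available. A minimal sketch of that preference, written as a hypothetical helper that is not part of this commit (the class and method names are invented purely for illustration):

```java
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.ml.MachineLearning;

// Hypothetical helper, not part of this commit: use the caller-supplied
// master-node timeout when a request is in scope, and only fall back to the
// explicit hard-coded 30s constant otherwise.
final class MlMasterNodeTimeouts {
    private MlMasterNodeTimeouts() {}

    static TimeValue preferredMasterNodeTimeout(@Nullable MasterNodeRequest<?> request) {
        return request != null
            ? request.masterNodeTimeout()
            : MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT;
    }
}
```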
@@ -30,6 +30,7 @@
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskParams;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
 
 import java.util.List;
@@ -103,47 +104,51 @@ private void removePersistentTasks(
         final AtomicArray<Exception> failures = new AtomicArray<>(numberOfTasks);
 
         for (PersistentTasksCustomMetadata.PersistentTask<?> task : upgradeTasksToCancel) {
-            persistentTasksService.sendRemoveRequest(task.getId(), null, new ActionListener<>() {
-                @Override
-                public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
-                    if (counter.incrementAndGet() == numberOfTasks) {
-                        sendResponseOrFailure(listener, failures);
-                    }
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    final int slot = counter.incrementAndGet();
-                    // Not found is not an error - it just means the upgrade completed before we could cancel it.
-                    if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
-                        failures.set(slot - 1, e);
-                    }
-                    if (slot == numberOfTasks) {
-                        sendResponseOrFailure(listener, failures);
-                    }
-                }
-
-                private void sendResponseOrFailure(ActionListener<Response> listener, AtomicArray<Exception> failures) {
-                    List<Exception> caughtExceptions = failures.asList();
-                    if (caughtExceptions.isEmpty()) {
-                        listener.onResponse(new Response(true));
-                        return;
-                    }
-
-                    String msg = "Failed to cancel model snapshot upgrade for ["
-                        + request.getSnapshotId()
-                        + "] on job ["
-                        + request.getJobId()
-                        + "]. Total failures ["
-                        + caughtExceptions.size()
-                        + "], rethrowing first. All Exceptions: ["
-                        + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
-                        + "]";
-
-                    ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
-                    listener.onFailure(e);
-                }
-            });
+            persistentTasksService.sendRemoveRequest(
+                task.getId(),
+                MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
+                        if (counter.incrementAndGet() == numberOfTasks) {
+                            sendResponseOrFailure(listener, failures);
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        final int slot = counter.incrementAndGet();
+                        // Not found is not an error - it just means the upgrade completed before we could cancel it.
+                        if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
+                            failures.set(slot - 1, e);
+                        }
+                        if (slot == numberOfTasks) {
+                            sendResponseOrFailure(listener, failures);
+                        }
+                    }
+
+                    private void sendResponseOrFailure(ActionListener<Response> listener, AtomicArray<Exception> failures) {
+                        List<Exception> caughtExceptions = failures.asList();
+                        if (caughtExceptions.isEmpty()) {
+                            listener.onResponse(new Response(true));
+                            return;
+                        }
+
+                        String msg = "Failed to cancel model snapshot upgrade for ["
+                            + request.getSnapshotId()
+                            + "] on job ["
+                            + request.getJobId()
+                            + "]. Total failures ["
+                            + caughtExceptions.size()
+                            + "], rethrowing first. All Exceptions: ["
+                            + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+                            + "]";
+
+                        ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
+                        listener.onFailure(e);
+                    }
+                }
+            );
         }
     }
 }
@@ -206,7 +206,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
             // these persistent tasks to disappear.
             persistentTasksService.sendRemoveRequest(
                 jobTask.getId(),
-                null,
+                MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
                 ActionListener.wrap(
                     r -> logger.trace(
                         () -> format(
@@ -517,48 +517,52 @@ private void forceCloseJob(
             PersistentTasksCustomMetadata.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
             if (jobTask != null) {
                 auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
-                persistentTasksService.sendRemoveRequest(jobTask.getId(), null, new ActionListener<>() {
-                    @Override
-                    public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
-                        if (counter.incrementAndGet() == numberOfJobs) {
-                            sendResponseOrFailure(request.getJobId(), listener, failures);
-                        }
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        final int slot = counter.incrementAndGet();
-                        if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
-                            failures.set(slot - 1, e);
-                        }
-                        if (slot == numberOfJobs) {
-                            sendResponseOrFailure(request.getJobId(), listener, failures);
-                        }
-                    }
-
-                    private static void sendResponseOrFailure(
-                        String jobId,
-                        ActionListener<CloseJobAction.Response> listener,
-                        AtomicArray<Exception> failures
-                    ) {
-                        List<Exception> caughtExceptions = failures.asList();
-                        if (caughtExceptions.isEmpty()) {
-                            listener.onResponse(new CloseJobAction.Response(true));
-                            return;
-                        }
-
-                        String msg = "Failed to force close job ["
-                            + jobId
-                            + "] with ["
-                            + caughtExceptions.size()
-                            + "] failures, rethrowing first. All Exceptions: ["
-                            + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
-                            + "]";
-
-                        ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
-                        listener.onFailure(e);
-                    }
-                });
+                persistentTasksService.sendRemoveRequest(
+                    jobTask.getId(),
+                    MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
+                    new ActionListener<>() {
+                        @Override
+                        public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
+                            if (counter.incrementAndGet() == numberOfJobs) {
+                                sendResponseOrFailure(request.getJobId(), listener, failures);
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            final int slot = counter.incrementAndGet();
+                            if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
+                                failures.set(slot - 1, e);
+                            }
+                            if (slot == numberOfJobs) {
+                                sendResponseOrFailure(request.getJobId(), listener, failures);
+                            }
+                        }
+
+                        private static void sendResponseOrFailure(
+                            String jobId,
+                            ActionListener<CloseJobAction.Response> listener,
+                            AtomicArray<Exception> failures
+                        ) {
+                            List<Exception> caughtExceptions = failures.asList();
+                            if (caughtExceptions.isEmpty()) {
+                                listener.onResponse(new CloseJobAction.Response(true));
+                                return;
+                            }
+
+                            String msg = "Failed to force close job ["
+                                + jobId
+                                + "] with ["
+                                + caughtExceptions.size()
+                                + "] failures, rethrowing first. All Exceptions: ["
+                                + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+                                + "]";
+
+                            ElasticsearchStatusException e = exceptionArrayToStatusException(failures, msg);
+                            listener.onFailure(e);
+                        }
+                    }
+                );
             }
         }
     }
@@ -588,7 +592,7 @@ private void normalCloseJob(
             PersistentTasksCustomMetadata.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
             persistentTasksService.sendRemoveRequest(
                 jobTask.getId(),
-                null,
+                MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
                 ActionListener.wrap(r -> logger.trace("[{}] removed persistent task for relocated job", jobId), e -> {
                     if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
                         logger.debug("[{}] relocated job task already removed", jobId);
@@ -28,6 +28,7 @@
 import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -103,22 +104,26 @@ private void removeDatafeedTask(DeleteDatafeedAction.Request request, ClusterSta
         if (datafeedTask == null) {
             listener.onResponse(true);
         } else {
-            persistentTasksService.sendRemoveRequest(datafeedTask.getId(), null, new ActionListener<>() {
-                @Override
-                public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
-                    listener.onResponse(Boolean.TRUE);
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
-                        // the task has been removed in between
-                        listener.onResponse(true);
-                    } else {
-                        listener.onFailure(e);
-                    }
-                }
-            });
+            persistentTasksService.sendRemoveRequest(
+                datafeedTask.getId(),
+                MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
+                        listener.onResponse(Boolean.TRUE);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
+                            // the task has been removed in between
+                            listener.onResponse(true);
+                        } else {
+                            listener.onFailure(e);
+                        }
+                    }
+                }
+            );
         }
     }

@@ -45,6 +45,7 @@
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@@ -291,7 +292,11 @@ private void removePersistentTask(String jobId, ClusterState currentState, Actio
         if (jobTask == null) {
             listener.onResponse(null);
         } else {
-            persistentTasksService.sendRemoveRequest(jobTask.getId(), null, listener.safeMap(task -> true));
+            persistentTasksService.sendRemoveRequest(
+                jobTask.getId(),
+                MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
+                listener.safeMap(task -> true)
+            );
         }
     }

@@ -47,6 +47,7 @@
 import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.JobNodeSelector;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -166,7 +167,7 @@ public void onFailure(Exception e) {
                 MlTasks.jobTaskId(jobParams.getJobId()),
                 MlTasks.JOB_TASK_NAME,
                 jobParams,
-                null,
+                request.masterNodeTimeout(),
                 waitForJobToStart
             ),
             listener::onFailure
@@ -325,27 +326,31 @@ private void cancelJobStart(
         Exception exception,
         ActionListener<NodeAcknowledgedResponse> listener
     ) {
-        persistentTasksService.sendRemoveRequest(persistentTask.getId(), null, new ActionListener<>() {
-            @Override
-            public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
-                // We succeeded in cancelling the persistent task, but the
-                // problem that caused us to cancel it is the overall result
-                listener.onFailure(exception);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                logger.error(
-                    () -> format(
-                        "[%s] Failed to cancel persistent task that could not be assigned due to [%s]",
-                        persistentTask.getParams().getJobId(),
-                        exception.getMessage()
-                    ),
-                    e
-                );
-                listener.onFailure(exception);
-            }
-        });
+        persistentTasksService.sendRemoveRequest(
+            persistentTask.getId(),
+            MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
+            new ActionListener<>() {
+                @Override
+                public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
+                    // We succeeded in cancelling the persistent task, but the
+                    // problem that caused us to cancel it is the overall result
+                    listener.onFailure(exception);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    logger.error(
+                        () -> format(
+                            "[%s] Failed to cancel persistent task that could not be assigned due to [%s]",
+                            persistentTask.getParams().getJobId(),
+                            exception.getMessage()
+                        ),
+                        e
+                    );
+                    listener.onFailure(exception);
+                }
+            }
+        );
     }
 
     /**
@@ -211,7 +211,7 @@ public void onFailure(Exception e) {
                 MlTasks.dataFrameAnalyticsTaskId(request.getId()),
                 MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
                 taskParams,
-                null,
+                request.masterNodeTimeout(),
                 waitForAnalyticsToStart
             );
         }, listener::onFailure);
@@ -603,8 +603,8 @@ private void cancelAnalyticsStart(
     ) {
         persistentTasksService.sendRemoveRequest(
             persistentTask.getId(),
-            null,
-            new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
+            MachineLearning.HARD_CODED_MACHINE_LEARNING_MASTER_NODE_TIMEOUT,
+            new ActionListener<>() {
                 @Override
                 public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
                     // We succeeded in cancelling the persistent task, but the

