Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Better error when persistent task assignment disabled #52014

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class EnableAssignmentDecider {

public static final Setting<Allocation> CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING =
new Setting<>("cluster.persistent_tasks.allocation.enable", Allocation.ALL.toString(), Allocation::fromString, Dynamic, NodeScope);
public static final String ALLOCATION_NONE_EXPLANATION = "no persistent task assignments are allowed due to cluster settings";

private volatile Allocation enableAssignment;

Expand All @@ -64,7 +65,7 @@ public void setEnableAssignment(final Allocation enableAssignment) {
*/
public AssignmentDecision canAssign() {
if (enableAssignment == Allocation.NONE) {
return new AssignmentDecision(AssignmentDecision.Type.NO, "no persistent task assignments are allowed due to cluster settings");
return new AssignmentDecision(AssignmentDecision.Type.NO, ALLOCATION_NONE_EXPLANATION);
}
return AssignmentDecision.YES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,42 @@ public void testCreateJob_WithClashingFieldMappingsFails() throws Exception {
"avoid the clash by assigning a dedicated results index"));
}

public void testOpenJobFailsWhenPersistentTaskAssignmentDisabled() throws Exception {
String jobId = "open-job-with-persistent-task-assignment-disabled";
createFarequoteJob(jobId);

Request disablePersistentTaskAssignmentRequest = new Request("PUT", "_cluster/settings");
disablePersistentTaskAssignmentRequest.setJsonEntity("{\n" +
" \"transient\": {\n" +
" \"cluster.persistent_tasks.allocation.enable\": \"none\"\n" +
" }\n" +
"}");
Response disablePersistentTaskAssignmentResponse = client().performRequest(disablePersistentTaskAssignmentRequest);
assertThat(entityAsMap(disablePersistentTaskAssignmentResponse), hasEntry("acknowledged", true));

try {
ResponseException exception = expectThrows(
ResponseException.class,
() -> client().performRequest(
new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")));
assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(429));
assertThat(EntityUtils.toString(exception.getResponse().getEntity()),
containsString("Cannot open jobs because persistent task assignment is disabled by the " +
"[cluster.persistent_tasks.allocation.enable] setting"));
} finally {
// Try to revert the cluster setting change even if the test fails,
// because otherwise this setting will cause many other tests to fail
Request enablePersistentTaskAssignmentRequest = new Request("PUT", "_cluster/settings");
enablePersistentTaskAssignmentRequest.setJsonEntity("{\n" +
" \"transient\": {\n" +
" \"cluster.persistent_tasks.allocation.enable\": \"all\"\n" +
" }\n" +
"}");
Response enablePersistentTaskAssignmentResponse = client().performRequest(disablePersistentTaskAssignmentRequest);
assertThat(entityAsMap(enablePersistentTaskAssignmentResponse), hasEntry("acknowledged", true));
}
}

public void testDeleteJob() throws Exception {
String jobId = "delete-job-job";
String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -560,7 +561,13 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTa
assignment.isAssigned() == false) {
OpenJobAction.JobParams params = (OpenJobAction.JobParams) persistentTask.getParams();
// Assignment has failed on the master node despite passing our "fast fail" validation
exception = makeNoSuitableNodesException(logger, params.getJobId(), assignment.getExplanation());
if (assignment.equals(AWAITING_UPGRADE)) {
exception = makeCurrentlyBeingUpgradedException(logger, params.getJobId(), assignment.getExplanation());
} else if (assignment.getExplanation().contains("[" + EnableAssignmentDecider.ALLOCATION_NONE_EXPLANATION + "]")) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not ideal that this is done using contains. The better solution would be to make Assignments contain a reason number or enum value as well as a text reason. Then we could check that. But that's obviously a pretty far-reaching change. It may be worth waiting until there are more use cases for a reason code so that any implementation can take all the use cases into account.

exception = makeAssignmentsNotAllowedException(logger, params.getJobId());
} else {
exception = makeNoSuitableNodesException(logger, params.getJobId(), assignment.getExplanation());
}
// The persistent task should be cancelled so that the observed outcome is the
// same as if the "fast fail" validation on the coordinating node had failed
shouldCancel = true;
Expand Down Expand Up @@ -600,6 +607,13 @@ static ElasticsearchException makeNoSuitableNodesException(Logger logger, String
RestStatus.TOO_MANY_REQUESTS, detail);
}

static ElasticsearchException makeAssignmentsNotAllowedException(Logger logger, String jobId) {
String msg = "Cannot open jobs because persistent task assignment is disabled by the ["
+ EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey() + "] setting";
logger.warn("[{}] {}", jobId, msg);
return new ElasticsearchStatusException(msg, RestStatus.TOO_MANY_REQUESTS);
}

static ElasticsearchException makeCurrentlyBeingUpgradedException(Logger logger, String jobId, String explanation) {
String msg = "Cannot open jobs when upgrade mode is enabled";
logger.warn("[{}] {}", jobId, msg);
Expand Down