Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Allow stop unassigned datafeed and relax unset upgrade mode wait #39034

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,9 @@ protected void masterOperation(SetUpgradeModeAction.Request request, ClusterStat
(t) -> t.getAssignment().equals(AWAITING_UPGRADE))
.isEmpty() &&

// Datafeeds to wait for a non-"Awaiting upgrade" assignment and for the job task allocations to converge
// If we do not wait, deleting datafeeds, or attempting to unallocate them again causes issues as the
// job's task allocationId could have changed during either process.
// Wait for datafeeds to not be "Awaiting upgrade"
persistentTasksCustomMetaData.findTasks(DATAFEED_TASK_NAME,
(t) ->
t.getAssignment().equals(AWAITING_UPGRADE) ||
t.getAssignment().getExplanation().contains("state is stale"))
(t) -> t.getAssignment().equals(AWAITING_UPGRADE))
.isEmpty(),
request.timeout(),
ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;

Expand Down Expand Up @@ -104,7 +103,7 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster() == false) {
// Delegates stop datafeed to elected master node, so it becomes the coordinating node.
// See comment in StartDatafeedAction.Transport class for more information.
// See comment in TransportStartDatafeedAction for more information.
if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException("no known master node"));
} else {
Expand Down Expand Up @@ -142,13 +141,21 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A
Set<String> executorNodes = new HashSet<>();
for (String datafeedId : startedDatafeeds) {
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
if (datafeedTask == null || datafeedTask.isAssigned() == false) {
String message = "Cannot stop datafeed [" + datafeedId + "] because the datafeed does not have an assigned node." +
" Use force stop to stop the datafeed";
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
return;
} else {
if (datafeedTask == null) {
// This should not happen, because startedDatafeeds was derived from the same tasks object that is passed to this method
String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found.";
assert datafeedTask != null : msg;
logger.error(msg);
} else if (datafeedTask.isAssigned()) {
executorNodes.add(datafeedTask.getExecutorNode());
} else {
// This is the easy case - the datafeed is not currently assigned to a node,
// so can be gracefully stopped simply by removing its persistent task. (Usually
// a graceful stop cannot be achieved by simply removing the persistent task, but
// if the datafeed has no running code then graceful/forceful are the same.)
// The listener here can be a no-op, as waitForDatafeedStopped() already waits for
// these persistent tasks to disappear.
persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap(r -> {}, e -> {}));
}
}

Expand Down Expand Up @@ -198,9 +205,10 @@ public void onFailure(Exception e) {
}
});
} else {
String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " +
"datafeed's task could not be found.";
logger.warn(msg);
// This should not happen, because startedDatafeeds was derived from the same tasks object that is passed to this method
String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found.";
assert datafeedTask != null : msg;
logger.error(msg);
final int slot = counter.incrementAndGet();
failures.set(slot - 1, new RuntimeException(msg));
if (slot == startedDatafeeds.size()) {
Expand Down Expand Up @@ -248,19 +256,18 @@ protected void doRun() throws Exception {

private void sendResponseOrFailure(String datafeedId, ActionListener<StopDatafeedAction.Response> listener,
AtomicArray<Exception> failures) {
List<Exception> catchedExceptions = failures.asList();
if (catchedExceptions.size() == 0) {
List<Exception> caughtExceptions = failures.asList();
if (caughtExceptions.size() == 0) {
listener.onResponse(new StopDatafeedAction.Response(true));
return;
}

String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + catchedExceptions.size()
String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + caughtExceptions.size()
+ "] failures, rethrowing last, all Exceptions: ["
+ catchedExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+ "]";

ElasticsearchException e = new ElasticsearchException(msg,
catchedExceptions.get(0));
ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
listener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,22 +157,15 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception {
client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());

// Can't normal stop an unassigned datafeed
// An unassigned datafeed can be stopped either normally or by force
StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId);
ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class,
() -> client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet());
assertEquals("Cannot stop datafeed [" + datafeedId +
"] because the datafeed does not have an assigned node. Use force stop to stop the datafeed",
statusException.getMessage());

// Can only force stop an unassigned datafeed
stopDatafeedRequest.setForce(true);
stopDatafeedRequest.setForce(randomBoolean());
StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet();
assertTrue(stopDatafeedResponse.isStopped());

// An unassigned job cannot be closed normally; it has to be force-closed
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
statusException = expectThrows(ElasticsearchStatusException.class,
ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class,
() -> client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet());
assertEquals("Cannot close job [" + jobId +
"] because the job does not have an assigned node. Use force close to close the job",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,18 @@ teardown:
ml.get_datafeed_stats:
datafeed_id: set-upgrade-mode-job-datafeed
- match: { datafeeds.0.state: "started" }
- match: { datafeeds.0.assignment_explanation: "" }
# The datafeed will not be assigned until the job has updated its status on the node it's assigned
# to, and that probably won't happen in time for this assertion. In that case the assignment
# explanation ends with "state is stale". However, the datafeed should NOT be unassigned with an
# explanation of "upgrade mode is enabled" - that explanation should have gone away before this test.
- match: { datafeeds.0.assignment_explanation: /(^$|.+job.+state.is.stale)/ }

- do:
cat.tasks: {}
- match:
$body: |
/.+job.+/

- do:
cat.tasks: {}
- match:
$body: |
/.+datafeed.+/

---
"Attempt to open job when upgrade_mode is enabled":
- do:
Expand Down