From cfbd114ccb7d7ea6dda7bc04ddeba8cbfc0b8f9b Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 14 Apr 2021 13:50:06 -0400 Subject: [PATCH] [ML] fix machine learning job close/kill race condition (#71656) If a machine learning job is killed while it is attempting to open, there is a race condition that may cause it to never close. This is most evident during the `reset_feature` API call. The reset feature API kills the jobs, then quickly calls close to wait for the persistent tasks to complete. But if close is called while a job is still being assigned to a node, there is a window where the process continues to start even though we attempted to kill and close it. This commit locks the process context on `kill` and sets the job to `closing`. This way, if the process context is already locked (for starting), we won't try to kill the process until it is fully started. Setting the job to `closing` allows the starting process to exit early if the `kill` command has already completed (i.e. before the communicator was created). 
closes https://github.com/elastic/elasticsearch/issues/71646 --- .../autodetect/AutodetectCommunicator.java | 13 +--- .../autodetect/AutodetectProcessManager.java | 64 ++++++++++++------- .../process/autodetect/ProcessContext.java | 13 ++++ .../xpack/ml/job/task/JobTask.java | 2 +- .../AutodetectProcessManagerTests.java | 18 +++--- .../action/TransportStopTransformAction.java | 6 +- 6 files changed, 71 insertions(+), 45 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 5e04697c2b405..af581714f1394 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -143,18 +143,11 @@ public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistr handler); } - @Override - public void close() { - close(false, null); - } - /** * Closes job this communicator is encapsulating. - * - * @param restart Whether the job should be restarted by persistent tasks - * @param reason The reason for closing the job */ - public void close(boolean restart, String reason) { + @Override + public void close() { Future future = autodetectWorkerExecutor.submit(() -> { checkProcessIsAlive(); try { @@ -166,7 +159,7 @@ public void close(boolean restart, String reason) { } autodetectResultProcessor.awaitCompletion(); } finally { - onFinishHandler.accept(restart ? 
new ElasticsearchException(reason) : null, true); + onFinishHandler.accept(null, true); } LOGGER.info("[{}] job closed", job.getId()); return null; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 795bf7e39515a..a3a680ce9adaf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -164,7 +164,7 @@ public synchronized void closeAllJobsOnThisNode(String reason) { logger.info("Closing [{}] jobs, because [{}]", numJobs, reason); for (ProcessContext process : processByAllocation.values()) { - closeJob(process.getJobTask(), false, reason); + closeJob(process.getJobTask(), reason); } } } @@ -179,11 +179,11 @@ public void killProcess(JobTask jobTask, boolean awaitCompletion, String reason) ProcessContext processContext = processByAllocation.remove(jobTask.getAllocationId()); if (processContext != null) { processContext.newKillBuilder() - .setAwaitCompletion(awaitCompletion) - .setFinish(true) - .setReason(reason) - .setShouldFinalizeJob(upgradeInProgress == false && resetInProgress == false) - .kill(); + .setAwaitCompletion(awaitCompletion) + .setFinish(true) + .setReason(reason) + .setShouldFinalizeJob(upgradeInProgress == false && resetInProgress == false) + .kill(); } else { // If the process is missing but the task exists this is most likely // due to 2 reasons. 
The first is because the job went into the failed @@ -536,6 +536,18 @@ protected void doRun() { try { if (createProcessAndSetRunning(processContext, job, params, closeHandler)) { + if (processContext.getJobTask().isClosing()) { + logger.debug("Aborted opening job [{}] as it is being closed", job.getId()); + closeProcessAndTask(processContext, jobTask, "job is already closing"); + return; + } + // It is possible that a `kill` request came in before the communicator was set + // This means that the kill was not handled appropriately and we continued down this execution path + if (processContext.shouldBeKilled()) { + logger.debug("Aborted opening job [{}] as it is being killed", job.getId()); + processContext.killIt(); + return; + } processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot()); setJobState(jobTask, JobState.OPENED); } @@ -716,25 +728,9 @@ private Consumer onProcessCrash(JobTask jobTask) { }; } - /** - * Stop the running job and mark it as finished. - * - * @param jobTask The job to stop - * @param restart Whether the job should be restarted by persistent tasks - * @param reason The reason for closing the job - */ - public void closeJob(JobTask jobTask, boolean restart, String reason) { + private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask, String reason) { String jobId = jobTask.getJobId(); long allocationId = jobTask.getAllocationId(); - logger.debug("Attempting to close job [{}], because [{}]", jobId, reason); - // don't remove the process context immediately, because we need to ensure - // it is reachable to enable killing a job while it is closing - ProcessContext processContext = processByAllocation.get(allocationId); - if (processContext == null) { - logger.debug("Cannot close job [{}] as it has already been closed or is closing", jobId); - return; - } - processContext.tryLock(); try { if (processContext.setDying() == false) { @@ -755,7 +751,7 @@ public void closeJob(JobTask jobTask, boolean restart, 
String reason) { logger.debug("Job [{}] is being closed before its process is started", jobId); jobTask.markAsCompleted(); } else { - communicator.close(restart, reason); + communicator.close(); } processByAllocation.remove(allocationId); @@ -781,6 +777,26 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) { } } + /** + * Stop the running job and mark it as finished. + * + * @param jobTask The job to stop + * @param reason The reason for closing the job + */ + public void closeJob(JobTask jobTask, String reason) { + String jobId = jobTask.getJobId(); + long allocationId = jobTask.getAllocationId(); + logger.debug("Attempting to close job [{}], because [{}]", jobId, reason); + // don't remove the process context immediately, because we need to ensure + // it is reachable to enable killing a job while it is closing + ProcessContext processContext = processByAllocation.get(allocationId); + if (processContext == null) { + logger.debug("Cannot close job [{}] as it has already been closed or is closing", jobId); + return; + } + closeProcessAndTask(processContext, jobTask, reason); + } + int numberOfOpenJobs() { return (int) processByAllocation.values().stream() .filter(p -> p.getState() != ProcessContext.ProcessStateName.DYING) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java index 13ef3f269a23b..5f3d7e5d0d3c1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java @@ -28,6 +28,7 @@ final class ProcessContext { private final JobTask jobTask; private volatile AutodetectCommunicator autodetectCommunicator; private volatile ProcessState state; + private volatile KillBuilder latestKillRequest = null; ProcessContext(JobTask 
jobTask) { this.jobTask = jobTask; @@ -46,6 +47,17 @@ private void setAutodetectCommunicator(AutodetectCommunicator autodetectCommunic this.autodetectCommunicator = autodetectCommunicator; } + boolean shouldBeKilled() { + return latestKillRequest != null; + } + + void killIt() { + if (latestKillRequest == null) { + throw new IllegalArgumentException("Unable to kill job as previous request is not completed"); + } + latestKillRequest.kill(); + } + ProcessStateName getState() { return state.getName(); } @@ -117,6 +129,7 @@ KillBuilder setShouldFinalizeJob(boolean shouldFinalizeJob) { void kill() { if (autodetectCommunicator == null) { + latestKillRequest = this; return; } String jobId = jobTask.getJobId(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/JobTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/JobTask.java index a3d835f343894..d9e331874ff30 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/JobTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/JobTask.java @@ -51,7 +51,7 @@ public boolean isClosing() { public void closeJob(String reason) { isClosing = true; - autodetectProcessManager.closeJob(this, false, reason); + autodetectProcessManager.closeJob(this, reason); } void setAutodetectProcessManager(AutodetectProcessManager autodetectProcessManager) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 27899fe6eeb16..bf8a4842d082a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -319,7 +319,7 @@ public void testOpenJob_exceedMaxNumJobs() { 
jobTask = mock(JobTask.class); when(jobTask.getAllocationId()).thenReturn(2L); when(jobTask.getJobId()).thenReturn("baz"); - manager.closeJob(jobTask, false, null); + manager.closeJob(jobTask, null); assertEquals(2, manager.numberOfOpenJobs()); manager.openJob(jobTask, clusterState, (e1, b) -> {}); assertEquals(3, manager.numberOfOpenJobs()); @@ -372,7 +372,7 @@ public void testCloseJob() { // job is created assertEquals(1, manager.numberOfOpenJobs()); - manager.closeJob(jobTask, false, null); + manager.closeJob(jobTask, null); assertEquals(0, manager.numberOfOpenJobs()); } @@ -384,7 +384,7 @@ public void testCanCloseClosingJob() throws Exception { // the middle of the AutodetectProcessManager.close() method Thread.yield(); return null; - }).when(autodetectCommunicator).close(anyBoolean(), anyString()); + }).when(autodetectCommunicator).close(); AutodetectProcessManager manager = createSpyManager(); assertEquals(0, manager.numberOfOpenJobs()); @@ -397,12 +397,12 @@ public void testCanCloseClosingJob() throws Exception { assertEquals(1, manager.numberOfOpenJobs()); // Close the job in a separate thread - Thread closeThread = new Thread(() -> manager.closeJob(jobTask, false, "in separate thread")); + Thread closeThread = new Thread(() -> manager.closeJob(jobTask, "in separate thread")); closeThread.start(); Thread.yield(); // Also close the job in the current thread, so that we have two simultaneous close requests - manager.closeJob(jobTask, false, "in main test thread"); + manager.closeJob(jobTask, "in main test thread"); // The 10 second timeout here is usually far in excess of what is required. In the vast // majority of cases the other thread will exit within a few milliseconds. 
However, it @@ -427,7 +427,7 @@ public void testCanKillClosingJob() throws Exception { closeInterruptedLatch.countDown(); } return null; - }).when(autodetectCommunicator).close(anyBoolean(), anyString()); + }).when(autodetectCommunicator).close(); doAnswer(invocationOnMock -> { killLatch.countDown(); return null; @@ -442,7 +442,7 @@ public void testCanKillClosingJob() throws Exception { mock(DataLoadParams.class), (dataCounts1, e) -> {}); // Close the job in a separate thread so that it can simulate taking a long time to close - Thread closeThread = new Thread(() -> manager.closeJob(jobTask, false, null)); + Thread closeThread = new Thread(() -> manager.closeJob(jobTask, null)); closeThread.start(); assertTrue(closeStartedLatch.await(3, TimeUnit.SECONDS)); @@ -509,7 +509,7 @@ public void testCloseThrows() { // let the communicator throw, simulating a problem with the underlying // autodetect, e.g. a crash - doThrow(Exception.class).when(autodetectCommunicator).close(anyBoolean(), anyString()); + doThrow(Exception.class).when(autodetectCommunicator).close(); // create a jobtask JobTask jobTask = mock(JobTask.class); @@ -521,7 +521,7 @@ public void testCloseThrows() { verify(manager).setJobState(any(), eq(JobState.OPENED)); // job is created assertEquals(1, manager.numberOfOpenJobs()); - expectThrows(ElasticsearchException.class, () -> manager.closeJob(jobTask, false, null)); + expectThrows(ElasticsearchException.class, () -> manager.closeJob(jobTask, null)); assertEquals(0, manager.numberOfOpenJobs()); verify(manager).setJobState(any(), eq(JobState.FAILED), any()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index 4e7484d62a1a1..8bf05c6d99c0d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ 
b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -27,6 +28,7 @@ import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.persistent.PersistentTasksService; @@ -286,7 +288,9 @@ private ActionListener waitForStopListener(Request request, ActionList ActionListener onStopListener = ActionListener.wrap( waitResponse -> transformConfigManager.refresh(ActionListener.wrap(r -> listener.onResponse(waitResponse), e -> { - logger.warn("Could not refresh state, state information might be outdated", e); + if ((ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) == false) { + logger.warn("Could not refresh state, state information might be outdated", e); + } listener.onResponse(waitResponse); })), listener::onFailure