From bec993d223714c21db8f7a714ce8fae6c4aa684a Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 21 Jan 2021 16:19:48 +0200 Subject: [PATCH 1/2] [ML] Close results stream before data frame analytics job stops Investigating the failures in #67581 it looked like after restarting the regression job the process was started but no data was loaded. So the process was getting stuck waiting for data. Looking into the code it looks like this can be explained by the fact that `AnalyticsResultProcessor` counts down its completion latch before it closes the results stream. This means the job may go to `stopped` state while the out stream is still alive, which on Windows results in the directory with the named pipes staying around. Then when the job is started again, which the test does immediately, the old pipes are used and thus the data is not sent to the new process. This commit fixes this while refactoring ML processes to consume and close the out stream in their close method so that calling code is not responsible for doing that. 
Fixes #67581 --- .../ml/integration/ClassificationIT.java | 1 - .../xpack/ml/integration/RegressionIT.java | 16 ----------- .../dataframe/process/AnalyticsProcess.java | 8 ------ .../process/AnalyticsResultProcessor.java | 3 +- .../MemoryUsageEstimationProcessManager.java | 1 - .../process/autodetect/AutodetectProcess.java | 8 ------ .../BlackHoleAutodetectProcess.java | 4 --- .../output/AutodetectResultProcessor.java | 28 ++++++++----------- .../JobSnapshotUpgraderResultProcessor.java | 26 ++++++++--------- .../ml/process/AbstractNativeProcess.java | 10 ++++--- ...oryUsageEstimationProcessManagerTests.java | 6 ---- .../NativeAutodetectProcessTests.java | 18 ++---------- .../process/AbstractNativeProcessTests.java | 4 ++- 13 files changed, 35 insertions(+), 98 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index dab260771c427..74311f6cb2c9e 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -481,7 +481,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty_DependentVariableI "classification_training_percent_is_50_boolean", BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES, "boolean"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/67581") public void testStopAndRestart() throws Exception { initialize("classification_stop_and_restart"); String predictedClassField = KEYWORD_FIELD + "_prediction"; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index f73491300ec9f..ed4c4d89d7818 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -33,9 +33,7 @@ import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig; import org.elasticsearch.xpack.core.ml.inference.preprocessing.OneHotEncoding; import org.elasticsearch.xpack.core.ml.inference.preprocessing.PreProcessor; -import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.time.Instant; @@ -69,23 +67,9 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { private String sourceIndex; private String destIndex; - @Before - public void setupLogging() { - client().admin().cluster() - .prepareUpdateSettings() - .setTransientSettings(Settings.builder() - .put("logger.org.elasticsearch.xpack.ml.dataframe", "DEBUG")) - .get(); - } - @After public void cleanup() { cleanUp(); - client().admin().cluster() - .prepareUpdateSettings() - .setTransientSettings(Settings.builder() - .putNull("logger.org.elasticsearch.xpack.ml.dataframe")) - .get(); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java index 68ec738fe9f94..092ae44cbd7cf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java @@ -25,14 +25,6 @@ public interface AnalyticsProcess extends NativeProcess { */ Iterator readAnalyticsResults(); - 
/** - * Read anything left in the stream before - * closing the stream otherwise if the process - * tries to write more after the close it gets - * a SIGPIPE - */ - void consumeAndCloseOutputStream(); - /** * * @return the process config diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java index 69ade87dfd588..a8ac4dc658902 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java @@ -23,8 +23,8 @@ import org.elasticsearch.xpack.ml.dataframe.process.results.TrainedModelDefinitionChunk; import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder; import org.elasticsearch.xpack.ml.dataframe.stats.StatsPersister; -import org.elasticsearch.xpack.ml.inference.modelsize.ModelSizeInfo; import org.elasticsearch.xpack.ml.extractor.ExtractedFields; +import org.elasticsearch.xpack.ml.inference.modelsize.ModelSizeInfo; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; @@ -117,7 +117,6 @@ public void process(AnalyticsProcess process) { completeResultsProgress(); } completionLatch.countDown(); - process.consumeAndCloseOutputStream(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java index 21baac76a2f5d..5c69f0a3c5c12 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java @@ -98,7 +98,6 @@ private MemoryUsageEstimationResult runJob(String jobId, jobId, e.getMessage(), process.readError()).getFormattedMessage(); throw ExceptionsHelper.serverError(errorMsg, e); } finally { - process.consumeAndCloseOutputStream(); try { LOGGER.debug("[{}] Closing process", jobId); process.close(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 7b0a5f86b45a9..5c5808928e46b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -110,12 +110,4 @@ public interface AutodetectProcess extends NativeProcess { * @return stream of autodetect results. 
*/ Iterator readAutodetectResults(); - - /** - * Read anything left in the stream before - * closing the stream otherwise if the process - * tries to write more after the close it gets - * a SIGPIPE - */ - void consumeAndCloseOutputStream(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 04225ff5382fb..5664620f9ab37 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -173,10 +173,6 @@ public AutodetectResult next() { }; } - @Override - public void consumeAndCloseOutputStream() { - } - @Override public ZonedDateTime getProcessStartTime() { return startTime; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index 5bcd7ec3d33cb..7a4ed9c3cb704 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -187,24 +187,20 @@ public void process() { private void readResults() { currentRunBucketCount = 0; - try { - Iterator iterator = process.readAutodetectResults(); - while (iterator.hasNext()) { - try { - AutodetectResult result = iterator.next(); - processResult(result); - if (result.getBucket() != null) { - LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount); - } - } catch (Exception e) { - if (isAlive() == false) { - throw e; - } - LOGGER.warn(new 
ParameterizedMessage("[{}] Error processing autodetect result", jobId), e); + Iterator iterator = process.readAutodetectResults(); + while (iterator.hasNext()) { + try { + AutodetectResult result = iterator.next(); + processResult(result); + if (result.getBucket() != null) { + LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount); } + } catch (Exception e) { + if (isAlive() == false) { + throw e; + } + LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e); } - } finally { - process.consumeAndCloseOutputStream(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/JobSnapshotUpgraderResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/JobSnapshotUpgraderResultProcessor.java index 093062025db10..22db7fd63d134 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/JobSnapshotUpgraderResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/JobSnapshotUpgraderResultProcessor.java @@ -115,23 +115,19 @@ public void process() { } private void readResults() { - try { - Iterator iterator = process.readAutodetectResults(); - while (iterator.hasNext()) { - try { - AutodetectResult result = iterator.next(); - processResult(result); - } catch (Exception e) { - if (isAlive() == false) { - throw e; - } - LOGGER.warn( - new ParameterizedMessage("[{}] [{}] Error processing model snapshot upgrade result", jobId, snapshotId), - e); + Iterator iterator = process.readAutodetectResults(); + while (iterator.hasNext()) { + try { + AutodetectResult result = iterator.next(); + processResult(result); + } catch (Exception e) { + if (isAlive() == false) { + throw e; } + LOGGER.warn( + new ParameterizedMessage("[{}] [{}] Error processing model snapshot upgrade result", jobId, snapshotId), + e); } - } finally { - 
process.consumeAndCloseOutputStream(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index f041d9aebb046..6dc8a84471914 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -176,6 +176,9 @@ public void close() throws IOException { if (processInStream() != null) { processInStream().close(); } + + consumeAndCloseOutputStream(); + // wait for the process to exit by waiting for end-of-file on the named pipe connected // to the state processor - it may take a long time for all the model state to be // indexed @@ -312,13 +315,12 @@ protected boolean isProcessKilled() { return processKilled; } - public void consumeAndCloseOutputStream() { - try { + void consumeAndCloseOutputStream() { + try (InputStream outStream = processOutStream()) { byte[] buff = new byte[512]; - while (processOutStream().read(buff) >= 0) { + while (outStream.read(buff) >= 0) { // Do nothing } - processOutStream().close(); } catch (IOException e) { // Given we are closing down the process there is no point propagating IO exceptions here } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java index 3daf08d9250e1..6550fc4483e1f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java @@ -108,7 +108,6 @@ public void testRunJob_NoResults() throws Exception { InOrder inOrder = inOrder(process); 
inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).readError(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); } @@ -127,7 +126,6 @@ public void testRunJob_MultipleResults() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).readError(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); } @@ -146,7 +144,6 @@ public void testRunJob_OneResult_ParseException() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).readError(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); } @@ -164,7 +161,6 @@ public void testRunJob_FailsOnClose() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); inOrder.verify(process).readError(); verifyNoMoreInteractions(process, listener); @@ -186,7 +182,6 @@ public void testRunJob_FailsOnClose_ProcessReportsError() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); inOrder.verify(process).readError(); verifyNoMoreInteractions(process, listener); @@ -201,7 +196,6 @@ public void testRunJob_Ok() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); } diff --git 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 0b6bf3a459745..2efcdef5e737f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -8,15 +8,15 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; -import org.elasticsearch.xpack.ml.process.IndexingStateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; +import org.elasticsearch.xpack.ml.process.IndexingStateProcessor; +import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.process.ProcessResultsParser; -import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler; import org.junit.Assert; import org.junit.Before; @@ -37,7 +37,6 @@ import java.util.concurrent.Future; import java.util.function.Consumer; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; @@ -152,19 +151,6 @@ public void testPersistJob() throws IOException { testWriteMessage(NativeAutodetectProcess::persistState, 
AutodetectControlMsgWriter.BACKGROUND_PERSIST_MESSAGE_CODE); } - @SuppressWarnings("unchecked") - public void testConsumeAndCloseOutputStream() throws IOException { - - try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), - processPipes, NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { - - process.start(executorService); - process.consumeAndCloseOutputStream(); - assertThat(outputStream.available(), equalTo(0)); - } - } - @SuppressWarnings("unchecked") private void testWriteMessage(CheckedConsumer writeFunction, String expectedMessageCode) throws IOException { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java index fbf7f86c51fc3..3d84c8b6edea1 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -78,8 +79,9 @@ public void initialize() throws IOException { } @After - public void terminateExecutorService() { + public void terminateExecutorService() throws IOException { ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS); + assertThat(processPipes.getProcessOutStream().get().available(), equalTo(0)); verifyNoMoreInteractions(onProcessCrash); } From 8f4bced7acbff371f7132b48b9fa9a386f0ee40b Mon Sep 17 00:00:00 2001 From: Dimitris 
Athanasiou Date: Thu, 21 Jan 2021 17:15:47 +0200 Subject: [PATCH 2/2] Close output stream after state stream --- .../xpack/ml/process/AbstractNativeProcess.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index 6dc8a84471914..169ef1130abde 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -177,14 +177,15 @@ public void close() throws IOException { processInStream().close(); } - consumeAndCloseOutputStream(); - // wait for the process to exit by waiting for end-of-file on the named pipe connected // to the state processor - it may take a long time for all the model state to be // indexed if (stateProcessorFuture != null) { stateProcessorFuture.get(MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES); } + + consumeAndCloseOutputStream(); + // the log processor should have stopped by now too - assume processing the logs will // take no more than 5 seconds longer than processing the state (usually it should // finish first)