diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index dab260771c427..74311f6cb2c9e 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -481,7 +481,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty_DependentVariableI "classification_training_percent_is_50_boolean", BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES, "boolean"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/67581") public void testStopAndRestart() throws Exception { initialize("classification_stop_and_restart"); String predictedClassField = KEYWORD_FIELD + "_prediction"; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index f73491300ec9f..cc5d2efd8eab9 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -69,23 +69,10 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { private String sourceIndex; private String destIndex; - @Before - public void setupLogging() { - client().admin().cluster() - .prepareUpdateSettings() - .setTransientSettings(Settings.builder() - .put("logger.org.elasticsearch.xpack.ml.dataframe", "DEBUG")) - .get(); - } @After public void cleanup() { cleanUp(); - client().admin().cluster() - .prepareUpdateSettings() - .setTransientSettings(Settings.builder() - .putNull("logger.org.elasticsearch.xpack.ml.dataframe")) - .get(); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java index 7d0dd33a941e7..ed99c7501f5d4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java @@ -57,4 +57,19 @@ public void writeEndOfDataMessage() throws IOException { public Iterator readAnalyticsResults() { return resultsParser.parseResults(processOutStream()); } + + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + // Unlike autodetect where closing the process input stream initiates + // termination and additional output from the process which forces us + // to close the output stream after we've finished processing its results, + // in analytics we wait until we've read all results and then we close the + // process. Thus, we can take care of consuming and closing the output + // stream within close itself. + consumeAndCloseOutputStream(); + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java index 68ec738fe9f94..092ae44cbd7cf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcess.java @@ -25,14 +25,6 @@ public interface AnalyticsProcess extends NativeProcess { */ Iterator readAnalyticsResults(); - /** - * Read anything left in the stream before - * closing the stream otherwise if the process - * tries to write more after the close it gets - * a SIGPIPE - */ - void consumeAndCloseOutputStream(); - /** * * @return the process config diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java index 69ade87dfd588..37eec0fb514d2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java @@ -117,7 +117,6 @@ public void process(AnalyticsProcess process) { completeResultsProgress(); } completionLatch.countDown(); - process.consumeAndCloseOutputStream(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java index 21baac76a2f5d..5c69f0a3c5c12 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java @@ -98,7 +98,6 @@ private MemoryUsageEstimationResult runJob(String jobId, jobId, e.getMessage(), process.readError()).getFormattedMessage(); throw ExceptionsHelper.serverError(errorMsg, e); } finally { - process.consumeAndCloseOutputStream(); try { LOGGER.debug("[{}] Closing process", jobId); process.close(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index f041d9aebb046..8868f30155d23 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -313,12 +313,11 @@ protected boolean isProcessKilled() { } public void consumeAndCloseOutputStream() { - try { + try (InputStream outStream = processOutStream()) { byte[] buff = new byte[512]; - while (processOutStream().read(buff) >= 0) { + while (outStream.read(buff) >= 0) { // Do nothing } - processOutStream().close(); } catch (IOException e) { // Given we are closing down the process there is no point propagating IO exceptions here } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java index 3daf08d9250e1..6550fc4483e1f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java @@ -108,7 +108,6 @@ public void testRunJob_NoResults() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).readError(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); } @@ -127,7 +126,6 @@ public void testRunJob_MultipleResults() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).readError(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); } @@ -146,7 +144,6 @@ public void testRunJob_OneResult_ParseException() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).readError(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); } @@ -164,7 +161,6 @@ public void testRunJob_FailsOnClose() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); inOrder.verify(process).readError(); verifyNoMoreInteractions(process, listener); @@ -186,7 +182,6 @@ public void testRunJob_FailsOnClose_ProcessReportsError() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); inOrder.verify(process).readError(); verifyNoMoreInteractions(process, listener); @@ -201,7 +196,6 @@ public void testRunJob_Ok() throws Exception { InOrder inOrder = inOrder(process); inOrder.verify(process).readAnalyticsResults(); - inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); }