diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java index 00d8c15e41876..6512dc075d701 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManager.java @@ -17,12 +17,10 @@ import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult; -import java.io.IOException; import java.util.Iterator; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.function.Consumer; public class MemoryUsageEstimationProcessManager { @@ -74,24 +72,21 @@ private MemoryUsageEstimationResult runJob(String jobId, "", categoricalFields, config.getAnalysis()); - ProcessHolder processHolder = new ProcessHolder(); AnalyticsProcess process = processFactory.createAnalyticsProcess( jobId, processConfig, executorServiceForProcess, - onProcessCrash(jobId, processHolder)); - processHolder.process = process; - if (process.isProcessAlive() == false) { - String errorMsg = - new ParameterizedMessage("[{}] Error while starting process: {}", jobId, process.readError()).getFormattedMessage(); - throw ExceptionsHelper.serverError(errorMsg); - } + // The handler passed here will never be called as AbstractNativeProcess.detectCrash method returns early when + // (processInStream == null) which is the case for MemoryUsageEstimationProcess. + reason -> {}); try { return readResult(jobId, process); } catch (Exception e) { String errorMsg = - new ParameterizedMessage("[{}] Error while processing result [{}]", jobId, e.getMessage()).getFormattedMessage(); + new ParameterizedMessage( + "[{}] Error while processing process output [{}], process errors: [{}]", + jobId, e.getMessage(), process.readError()).getFormattedMessage(); throw ExceptionsHelper.serverError(errorMsg, e); } finally { process.consumeAndCloseOutputStream(); @@ -101,31 +96,14 @@ private MemoryUsageEstimationResult runJob(String jobId, LOGGER.info("[{}] Closed process", jobId); } catch (Exception e) { String errorMsg = - new ParameterizedMessage("[{}] Error while closing process [{}]", jobId, e.getMessage()).getFormattedMessage(); + new ParameterizedMessage( + "[{}] Error while closing process [{}], process errors: [{}]", + jobId, e.getMessage(), process.readError()).getFormattedMessage(); throw ExceptionsHelper.serverError(errorMsg, e); } } } - private static class ProcessHolder { - volatile AnalyticsProcess process; - } - - private static Consumer onProcessCrash(String jobId, ProcessHolder processHolder) { - return reason -> { - AnalyticsProcess process = processHolder.process; - if (process == null) { - LOGGER.error(new ParameterizedMessage("[{}] Process does not exist", jobId)); - return; - } - try { - process.kill(); - } catch (IOException e) { - LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", jobId), e); - } - }; - } - /** * Extracts {@link MemoryUsageEstimationResult} from process' output. */ diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java index 5a647c8178bf4..9790e0618da72 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/MemoryUsageEstimationProcessManagerTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.dataframe.process; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -65,7 +66,6 @@ public void setUpMocks() { executorServiceForJob = EsExecutors.newDirectExecutorService(); executorServiceForProcess = mock(ExecutorService.class); process = mock(AnalyticsProcess.class); - when(process.isProcessAlive()).thenReturn(true); when(process.readAnalyticsResults()).thenReturn(Arrays.asList(PROCESS_RESULT).iterator()); processFactory = mock(AnalyticsProcessFactory.class); when(processFactory.createAnalyticsProcess(anyString(), any(), any(), any())).thenReturn(process); @@ -93,9 +93,8 @@ public void testRunJob_EmptyDataFrame() { verifyNoMoreInteractions(process, listener); } - public void testRunJob_ProcessNotAlive() { - when(process.isProcessAlive()).thenReturn(false); - when(process.readError()).thenReturn("Error from inside the process"); + public void testRunJob_NoResults() throws Exception { + when(process.readAnalyticsResults()).thenReturn(Arrays.asList().iterator()); processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener); @@ -103,16 +102,18 @@ public void testRunJob_ProcessNotAlive() { ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue(); assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); assertThat(exception.getMessage(), containsString(TASK_ID)); - assertThat(exception.getMessage(), containsString("Error while starting process")); - assertThat(exception.getMessage(), containsString("Error from inside the process")); + assertThat(exception.getMessage(), containsString("no results")); - verify(process).isProcessAlive(); - verify(process).readError(); + InOrder inOrder = inOrder(process); + inOrder.verify(process).readAnalyticsResults(); + inOrder.verify(process).readError(); + inOrder.verify(process).consumeAndCloseOutputStream(); + inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); } - public void testRunJob_NoResults() throws Exception { - when(process.readAnalyticsResults()).thenReturn(Arrays.asList().iterator()); + public void testRunJob_MultipleResults() throws Exception { + when(process.readAnalyticsResults()).thenReturn(Arrays.asList(PROCESS_RESULT, PROCESS_RESULT).iterator()); processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener); @@ -120,18 +121,18 @@ public void testRunJob_NoResults() throws Exception { ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue(); assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); assertThat(exception.getMessage(), containsString(TASK_ID)); - assertThat(exception.getMessage(), containsString("no results")); + assertThat(exception.getMessage(), containsString("more than one result")); InOrder inOrder = inOrder(process); - inOrder.verify(process).isProcessAlive(); inOrder.verify(process).readAnalyticsResults(); + inOrder.verify(process).readError(); inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); } - public void testRunJob_MultipleResults() throws Exception { - when(process.readAnalyticsResults()).thenReturn(Arrays.asList(PROCESS_RESULT, PROCESS_RESULT).iterator()); + public void testRunJob_OneResult_ParseException() throws Exception { + when(process.readAnalyticsResults()).thenThrow(new ElasticsearchParseException("cannot parse result")); processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener); @@ -139,15 +140,14 @@ public void testRunJob_MultipleResults() throws Exception { ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue(); assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); assertThat(exception.getMessage(), containsString(TASK_ID)); - assertThat(exception.getMessage(), containsString("more than one result")); + assertThat(exception.getMessage(), containsString("cannot parse result")); InOrder inOrder = inOrder(process); - inOrder.verify(process).isProcessAlive(); inOrder.verify(process).readAnalyticsResults(); + inOrder.verify(process).readError(); inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); verifyNoMoreInteractions(process, listener); - } public void testRunJob_FailsOnClose() throws Exception { @@ -162,10 +162,32 @@ public void testRunJob_FailsOnClose() throws Exception { assertThat(exception.getMessage(), containsString("Error while closing process")); InOrder inOrder = inOrder(process); - inOrder.verify(process).isProcessAlive(); inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close(); + inOrder.verify(process).readError(); + verifyNoMoreInteractions(process, listener); + } + + public void testRunJob_FailsOnClose_ProcessReportsError() throws Exception { + doThrow(ExceptionsHelper.serverError("some LOG(ERROR) lines coming from cpp process")).when(process).close(); + when(process.readError()).thenReturn("Error from inside the process"); + + processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener); + + verify(listener).onFailure(exceptionCaptor.capture()); + ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue(); + assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + assertThat(exception.getMessage(), containsString(TASK_ID)); + assertThat(exception.getMessage(), containsString("Error while closing process")); + assertThat(exception.getMessage(), containsString("some LOG(ERROR) lines coming from cpp process")); + assertThat(exception.getMessage(), containsString("Error from inside the process")); + + InOrder inOrder = inOrder(process); + inOrder.verify(process).readAnalyticsResults(); + inOrder.verify(process).consumeAndCloseOutputStream(); + inOrder.verify(process).close(); + inOrder.verify(process).readError(); verifyNoMoreInteractions(process, listener); } @@ -177,7 +199,6 @@ public void testRunJob_Ok() throws Exception { assertThat(result, equalTo(PROCESS_RESULT)); InOrder inOrder = inOrder(process); - inOrder.verify(process).isProcessAlive(); inOrder.verify(process).readAnalyticsResults(); inOrder.verify(process).consumeAndCloseOutputStream(); inOrder.verify(process).close();