Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not throw an exception if the process finished quickly but without any error. #46073

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult;

import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

public class MemoryUsageEstimationProcessManager {

Expand Down Expand Up @@ -74,24 +72,21 @@ private MemoryUsageEstimationResult runJob(String jobId,
"",
categoricalFields,
config.getAnalysis());
ProcessHolder processHolder = new ProcessHolder();
AnalyticsProcess<MemoryUsageEstimationResult> process =
processFactory.createAnalyticsProcess(
jobId,
processConfig,
executorServiceForProcess,
onProcessCrash(jobId, processHolder));
processHolder.process = process;
if (process.isProcessAlive() == false) {
String errorMsg =
new ParameterizedMessage("[{}] Error while starting process: {}", jobId, process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg);
}
// The handler passed here will never be called as AbstractNativeProcess.detectCrash method returns early when
// (processInStream == null) which is the case for MemoryUsageEstimationProcess.
reason -> {});
try {
return readResult(jobId, process);
} catch (Exception e) {
String errorMsg =
new ParameterizedMessage("[{}] Error while processing result [{}]", jobId, e.getMessage()).getFormattedMessage();
new ParameterizedMessage(
"[{}] Error while processing process output [{}], process errors: [{}]",
jobId, e.getMessage(), process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg, e);
} finally {
process.consumeAndCloseOutputStream();
Expand All @@ -101,31 +96,14 @@ private MemoryUsageEstimationResult runJob(String jobId,
LOGGER.info("[{}] Closed process", jobId);
} catch (Exception e) {
String errorMsg =
new ParameterizedMessage("[{}] Error while closing process [{}]", jobId, e.getMessage()).getFormattedMessage();
new ParameterizedMessage(
"[{}] Error while closing process [{}], process errors: [{}]",
jobId, e.getMessage(), process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg, e);
}
}
}

private static class ProcessHolder {
volatile AnalyticsProcess<MemoryUsageEstimationResult> process;
}

private static Consumer<String> onProcessCrash(String jobId, ProcessHolder processHolder) {
return reason -> {
AnalyticsProcess<MemoryUsageEstimationResult> process = processHolder.process;
if (process == null) {
LOGGER.error(new ParameterizedMessage("[{}] Process does not exist", jobId));
return;
}
try {
process.kill();
} catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", jobId), e);
}
};
}

/**
* Extracts {@link MemoryUsageEstimationResult} from process' output.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.dataframe.process;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand Down Expand Up @@ -65,7 +66,6 @@ public void setUpMocks() {
executorServiceForJob = EsExecutors.newDirectExecutorService();
executorServiceForProcess = mock(ExecutorService.class);
process = mock(AnalyticsProcess.class);
when(process.isProcessAlive()).thenReturn(true);
when(process.readAnalyticsResults()).thenReturn(List.of(PROCESS_RESULT).iterator());
processFactory = mock(AnalyticsProcessFactory.class);
when(processFactory.createAnalyticsProcess(anyString(), any(), any(), any())).thenReturn(process);
Expand Down Expand Up @@ -93,61 +93,61 @@ public void testRunJob_EmptyDataFrame() {
verifyNoMoreInteractions(process, listener);
}

public void testRunJob_ProcessNotAlive() {
when(process.isProcessAlive()).thenReturn(false);
when(process.readError()).thenReturn("Error from inside the process");
public void testRunJob_NoResults() throws Exception {
when(process.readAnalyticsResults()).thenReturn(List.<MemoryUsageEstimationResult>of().iterator());

processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);

verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("Error while starting process"));
assertThat(exception.getMessage(), containsString("Error from inside the process"));
assertThat(exception.getMessage(), containsString("no results"));

verify(process).isProcessAlive();
verify(process).readError();
InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}

public void testRunJob_NoResults() throws Exception {
when(process.readAnalyticsResults()).thenReturn(List.<MemoryUsageEstimationResult>of().iterator());
public void testRunJob_MultipleResults() throws Exception {
when(process.readAnalyticsResults()).thenReturn(List.of(PROCESS_RESULT, PROCESS_RESULT).iterator());

processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);

verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("no results"));
assertThat(exception.getMessage(), containsString("more than one result"));

InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}

public void testRunJob_MultipleResults() throws Exception {
when(process.readAnalyticsResults()).thenReturn(List.of(PROCESS_RESULT, PROCESS_RESULT).iterator());
public void testRunJob_OneResult_ParseException() throws Exception {
when(process.readAnalyticsResults()).thenThrow(new ElasticsearchParseException("cannot parse result"));

processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);

verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("more than one result"));
assertThat(exception.getMessage(), containsString("cannot parse result"));

InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);

}

public void testRunJob_FailsOnClose() throws Exception {
Expand All @@ -162,10 +162,32 @@ public void testRunJob_FailsOnClose() throws Exception {
assertThat(exception.getMessage(), containsString("Error while closing process"));

InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
inOrder.verify(process).readError();
verifyNoMoreInteractions(process, listener);
}

public void testRunJob_FailsOnClose_ProcessReportsError() throws Exception {
doThrow(ExceptionsHelper.serverError("some LOG(ERROR) lines coming from cpp process")).when(process).close();
when(process.readError()).thenReturn("Error from inside the process");

processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);

verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("Error while closing process"));
assertThat(exception.getMessage(), containsString("some LOG(ERROR) lines coming from cpp process"));
assertThat(exception.getMessage(), containsString("Error from inside the process"));

InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
inOrder.verify(process).readError();
verifyNoMoreInteractions(process, listener);
}

Expand All @@ -177,7 +199,6 @@ public void testRunJob_Ok() throws Exception {
assertThat(result, equalTo(PROCESS_RESULT));

InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
Expand Down