Skip to content

Commit

Permalink
[7.x][ML] Improve handling of exception while starting DFA process (e…
Browse files Browse the repository at this point in the history
…lastic#61838) (elastic#61847)

While starting the data frame analytics process it is possible
to get an exception before the process crash handler is in place.
In addition, right after starting the process, we check the process
is alive to ensure we capture a failed process. However, those exceptions
are unhandled.

This commit catches any exception thrown while starting the process
and sets the task to failed with the root cause error message.

I have also taken the chance to remove some unused parameters
in `NativeAnalyticsProcessFactory`.

Relates elastic#61704

Backport of elastic#61838
  • Loading branch information
dimitris-athanasiou authored Sep 2, 2020
1 parent e6dc805 commit 07ab0be
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,17 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
// Fetch existing model state (if any)
BytesReference state = getModelState(config);

if (processContext.startProcess(dataExtractorFactory, task, state)) {
boolean isProcessStarted;
try {
isProcessStarted = processContext.startProcess(dataExtractorFactory, task, state);
} catch (Exception e) {
processContext.stop();
task.setFailed(processContext.getFailureReason() == null ?
e : ExceptionsHelper.serverError(processContext.getFailureReason()));
return;
}

if (isProcessStarted) {
executorServiceForProcess.execute(() -> processContext.resultProcessor.get().process(processContext.process.get()));
executorServiceForProcess.execute(() -> processData(task, processContext, state));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
// The extra 2 are for the checksum and the control field
int numberOfFields = analyticsProcessConfig.cols() + 2;

createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes, executorService);
createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);

NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes,
numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout,
analyticsProcessConfig, namedXContentRegistry);

try {
startProcess(config, executorService, processPipes, analyticsProcess);
startProcess(config, executorService, analyticsProcess);
return analyticsProcess;
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to data frame analytics process for job " + jobId;
Expand All @@ -101,8 +101,8 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
}
}

private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService, ProcessPipes processPipes,
NativeAnalyticsProcess process) throws IOException {
private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService,
NativeAnalyticsProcess process) throws IOException {
if (config.getAnalysis().persistsState()) {
IndexingStateProcessor stateProcessor = new IndexingStateProcessor(config.getId(), resultsPersisterService, auditor);
process.start(executorService, stateProcessor);
Expand All @@ -112,7 +112,7 @@ private void startProcess(DataFrameAnalyticsConfig config, ExecutorService execu
}

private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List<Path> filesToDelete,
ProcessPipes processPipes, ExecutorService executorService) {
ProcessPipes processPipes) {
AnalyticsBuilder analyticsBuilder =
new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -190,6 +191,20 @@ public void testRunJob_Ok() {
verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task);
}

public void testRunJob_ProcessNotAliveAfterStart() {
when(process.isProcessAlive()).thenReturn(false);
when(task.getParams()).thenReturn(
new StartDataFrameAnalyticsAction.TaskParams("data_frame_id", Version.CURRENT, Collections.emptyList(), false));

processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory);
assertThat(processManager.getProcessContextCount(), equalTo(1));

ArgumentCaptor<Exception> errorCaptor = ArgumentCaptor.forClass(Exception.class);
verify(task).setFailed(errorCaptor.capture());

assertThat(errorCaptor.getValue().getMessage(), equalTo("Failed to start data frame analytics process"));
}

public void testProcessContext_GetSetFailureReason() {
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig);
assertThat(processContext.getFailureReason(), is(nullValue()));
Expand Down

0 comments on commit 07ab0be

Please sign in to comment.