From 07ab0beea0429ba989dcce57e0d029aef1ff7a02 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 2 Sep 2020 16:32:45 +0300 Subject: [PATCH] [7.x][ML] Improve handling of exception while starting DFA process (#61838) (#61847) While starting the data frame analytics process it is possible to get an exception before the process crash handler is in place. In addition, right after starting the process, we check the process is alive to ensure we capture a failed process. However, those exceptions are unhandled. This commit catches any exception thrown while starting the process and sets the task to failed with the root cause error message. I have also taken the chance to remove some unused parameters in `NativeAnalyticsProcessFactory`. Relates #61704 Backport of #61838 --- .../process/AnalyticsProcessManager.java | 12 +++++++++++- .../process/NativeAnalyticsProcessFactory.java | 10 +++++----- .../process/AnalyticsProcessManagerTests.java | 15 +++++++++++++++ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 97cd053b13f27..793f5c4ea8859 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -141,7 +141,17 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, // Fetch existing model state (if any) BytesReference state = getModelState(config); - if (processContext.startProcess(dataExtractorFactory, task, state)) { + boolean isProcessStarted; + try { + isProcessStarted = processContext.startProcess(dataExtractorFactory, task, state); + } catch (Exception e) { + processContext.stop(); + task.setFailed(processContext.getFailureReason() == null ? + e : ExceptionsHelper.serverError(processContext.getFailureReason())); + return; + } + + if (isProcessStarted) { executorServiceForProcess.execute(() -> processContext.resultProcessor.get().process(processContext.process.get())); executorServiceForProcess.execute(() -> processData(task, processContext, state)); } else { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 2d1e4fc01ce02..115516e303004 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -80,14 +80,14 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co // The extra 2 are for the checksum and the control field int numberOfFields = analyticsProcessConfig.cols() + 2; - createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes, executorService); + createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes); NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, analyticsProcessConfig, namedXContentRegistry); try { - startProcess(config, executorService, processPipes, analyticsProcess); + startProcess(config, executorService, analyticsProcess); return analyticsProcess; } catch (IOException | EsRejectedExecutionException e) { String msg = "Failed to connect to data frame analytics process for job " + jobId; @@ -101,8 +101,8 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co } } - private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService, ProcessPipes processPipes, - NativeAnalyticsProcess process) throws IOException { + private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService, + NativeAnalyticsProcess process) throws IOException { if (config.getAnalysis().persistsState()) { IndexingStateProcessor stateProcessor = new IndexingStateProcessor(config.getId(), resultsPersisterService, auditor); process.start(executorService, stateProcessor); @@ -112,7 +112,7 @@ private void startProcess(DataFrameAnalyticsConfig config, ExecutorService execu } private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List filesToDelete, - ProcessPipes processPipes, ExecutorService executorService) { + ProcessPipes processPipes) { AnalyticsBuilder analyticsBuilder = new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete); try { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java index 7a720dd8b5f50..f5964ec02603d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java @@ -45,6 +45,7 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -190,6 +191,20 @@ public void testRunJob_Ok() { verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task); } + public void testRunJob_ProcessNotAliveAfterStart() { + when(process.isProcessAlive()).thenReturn(false); + when(task.getParams()).thenReturn( + new StartDataFrameAnalyticsAction.TaskParams("data_frame_id", Version.CURRENT, Collections.emptyList(), false)); + + processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory); + assertThat(processManager.getProcessContextCount(), equalTo(1)); + + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(Exception.class); + verify(task).setFailed(errorCaptor.capture()); + + assertThat(errorCaptor.getValue().getMessage(), equalTo("Failed to start data frame analytics process")); + } + public void testProcessContext_GetSetFailureReason() { AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig); assertThat(processContext.getFailureReason(), is(nullValue()));