Skip to content

Commit

Permalink
[7.x][ML] Handle IOException while closing DFA process while task is …
Browse files Browse the repository at this point in the history
…stopping (#69910) (#69955)

This handles an edge case where a data frame analytics job is stopped
while the process is being closed. As the process gets killed, the process
close may fail with a broken pipe IOException. We should ignore that IOException
instead of setting the job to the failed state.

Backport of #69910

Relates #67581
  • Loading branch information
dimitris-athanasiou authored Mar 4, 2021
1 parent 34524c0 commit 86c7d74
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchModule;
Expand All @@ -36,20 +36,18 @@
import org.elasticsearch.xpack.core.ml.inference.preprocessing.OneHotEncoding;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.PreProcessor;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -80,23 +78,9 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
private String sourceIndex;
private String destIndex;

/**
 * Test fixture run before each test (JUnit {@code @Before}). Issues a cluster
 * update-settings request that applies a transient setting putting the
 * {@code org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler}
 * logger at {@code DEBUG}.
 */
@Before
public void setupLogging() {
client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put("logger.org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler", "DEBUG"))
.get();
}

/**
 * Test fixture run after each test (JUnit {@code @After}). Calls {@code cleanUp()}
 * first, then issues a cluster update-settings request that sets the transient
 * {@code logger.org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler}
 * setting to null.
 */
@After
public void cleanup() {
cleanUp();
// putNull clears the transient logger override for CppLogMessageHandler.
client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.putNull("logger.org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler"))
.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,16 @@ private void closeProcess(DataFrameAnalyticsTask task) {
processContext.process.get().close();
LOGGER.info("[{}] Closed process", configId);
} catch (Exception e) {
LOGGER.error("[" + configId + "] Error closing data frame analyzer process", e);
String errorMsg = new ParameterizedMessage(
"[{}] Error closing data frame analyzer process [{}]", configId, e.getMessage()).getFormattedMessage();
processContext.setFailureReason(errorMsg);
if (task.isStopping()) {
LOGGER.debug(() -> new ParameterizedMessage(
"[{}] Process closing was interrupted by kill request due to the task being stopped", configId), e);
LOGGER.info("[{}] Closed process", configId);
} else {
LOGGER.error("[" + configId + "] Error closing data frame analyzer process", e);
String errorMsg = new ParameterizedMessage(
"[{}] Error closing data frame analyzer process [{}]", configId, e.getMessage()).getFormattedMessage();
processContext.setFailureReason(errorMsg);
}
}
}

Expand Down

0 comments on commit 86c7d74

Please sign in to comment.