Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Close results stream before data frame analytics job stops #67828

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty_DependentVariableI
"classification_training_percent_is_50_boolean", BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES, "boolean");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/67581")
public void testStopAndRestart() throws Exception {
initialize("classification_stop_and_restart");
String predictedClassField = KEYWORD_FIELD + "_prediction";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.OneHotEncoding;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.PreProcessor;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -69,23 +67,9 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
private String sourceIndex;
private String destIndex;

/**
 * Runs before each test: submits a cluster update-settings request whose
 * transient settings set {@code logger.org.elasticsearch.xpack.ml.dataframe}
 * to {@code DEBUG}.
 */
@Before
public void setupLogging() {
client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put("logger.org.elasticsearch.xpack.ml.dataframe", "DEBUG"))
.get();
}

/**
 * Runs after each test: calls {@code cleanUp()} and then submits a cluster
 * update-settings request that sets the transient
 * {@code logger.org.elasticsearch.xpack.ml.dataframe} setting to null.
 */
@After
public void cleanup() {
cleanUp();
// NOTE(review): putNull presumably resets this logger to its default level — confirm
client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.putNull("logger.org.elasticsearch.xpack.ml.dataframe"))
.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@ public interface AnalyticsProcess<ProcessResult> extends NativeProcess {
*/
Iterator<ProcessResult> readAnalyticsResults();

/**
 * Reads anything left in the stream and then closes it.
 * The remaining data is consumed before closing because,
 * if the process tries to write more after the close,
 * it gets a SIGPIPE.
 */
void consumeAndCloseOutputStream();

/**
*
* @return the process config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.elasticsearch.xpack.ml.dataframe.process.results.TrainedModelDefinitionChunk;
import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
import org.elasticsearch.xpack.ml.dataframe.stats.StatsPersister;
import org.elasticsearch.xpack.ml.inference.modelsize.ModelSizeInfo;
import org.elasticsearch.xpack.ml.extractor.ExtractedFields;
import org.elasticsearch.xpack.ml.inference.modelsize.ModelSizeInfo;
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;

Expand Down Expand Up @@ -117,7 +117,6 @@ public void process(AnalyticsProcess<AnalyticsResult> process) {
completeResultsProgress();
}
completionLatch.countDown();
process.consumeAndCloseOutputStream();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the offending piece of code: the latch was counted down before we closed the output stream.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ private MemoryUsageEstimationResult runJob(String jobId,
jobId, e.getMessage(), process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg, e);
} finally {
process.consumeAndCloseOutputStream();
try {
LOGGER.debug("[{}] Closing process", jobId);
process.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,4 @@ public interface AutodetectProcess extends NativeProcess {
* @return stream of autodetect results.
*/
Iterator<AutodetectResult> readAutodetectResults();

/**
 * Reads anything left in the stream and then closes it.
 * The remaining data is consumed before closing because,
 * if the process tries to write more after the close,
 * it gets a SIGPIPE.
 */
void consumeAndCloseOutputStream();
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,6 @@ public AutodetectResult next() {
};
}

/**
 * No-op implementation: the body is empty, so nothing is read or closed here.
 */
@Override
public void consumeAndCloseOutputStream() {
}

@Override
public ZonedDateTime getProcessStartTime() {
return startTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,24 +187,20 @@ public void process() {

private void readResults() {
currentRunBucketCount = 0;
try {
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
while (iterator.hasNext()) {
try {
AutodetectResult result = iterator.next();
processResult(result);
if (result.getBucket() != null) {
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount);
}
} catch (Exception e) {
if (isAlive() == false) {
throw e;
}
LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e);
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
while (iterator.hasNext()) {
try {
AutodetectResult result = iterator.next();
processResult(result);
if (result.getBucket() != null) {
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount);
}
} catch (Exception e) {
if (isAlive() == false) {
throw e;
}
LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e);
}
} finally {
process.consumeAndCloseOutputStream();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,19 @@ public void process() {
}

private void readResults() {
try {
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
while (iterator.hasNext()) {
try {
AutodetectResult result = iterator.next();
processResult(result);
} catch (Exception e) {
if (isAlive() == false) {
throw e;
}
LOGGER.warn(
new ParameterizedMessage("[{}] [{}] Error processing model snapshot upgrade result", jobId, snapshotId),
e);
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
while (iterator.hasNext()) {
try {
AutodetectResult result = iterator.next();
processResult(result);
} catch (Exception e) {
if (isAlive() == false) {
throw e;
}
LOGGER.warn(
new ParameterizedMessage("[{}] [{}] Error processing model snapshot upgrade result", jobId, snapshotId),
e);
}
} finally {
process.consumeAndCloseOutputStream();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,16 @@ public void close() throws IOException {
if (processInStream() != null) {
processInStream().close();
}

// wait for the process to exit by waiting for end-of-file on the named pipe connected
// to the state processor - it may take a long time for all the model state to be
// indexed
if (stateProcessorFuture != null) {
stateProcessorFuture.get(MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES);
}

consumeAndCloseOutputStream();

// the log processor should have stopped by now too - assume processing the logs will
// take no more than 5 seconds longer than processing the state (usually it should
// finish first)
Expand Down Expand Up @@ -312,13 +316,12 @@ protected boolean isProcessKilled() {
return processKilled;
}

public void consumeAndCloseOutputStream() {
try {
void consumeAndCloseOutputStream() {
try (InputStream outStream = processOutStream()) {
byte[] buff = new byte[512];
while (processOutStream().read(buff) >= 0) {
while (outStream.read(buff) >= 0) {
// Do nothing
}
processOutStream().close();
} catch (IOException e) {
// Given we are closing down the process there is no point propagating IO exceptions here
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public void testRunJob_NoResults() throws Exception {
InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}
Expand All @@ -127,7 +126,6 @@ public void testRunJob_MultipleResults() throws Exception {
InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}
Expand All @@ -146,7 +144,6 @@ public void testRunJob_OneResult_ParseException() throws Exception {
InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}
Expand All @@ -164,7 +161,6 @@ public void testRunJob_FailsOnClose() throws Exception {

InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
inOrder.verify(process).readError();
verifyNoMoreInteractions(process, listener);
Expand All @@ -186,7 +182,6 @@ public void testRunJob_FailsOnClose_ProcessReportsError() throws Exception {

InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
inOrder.verify(process).readError();
verifyNoMoreInteractions(process, listener);
Expand All @@ -201,7 +196,6 @@ public void testRunJob_Ok() throws Exception {

InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.process.IndexingStateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.process.IndexingStateProcessor;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -37,7 +37,6 @@
import java.util.concurrent.Future;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -152,19 +151,6 @@ public void testPersistJob() throws IOException {
testWriteMessage(NativeAutodetectProcess::persistState, AutodetectControlMsgWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
}

/**
 * Starts a {@code NativeAutodetectProcess}, calls
 * {@code consumeAndCloseOutputStream()} on it, and asserts that
 * {@code outputStream.available()} is 0 afterwards.
 */
// NOTE(review): the unchecked suppression is presumably for mock(Consumer.class) — confirm
@SuppressWarnings("unchecked")
public void testConsumeAndCloseOutputStream() throws IOException {

// try-with-resources: the process is closed when the block exits
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
processPipes, NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {

process.start(executorService);
process.consumeAndCloseOutputStream();
assertThat(outputStream.available(), equalTo(0));
}
}

@SuppressWarnings("unchecked")
private void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunction, String expectedMessageCode) throws IOException {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
Expand Down Expand Up @@ -78,8 +79,9 @@ public void initialize() throws IOException {
}

@After
public void terminateExecutorService() {
public void terminateExecutorService() throws IOException {
ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
assertThat(processPipes.getProcessOutStream().get().available(), equalTo(0));
verifyNoMoreInteractions(onProcessCrash);
}

Expand Down