Skip to content

Commit

Permalink
[ML] Close results stream before data frame analytics job stops
Browse files Browse the repository at this point in the history
Investigating the failures in elastic#67581 it looked like after restarting
the regression job the process was started but no data was loaded.
So the process was getting stuck waiting for data.

Looking into the code it looks like this can be explained by the
fact that AnalyticsResultProcessor counts down its completion
latch before it closes the results stream. This means the job
may go to stopped state while the out stream is still alive,
which on Windows results in the directory with the named pipes
staying around. Then when the job is started again, which the
test does immediately, the old pipes are used and thus the
data is not sent to the new process.

This commit fixes this by ensuring the process output stream
is consumed and closed when the analytics process is closed.

Fixes elastic#67581
  • Loading branch information
dimitris-athanasiou committed Jan 22, 2021
1 parent ef24722 commit 17f75b6
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty_DependentVariableI
"classification_training_percent_is_50_boolean", BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES, "boolean");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/67581")
public void testStopAndRestart() throws Exception {
initialize("classification_stop_and_restart");
String predictedClassField = KEYWORD_FIELD + "_prediction";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,10 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
private String sourceIndex;
private String destIndex;

@Before
public void setupLogging() {
client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put("logger.org.elasticsearch.xpack.ml.dataframe", "DEBUG"))
.get();
}

@After
public void cleanup() {
cleanUp();
client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.putNull("logger.org.elasticsearch.xpack.ml.dataframe"))
.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,19 @@ public void writeEndOfDataMessage() throws IOException {
public Iterator<Result> readAnalyticsResults() {
return resultsParser.parseResults(processOutStream());
}

@Override
public void close() throws IOException {
try {
super.close();
} finally {
// Unlike autodetect where closing the process input stream initiates
// termination and additional output from the process which forces us
// to close the output stream after we've finished processing its results,
// in analytics we wait until we've read all results and then we close the
// process. Thus, we can take care of consuming and closing the output
// stream within close itself.
consumeAndCloseOutputStream();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@ public interface AnalyticsProcess<ProcessResult> extends NativeProcess {
*/
Iterator<ProcessResult> readAnalyticsResults();

/**
* Read anything left in the stream before
* closing the stream otherwise if the process
* tries to write more after the close it gets
* a SIGPIPE
*/
void consumeAndCloseOutputStream();

/**
*
* @return the process config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public void process(AnalyticsProcess<AnalyticsResult> process) {
completeResultsProgress();
}
completionLatch.countDown();
process.consumeAndCloseOutputStream();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ private MemoryUsageEstimationResult runJob(String jobId,
jobId, e.getMessage(), process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg, e);
} finally {
process.consumeAndCloseOutputStream();
try {
LOGGER.debug("[{}] Closing process", jobId);
process.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,11 @@ protected boolean isProcessKilled() {
}

public void consumeAndCloseOutputStream() {
try {
try (InputStream outStream = processOutStream()) {
byte[] buff = new byte[512];
while (processOutStream().read(buff) >= 0) {
while (outStream.read(buff) >= 0) {
// Do nothing
}
processOutStream().close();
} catch (IOException e) {
// Given we are closing down the process there is no point propagating IO exceptions here
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public void testRunJob_NoResults() throws Exception {
InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}
Expand All @@ -127,7 +126,6 @@ public void testRunJob_MultipleResults() throws Exception {
InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}
Expand All @@ -146,7 +144,6 @@ public void testRunJob_OneResult_ParseException() throws Exception {
InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}
Expand All @@ -164,7 +161,6 @@ public void testRunJob_FailsOnClose() throws Exception {

InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
inOrder.verify(process).readError();
verifyNoMoreInteractions(process, listener);
Expand All @@ -186,7 +182,6 @@ public void testRunJob_FailsOnClose_ProcessReportsError() throws Exception {

InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
inOrder.verify(process).readError();
verifyNoMoreInteractions(process, listener);
Expand All @@ -201,7 +196,6 @@ public void testRunJob_Ok() throws Exception {

InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}
Expand Down

0 comments on commit 17f75b6

Please sign in to comment.