Skip to content

Commit

Permalink
[ML] Wait for controller to respond to commands (elastic#63542)
Browse files Browse the repository at this point in the history
This change makes threads that send a command to the ML
controller process wait for it to respond to the command.
Previously such threads would block until the command was
sent, but not until it was actioned.  This was on the
assumption that the sort of commands being sent would be
actioned almost instantaneously, but that assumption has
been shown to be false when anti-malware software is
running.

Relates elastic/ml-cpp#1520
Fixes elastic#62823
  • Loading branch information
droberts195 authored Oct 17, 2020
1 parent ee892a9 commit 65ceee8
Show file tree
Hide file tree
Showing 85 changed files with 369 additions and 2,276 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.client;

import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.get.GetRequest;
Expand Down Expand Up @@ -220,7 +219,6 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/63542")
public class MachineLearningIT extends ESRestHighLevelClientTestCase {

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.elasticsearch.client.documentation;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -245,7 +244,6 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/63542")
public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {

@After
Expand Down
5 changes: 0 additions & 5 deletions x-pack/plugin/ml/qa/basic-multi-node/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,3 @@ testClusters.all {
setting 'indices.lifecycle.history_index_enabled', 'false'
setting 'slm.history_index_enabled', 'false'
}

// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542
// and https://github.com/elastic/ml-cpp/pull/1520 are merged
javaRestTest.enabled = false

3 changes: 0 additions & 3 deletions x-pack/plugin/ml/qa/ml-with-security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,6 @@ yamlRestTest {
].join(',')
}

// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542
// and https://github.com/elastic/ml-cpp/pull/1520 are merged
yamlRestTest.enabled = false

testClusters.all {
testDistribution = 'DEFAULT'
Expand Down
4 changes: 0 additions & 4 deletions x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ javaRestTest {
systemProperty 'es.set.netty.runtime.available.processors', 'false'
}

// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542
// and https://github.com/elastic/ml-cpp/pull/1520 are merged
javaRestTest.enabled = false

testClusters.all {
numberOfNodes = 3
testDistribution = 'DEFAULT'
Expand Down
5 changes: 0 additions & 5 deletions x-pack/plugin/ml/qa/single-node-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,3 @@ testClusters.all {
setting 'xpack.security.enabled', 'false'
setting 'xpack.license.self_generated.type', 'trial'
}

// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542
// and https://github.com/elastic/ml-cpp/pull/1520 are merged
javaRestTest.enabled = false

Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
AnalyticsProcessFactory<MemoryUsageEstimationResult> memoryEstimationProcessFactory;
if (MachineLearningField.AUTODETECT_PROCESS.get(settings)) {
try {
NativeController nativeController = NativeController.makeNativeController(clusterService.getNodeName(), environment);
NativeController nativeController =
NativeController.makeNativeController(clusterService.getNodeName(), environment, xContentRegistry);
autodetectProcessFactory = new NativeAutodetectProcessFactory(
environment,
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public AnalyticsBuilder performMemoryUsageEstimationOnly() {
return this;
}

public void build() throws IOException {
public void build() throws IOException, InterruptedException {
List<String> command = buildAnalyticsCommand();
processPipes.addArgs(command);
nativeController.startProcess(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public AnalyticsProcessManager(Settings settings,
this(
settings,
client,
threadPool.generic(),
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME),
threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME),
analyticsProcessFactory,
auditor,
Expand Down Expand Up @@ -451,7 +451,7 @@ synchronized void stop() {
}
if (process.get() != null) {
try {
process.get().kill();
process.get().kill(true);
} catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", config.getId()), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,11 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP
new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete);
try {
analyticsBuilder.build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("[{}] Interrupted while launching data frame analytics process", jobId);
} catch (IOException e) {
String msg = "Failed to launch data frame analytics process for job " + jobId;
String msg = "[" + jobId + "] Failed to launch data frame analytics process";
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,11 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP
.performMemoryUsageEstimationOnly();
try {
analyticsBuilder.build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("[{}] Interrupted while launching data frame analytics memory usage estimation process", jobId);
} catch (IOException e) {
String msg = "Failed to launch data frame analytics memory usage estimation process for job " + jobId;
String msg = "[" + jobId + "] Failed to launch data frame analytics memory usage estimation process";
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public AutodetectBuilder scheduledEvents(List<ScheduledEvent> scheduledEvents) {
/**
* Requests that the controller daemon start an autodetect process.
*/
public void build() throws IOException {
public void build() throws IOException, InterruptedException {

List<String> command = buildAutodetectCommand();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void killProcess(boolean awaitCompletion, boolean finish, boolean finaliz
processKilled = true;
autodetectResultProcessor.setProcessKilled();
autodetectWorkerExecutor.shutdown();
autodetectProcess.kill();
autodetectProcess.kill(awaitCompletion);

if (awaitCompletion) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void close() {
}

@Override
public void kill() {
public void kill(boolean awaitCompletion) {
open = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,11 @@ void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipe
autodetectBuilder.quantiles(autodetectParams.quantiles());
}
autodetectBuilder.build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("[{}] Interrupted while launching autodetect", job.getId());
} catch (IOException e) {
String msg = "Failed to launch autodetect for job " + job.getId();
String msg = "[" + job.getId() + "] Failed to launch autodetect";
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void flushStream() {
}

@Override
public void kill() {
public void kill(boolean awaitCompletion) {
// Nothing to do
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,11 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip
List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build();
processPipes.addArgs(command);
nativeController.startProcess(command);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("[{}] Interrupted while launching normalizer", jobId);
} catch (IOException e) {
String msg = "Failed to launch normalizer for job " + jobId;
String msg = "[" + jobId + "] Failed to launch normalizer";
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,19 +206,22 @@ public void close() throws IOException {
}

@Override
public void kill() throws IOException {
public void kill(boolean awaitCompletion) throws IOException {
LOGGER.debug("[{}] Killing {} process", jobId, getName());
processKilled = true;
try {
// The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID.
// Without the PID we cannot kill the process.
nativeController.killProcess(cppLogHandler().getPid(processPipes.getTimeout()));
nativeController.killProcess(cppLogHandler().getPid(processPipes.getTimeout()), awaitCompletion);

// Wait for the process to die before closing processInStream as if the process
// is still alive when processInStream is closed it may start persisting state
cppLogHandler().waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
} catch (TimeoutException e) {
LOGGER.warn("[{}] Failed to get PID of {} process to kill", jobId, getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("[{}] Interrupted while killing {} process", jobId, getName());
} finally {
try {
if (processInStream() != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.process;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
 * A response sent by the ML controller process acknowledging a command.
 * Each response echoes the numeric ID of the command it acknowledges,
 * reports whether the command succeeded, and optionally carries a reason
 * string explaining a failure.
 *
 * Instances are immutable.
 */
public class ControllerResponse implements ToXContentObject {

    public static final ParseField TYPE = new ParseField("controller_response");

    public static final ParseField COMMAND_ID = new ParseField("id");
    public static final ParseField SUCCESS = new ParseField("success");
    public static final ParseField REASON = new ParseField("reason");

    // Parser for the JSON the controller writes on its output pipe.
    // "id" and "success" are mandatory; "reason" is only present on failure.
    public static final ConstructingObjectParser<ControllerResponse, Void> PARSER = new ConstructingObjectParser<>(
        TYPE.getPreferredName(), a -> new ControllerResponse((int) a[0], (boolean) a[1], (String) a[2]));

    static {
        PARSER.declareInt(ConstructingObjectParser.constructorArg(), COMMAND_ID);
        PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), SUCCESS);
        PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
    }

    private final int commandId;
    private final boolean success;
    private final String reason;

    ControllerResponse(int commandId, boolean success, String reason) {
        this.commandId = commandId;
        this.success = success;
        this.reason = reason;
    }

    /** @return the ID of the command this response acknowledges */
    public int getCommandId() {
        return commandId;
    }

    /** @return whether the controller executed the command successfully */
    public boolean isSuccess() {
        return success;
    }

    /** @return the failure reason, or {@code null} when the command succeeded */
    public String getReason() {
        return reason;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(COMMAND_ID.getPreferredName(), commandId);
        builder.field(SUCCESS.getPreferredName(), success);
        // The reason field is omitted entirely (not written as null) when absent.
        if (reason != null) {
            builder.field(REASON.getPreferredName(), reason);
        }
        builder.endObject();
        return builder;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        ControllerResponse other = (ControllerResponse) o;
        return commandId == other.commandId
            && success == other.success
            && Objects.equals(reason, other.reason);
    }

    @Override
    public int hashCode() {
        return Objects.hash(commandId, success, reason);
    }
}
Loading

0 comments on commit 65ceee8

Please sign in to comment.