Skip to content

Commit

Permalink
[ML] Include node name when native controller cannot start process (#…
Browse files Browse the repository at this point in the history
…42225)

This adds the node name where we fail to start a process via the native
controller to facilitate debugging as otherwise it might not be known
to which node the job was allocated.
  • Loading branch information
dimitris-athanasiou authored May 22, 2019
1 parent 610230f commit fccb7a2
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
NormalizerProcessFactory normalizerProcessFactory;
if (MachineLearningField.AUTODETECT_PROCESS.get(settings) && MachineLearningFeatureSet.isRunningOnMlPlatform(true)) {
try {
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment);
if (nativeController == null) {
// This will only only happen when path.home is not set, which is disallowed in production
throw new ElasticsearchException("Failed to create native process controller for Machine Learning");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public MachineLearningFeatureSet(Environment environment, ClusterService cluster
if (enabled && XPackPlugin.transportClientMode(environment.settings()) == false) {
try {
if (isRunningOnMlPlatform(true)) {
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(),
environment);
if (nativeController != null) {
nativeCodeInfo = nativeController.getNativeCodeInfo();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
public class MlLifeCycleService {

private final Environment environment;
private final ClusterService clusterService;
private final DatafeedManager datafeedManager;
private final AutodetectProcessManager autodetectProcessManager;
private final MlMemoryTracker memoryTracker;

public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedManager datafeedManager,
AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) {
this.environment = environment;
this.clusterService = clusterService;
this.datafeedManager = datafeedManager;
this.autodetectProcessManager = autodetectProcessManager;
this.memoryTracker = memoryTracker;
Expand All @@ -46,7 +48,7 @@ public synchronized void stop() {
if (datafeedManager != null) {
datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
}
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment);
if (nativeController != null) {
// This kills autodetect processes WITHOUT closing the jobs, so they get reallocated.
if (autodetectProcessManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,17 @@ public class NativeController {

public static final Map<String, Object> UNKNOWN_NATIVE_CODE_INFO = Map.of("version", "N/A", "build_hash", "N/A");

private final String localNodeName;
private final CppLogMessageHandler cppLogHandler;
private final OutputStream commandStream;

NativeController(Environment env, NamedPipeHelper namedPipeHelper) throws IOException {
NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException {
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER, null,
true, true, false, false, false, false);
processPipes.connectStreams(CONTROLLER_CONNECT_TIMEOUT);
cppLogHandler = new CppLogMessageHandler(null, processPipes.getLogStream().get());
commandStream = new BufferedOutputStream(processPipes.getCommandStream().get());
this.localNodeName = localNodeName;
this.cppLogHandler = new CppLogMessageHandler(null, processPipes.getLogStream().get());
this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get());
}

void tailLogsInThread() {
Expand Down Expand Up @@ -98,7 +100,8 @@ public void startProcess(List<String> command) throws IOException {
}

if (cppLogHandler.hasLogStreamEnded()) {
String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped";
String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped on node ["
+ localNodeName + "]";
LOGGER.error(msg);
throw new ElasticsearchException(msg);
}
Expand All @@ -124,7 +127,8 @@ public void killProcess(long pid) throws TimeoutException, IOException {
}

if (cppLogHandler.hasLogStreamEnded()) {
String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped";
String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped on node ["
+ localNodeName + "]";
LOGGER.error(msg);
throw new ElasticsearchException(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ private NativeControllerHolder() {
*
* Calls may throw an exception if initial connection to the C++ process fails.
*/
public static NativeController getNativeController(Environment environment) throws IOException {
public static NativeController getNativeController(String localNodeName, Environment environment) throws IOException {

if (MachineLearningField.AUTODETECT_PROCESS.get(environment.settings())) {
synchronized (lock) {
if (nativeController == null) {
nativeController = new NativeController(environment, new NamedPipeHelper());
nativeController = new NativeController(localNodeName, environment, new NamedPipeHelper());
nativeController.tailLogsInThread();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

public class NativeControllerTests extends ESTestCase {

private static final String NODE_NAME = "native-controller-tests-node";

private static final String TEST_MESSAGE = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211,"
+ "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) "
+ "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n";
Expand All @@ -50,7 +52,7 @@ public void testStartProcessCommand() throws IOException {
command.add("--arg2=42");
command.add("--arg3=something with spaces");

NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper);
NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
nativeController.startProcess(command);

assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n",
Expand All @@ -65,7 +67,7 @@ public void testGetNativeCodeInfo() throws IOException, TimeoutException {
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);

NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper);
NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
nativeController.tailLogsInThread();
Map<String, Object> nativeCodeInfo = nativeController.getNativeCodeInfo();

Expand All @@ -83,15 +85,16 @@ public void testControllerDeath() throws Exception {
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);

NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper);
NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
nativeController.tailLogsInThread();

// As soon as the log stream ends startProcess should think the native controller has died
assertBusy(() -> {
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> nativeController.startProcess(Collections.singletonList("my process")));

assertEquals("Cannot start process [my process]: native controller process has stopped", e.getMessage());
assertEquals("Cannot start process [my process]: native controller process has stopped on node " +
"[native-controller-tests-node]", e.getMessage());
});
}
}

0 comments on commit fccb7a2

Please sign in to comment.