Skip to content

Commit

Permalink
[ML] Make controller send responses for each command received (#1520)
Browse files Browse the repository at this point in the history
This change makes the controller process respond to each command
it receives with a document indicating whether that command was
successfully executed or not.

This response will be used by the Java side of the connection to
determine when it is appropriate to move on to the next phase of
the action that the controller command was part of.  For example,
when starting a process and connecting named pipes to it it is
best that the named pipe connections are not attempted until the
process is confirmed to be started.

Relates elastic/elasticsearch#62823
  • Loading branch information
droberts195 authored Oct 17, 2020
1 parent 5710d8a commit d90ff2e
Show file tree
Hide file tree
Showing 13 changed files with 412 additions and 153 deletions.
58 changes: 29 additions & 29 deletions bin/autodetect/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,68 +64,68 @@ bool CCmdLineParser::parse(int argc,
("help", "Display this information and exit")
("version", "Display version information and exit")
("config", boost::program_options::value<std::string>(),
"The job configuration file")
"The job configuration file")
("limitconfig", boost::program_options::value<std::string>(),
"Optional limit config file")
"Optional limit config file")
("modelconfig", boost::program_options::value<std::string>(),
"Optional model config file")
"Optional model config file")
("fieldconfig", boost::program_options::value<std::string>(),
"Optional field config file")
"Optional field config file")
("modelplotconfig", boost::program_options::value<std::string>(),
"Optional model plot config file")
"Optional model plot config file")
("jobid", boost::program_options::value<std::string>(),
"ID of the job this process is associated with")
"ID of the job this process is associated with")
("logProperties", boost::program_options::value<std::string>(),
"Optional logger properties file")
"Optional logger properties file")
("logPipe", boost::program_options::value<std::string>(),
"Optional log to named pipe")
"Optional log to named pipe")
("bucketspan", boost::program_options::value<core_t::TTime>(),
"Optional aggregation bucket span (in seconds) - default is 300")
"Optional aggregation bucket span (in seconds) - default is 300")
("latency", boost::program_options::value<core_t::TTime>(),
"Optional maximum delay for out-of-order records (in seconds) - default is 0")
"Optional maximum delay for out-of-order records (in seconds) - default is 0")
("summarycountfield", boost::program_options::value<std::string>(),
"Optional field to that contains counts for pre-summarized input - default is none")
"Optional field to that contains counts for pre-summarized input - default is none")
("delimiter", boost::program_options::value<char>(),
"Optional delimiter character for delimited data formats - default is '\t' (tab separated)")
"Optional delimiter character for delimited data formats - default is '\t' (tab separated)")
("lengthEncodedInput",
"Take input in length encoded binary format - default is delimited")
"Take input in length encoded binary format - default is delimited")
("timefield", boost::program_options::value<std::string>(),
"Optional name of the field containing the timestamp - default is 'time'")
"Optional name of the field containing the timestamp - default is 'time'")
("timeformat", boost::program_options::value<std::string>(),
"Optional format of the date in the time field in strptime code - default is the epoch time in seconds")
"Optional format of the date in the time field in strptime code - default is the epoch time in seconds")
("quantilesState", boost::program_options::value<std::string>(),
"Optional file to quantiles for normalization")
"Optional file to quantiles for normalization")
("deleteStateFiles",
"If the 'quantilesState' option is used and this flag is set then delete the model state files once they have been read")
"If the 'quantilesState' option is used and this flag is set then delete the model state files once they have been read")
("namedPipeConnectTimeout", boost::program_options::value<core_t::TTime>(),
"Optional timeout (in seconds) for connecting named pipes on startup - default is 300 seconds")
"Optional timeout (in seconds) for connecting named pipes on startup - default is 300 seconds")
("input", boost::program_options::value<std::string>(),
"Optional file to read input from - not present means read from STDIN")
"Optional file to read input from - not present means read from STDIN")
("inputIsPipe", "Specified input file is a named pipe")
("output", boost::program_options::value<std::string>(),
"Optional file to write output to - not present means write to STDOUT")
"Optional file to write output to - not present means write to STDOUT")
("outputIsPipe", "Specified output file is a named pipe")
("restore", boost::program_options::value<std::string>(),
"Optional file to restore state from - not present means no state restoration")
"Optional file to restore state from - not present means no state restoration")
("restoreIsPipe", "Specified restore file is a named pipe")
("persist", boost::program_options::value<std::string>(),
"Optional file to persist state to - not present means no state persistence")
"Optional file to persist state to - not present means no state persistence")
("persistIsPipe", "Specified persist file is a named pipe")
("persistInterval", boost::program_options::value<core_t::TTime>(),
"Optional time interval at which to periodically persist model state (Mutually exclusive with bucketPersistInterval)")
"Optional time interval at which to periodically persist model state (Mutually exclusive with bucketPersistInterval)")
("persistInForeground", "Persistence occurs in the foreground. Defaults to background persistence.")
("bucketPersistInterval", boost::program_options::value<std::size_t>(),
"Optional number of buckets after which to periodically persist model state (Mutually exclusive with persistInterval)")
"Optional number of buckets after which to periodically persist model state (Mutually exclusive with persistInterval)")
("maxQuantileInterval", boost::program_options::value<core_t::TTime>(),
"Optional interval at which to periodically output quantiles if they have not been output due to an anomaly - if not specified then quantiles will only be output following a big anomaly")
"Optional interval at which to periodically output quantiles if they have not been output due to an anomaly - if not specified then quantiles will only be output following a big anomaly")
("maxAnomalyRecords", boost::program_options::value<std::size_t>(),
"The maximum number of records to be outputted for each bucket. Defaults to 100, a value 0 removes the limit.")
"The maximum number of records to be outputted for each bucket. Defaults to 100, a value 0 removes the limit.")
("memoryUsage",
"Log the model memory usage at the end of the job")
"Log the model memory usage at the end of the job")
("multivariateByFields",
"Optional flag to enable multi-variate analysis of correlated by fields")
"Optional flag to enable multi-variate analysis of correlated by fields")
("stopCategorizationOnWarnStatus",
"Optional flag to stop categorization for partitions where the status is 'warn'.")
"Optional flag to stop categorization for partitions where the status is 'warn'.")
;
// clang-format on

Expand Down
14 changes: 10 additions & 4 deletions bin/controller/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ bool CCmdLineParser::parse(int argc,
const char* const* argv,
std::string& jvmPidStr,
std::string& logPipe,
std::string& commandPipe) {
std::string& commandPipe,
std::string& outputPipe) {
try {
boost::program_options::options_description desc(DESCRIPTION);
// clang-format off
desc.add_options()
("help", "Display this information and exit")
("version", "Display version information and exit")
("jvmPid", boost::program_options::value<std::string>(),
"Process ID of the JVM to communicate with - default is parent process PID")
"Process ID of the JVM to communicate with - default is parent process PID")
("logPipe", boost::program_options::value<std::string>(),
"Named pipe to log to - default is controller_log_<JVM PID>")
"Named pipe to log to - default is controller_log_<JVM PID>")
("commandPipe", boost::program_options::value<std::string>(),
"Named pipe to accept commands from - default is controller_command_<JVM PID>")
"Named pipe to accept commands from - default is controller_command_<JVM PID>")
("outputPipe", boost::program_options::value<std::string>(),
"Named pipe to output responses to - default is controller_output_<JVM PID>")
;
// clang-format on

Expand All @@ -59,6 +62,9 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("commandPipe") > 0) {
commandPipe = vm["commandPipe"].as<std::string>();
}
if (vm.count("outputPipe") > 0) {
outputPipe = vm["outputPipe"].as<std::string>();
}
} catch (std::exception& e) {
std::cerr << "Error processing command line: " << e.what() << std::endl;
return false;
Expand Down
3 changes: 2 additions & 1 deletion bin/controller/CCmdLineParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class CCmdLineParser {
const char* const* argv,
std::string& jvmPidStr,
std::string& logPipe,
std::string& commandPipe);
std::string& commandPipe,
std::string& outputPipe);

private:
static const std::string DESCRIPTION;
Expand Down
80 changes: 51 additions & 29 deletions bin/controller/CCommandProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ namespace ml {
namespace controller {

// Initialise statics
const std::string CCommandProcessor::START("start");
const std::string CCommandProcessor::KILL("kill");
const std::string CCommandProcessor::START{"start"};
const std::string CCommandProcessor::KILL{"kill"};

CCommandProcessor::CCommandProcessor(const TStrVec& permittedProcessPaths)
: m_Spawner(permittedProcessPaths) {
CCommandProcessor::CCommandProcessor(const TStrVec& permittedProcessPaths,
std::ostream& responseStream)
: m_Spawner{permittedProcessPaths}, m_ResponseWriter{responseStream} {
}

void CCommandProcessor::processCommands(std::istream& stream) {
void CCommandProcessor::processCommands(std::istream& commandStream) {
std::string command;
while (std::getline(stream, command)) {
if (!command.empty()) {
while (std::getline(commandStream, command)) {
if (command.empty() == false) {
this->handleCommand(command);
}
}
Expand All @@ -41,61 +42,82 @@ void CCommandProcessor::processCommands(std::istream& stream) {
bool CCommandProcessor::handleCommand(const std::string& command) {
// Command lines must be tab-separated
TStrVec tokens;
std::string remainder;
core::CStringUtils::tokenise(TAB, command, tokens, remainder);
if (!remainder.empty()) {
tokens.push_back(remainder);
{
std::string remainder;
core::CStringUtils::tokenise(TAB, command, tokens, remainder);
if (remainder.empty() == false) {
tokens.emplace_back(std::move(remainder));
}
}

// Multiple consecutive tabs might have caused empty tokens
tokens.erase(std::remove(tokens.begin(), tokens.end(), EMPTY_STRING), tokens.end());

if (tokens.empty()) {
LOG_DEBUG(<< "Ignoring empty command");
if (tokens.size() < 3) {
if (tokens.empty() == false) {
LOG_ERROR(<< "Ignoring command with only " << tokens.size()
<< ((tokens.size() == 1) ? " token" : " tokens"));
}
return false;
}

// Split into verb and other tokens
std::string verb(tokens[0]);
tokens.erase(tokens.begin());
// Split into ID, verb and other tokens
std::uint32_t id{0};
if (core::CStringUtils::stringToType(tokens[0], id) == false || id == 0) {
LOG_ERROR(<< "Invalid command ID in " << core::CContainerPrinter::print(tokens));
return false;
}

std::string verb{std::move(tokens[1])};
tokens.erase(tokens.begin(), tokens.begin() + 2);

if (verb == START) {
return this->handleStart(tokens);
return this->handleStart(id, std::move(tokens));
}
if (verb == KILL) {
return this->handleKill(tokens);
return this->handleKill(id, std::move(tokens));
}

LOG_ERROR(<< "Did not understand verb '" << verb << '\'');
std::string error{"Did not understand verb '" + verb + '\''};
LOG_ERROR(<< error << " in command with ID " << id);
m_ResponseWriter.writeResponse(id, false, error);
return false;
}

bool CCommandProcessor::handleStart(TStrVec& tokens) {
std::string processPath;
processPath.swap(tokens[0]);
bool CCommandProcessor::handleStart(std::uint32_t id, TStrVec tokens) {
std::string processPath{std::move(tokens[0])};
tokens.erase(tokens.begin());

if (m_Spawner.spawn(processPath, tokens) == false) {
LOG_ERROR(<< "Failed to start process '" << processPath << '\'');
std::string error{"Failed to start process '" + processPath + '\''};
LOG_ERROR(<< error << " in command with ID " << id);
m_ResponseWriter.writeResponse(id, false, error);
return false;
}

m_ResponseWriter.writeResponse(id, true, "Process '" + processPath + "' started");
return true;
}

bool CCommandProcessor::handleKill(TStrVec& tokens) {
core::CProcess::TPid pid = 0;
if (tokens.size() != 1 || core::CStringUtils::stringToType(tokens[0], pid) == false) {
LOG_ERROR(<< "Unexpected arguments for kill command: "
<< core::CContainerPrinter::print(tokens));
bool CCommandProcessor::handleKill(std::uint32_t id, TStrVec tokens) {
core::CProcess::TPid pid{0};
if (tokens.size() != 1 ||
core::CStringUtils::stringToType(tokens[0], pid) == false || pid == 0) {
std::string error{"Unexpected arguments for kill command: " +
core::CContainerPrinter::print(tokens)};
LOG_ERROR(<< error << " in command with ID " << id);
m_ResponseWriter.writeResponse(id, false, error);
return false;
}

if (m_Spawner.terminateChild(pid) == false) {
LOG_ERROR(<< "Failed to kill process with PID " << pid);
std::string error{"Failed to kill process with PID " + tokens[0]};
LOG_ERROR(<< error << " in command with ID " << id);
m_ResponseWriter.writeResponse(id, false, error);
return false;
}

m_ResponseWriter.writeResponse(id, true, "Process with PID " + tokens[0] + " killed");
return true;
}
}
Expand Down
30 changes: 20 additions & 10 deletions bin/controller/CCommandProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

#include <core/CDetachedProcessSpawner.h>

#include "CResponseJsonWriter.h"

#include <cstdint>
#include <iosfwd>
#include <string>
#include <vector>
Expand All @@ -25,7 +28,11 @@ namespace controller {
//! command to be executed.
//!
//! Each command has the following format:
//! verb arguments...
//! ID verb arguments...
//!
//! The ID is expected to be a unique positive integer. This is reported
//! in error messages and in the response objects that are sent when the
//! command is complete.
//!
//! Available verbs are:
//! 1) start - in this case the arguments consist of the process name
Expand All @@ -51,30 +58,33 @@ class CCommandProcessor {
static const std::string KILL;

public:
CCommandProcessor(const TStrVec& permittedProcessPaths);
CCommandProcessor(const TStrVec& permittedProcessPaths, std::ostream& responseStream);

//! Action commands read from the supplied \p stream until end-of-file
//! is reached.
void processCommands(std::istream& stream);
//! Action commands read from the supplied \p commandStream until
//! end-of-file is reached.
void processCommands(std::istream& commandStream);

//! Parse and handle a single command.
bool handleCommand(const std::string& command);

private:
//! Handle a start command.
//! \param tokens Tokens to the command excluding the verb. Passed
//! non-const so that this method can manipulate the
//! tokens without having to copy.
bool handleStart(TStrVec& tokens);
//! \param id The command ID.
//! \param tokens Tokens to the command excluding the command ID and verb.
bool handleStart(std::uint32_t id, TStrVec tokens);

//! Handle a kill command.
//! \param id The command ID.
//! \param tokens Expected to contain one element, namely the process
//! ID of the process to be killed.
bool handleKill(TStrVec& tokens);
bool handleKill(std::uint32_t id, TStrVec tokens);

private:
//! Used to spawn/kill the requested processes.
core::CDetachedProcessSpawner m_Spawner;

//! Used to write responses in JSON format to the response stream.
CResponseJsonWriter m_ResponseWriter;
};
}
}
Expand Down
41 changes: 41 additions & 0 deletions bin/controller/CResponseJsonWriter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

#include "CResponseJsonWriter.h"

#include <core/CLogger.h>

#include <ios>

namespace ml {
namespace controller {
namespace {

// JSON field names
const std::string ID{"id"};
const std::string SUCCESS{"success"};
const std::string REASON{"reason"};
}

CResponseJsonWriter::CResponseJsonWriter(std::ostream& responseStream)
: m_WrappedOutputStream(responseStream), m_Writer{m_WrappedOutputStream} {
}

void CResponseJsonWriter::writeResponse(std::uint32_t id, bool success, const std::string& reason) {
m_Writer.StartObject();
m_Writer.Key(ID);
m_Writer.Uint(id);
m_Writer.Key(SUCCESS);
m_Writer.Bool(success);
m_Writer.Key(REASON);
m_Writer.String(reason);
m_Writer.EndObject();
m_Writer.flush();
LOG_DEBUG(<< "Wrote controller response - id: " << id
<< " success: " << std::boolalpha << success << " reason: " << reason);
}
}
}
Loading

0 comments on commit d90ff2e

Please sign in to comment.