Skip to content

Commit

Permalink
[ML] Make controller send responses for each command received
Browse files Browse the repository at this point in the history
This change makes the controller process respond to each command
it receives with a document indicating whether that command was
successfully executed or not.

This response will be used by the Java side of the connection to
determine when it is appropriate to move on to the next phase of
the action that the controller command was part of.  For example,
when starting a process and connecting named pipes to it it is
best that the named pipe connections are not attempted until the
process is confirmed to be started.

Relates elastic/elasticsearch#62823
  • Loading branch information
droberts195 committed Oct 1, 2020
1 parent 53216fe commit 0e6268c
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 101 deletions.
14 changes: 10 additions & 4 deletions bin/controller/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ bool CCmdLineParser::parse(int argc,
const char* const* argv,
std::string& jvmPidStr,
std::string& logPipe,
std::string& commandPipe) {
std::string& commandPipe,
std::string& outputPipe) {
try {
boost::program_options::options_description desc(DESCRIPTION);
// clang-format off
desc.add_options()
("help", "Display this information and exit")
("version", "Display version information and exit")
("jvmPid", boost::program_options::value<std::string>(),
"Process ID of the JVM to communicate with - default is parent process PID")
"Process ID of the JVM to communicate with - default is parent process PID")
("logPipe", boost::program_options::value<std::string>(),
"Named pipe to log to - default is controller_log_<JVM PID>")
"Named pipe to log to - default is controller_log_<JVM PID>")
("commandPipe", boost::program_options::value<std::string>(),
"Named pipe to accept commands from - default is controller_command_<JVM PID>")
"Named pipe to accept commands from - default is controller_command_<JVM PID>")
("outputPipe", boost::program_options::value<std::string>(),
"Named pipe to output responses to - default is controller_output_<JVM PID>")
;
// clang-format on

Expand All @@ -59,6 +62,9 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("commandPipe") > 0) {
commandPipe = vm["commandPipe"].as<std::string>();
}
if (vm.count("outputPipe") > 0) {
outputPipe = vm["outputPipe"].as<std::string>();
}
} catch (std::exception& e) {
std::cerr << "Error processing command line: " << e.what() << std::endl;
return false;
Expand Down
3 changes: 2 additions & 1 deletion bin/controller/CCmdLineParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class CCmdLineParser {
const char* const* argv,
std::string& jvmPidStr,
std::string& logPipe,
std::string& commandPipe);
std::string& commandPipe,
std::string& outputPipe);

private:
static const std::string DESCRIPTION;
Expand Down
80 changes: 51 additions & 29 deletions bin/controller/CCommandProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ namespace ml {
namespace controller {

// Initialise statics
const std::string CCommandProcessor::START("start");
const std::string CCommandProcessor::KILL("kill");
const std::string CCommandProcessor::START{"start"};
const std::string CCommandProcessor::KILL{"kill"};

CCommandProcessor::CCommandProcessor(const TStrVec& permittedProcessPaths)
: m_Spawner(permittedProcessPaths) {
CCommandProcessor::CCommandProcessor(const TStrVec& permittedProcessPaths,
std::ostream& responseStream)
: m_Spawner{permittedProcessPaths}, m_ResponseWriter{responseStream} {
}

void CCommandProcessor::processCommands(std::istream& stream) {
void CCommandProcessor::processCommands(std::istream& commandStream) {
std::string command;
while (std::getline(stream, command)) {
if (!command.empty()) {
while (std::getline(commandStream, command)) {
if (command.empty() == false) {
this->handleCommand(command);
}
}
Expand All @@ -41,61 +42,82 @@ void CCommandProcessor::processCommands(std::istream& stream) {
bool CCommandProcessor::handleCommand(const std::string& command) {
// Command lines must be tab-separated
TStrVec tokens;
std::string remainder;
core::CStringUtils::tokenise(TAB, command, tokens, remainder);
if (!remainder.empty()) {
tokens.push_back(remainder);
{
std::string remainder;
core::CStringUtils::tokenise(TAB, command, tokens, remainder);
if (remainder.empty() == false) {
tokens.emplace_back(std::move(remainder));
}
}

// Multiple consecutive tabs might have caused empty tokens
tokens.erase(std::remove(tokens.begin(), tokens.end(), EMPTY_STRING), tokens.end());

if (tokens.empty()) {
LOG_DEBUG(<< "Ignoring empty command");
if (tokens.size() < 3) {
if (tokens.empty() == false) {
LOG_ERROR(<< "Ignoring command with only " << tokens.size()
<< ((tokens.size() == 1) ? " token" : " tokens"));
}
return false;
}

// Split into verb and other tokens
std::string verb(tokens[0]);
tokens.erase(tokens.begin());
// Split into ID, verb and other tokens
std::uint32_t id{0};
if (core::CStringUtils::stringToType(tokens[0], id) == false || id == 0) {
LOG_ERROR(<< "Invalid command ID in " << core::CContainerPrinter::print(tokens));
return false;
}

std::string verb{std::move(tokens[1])};
tokens.erase(tokens.begin(), tokens.begin() + 2);

if (verb == START) {
return this->handleStart(tokens);
return this->handleStart(id, std::move(tokens));
}
if (verb == KILL) {
return this->handleKill(tokens);
return this->handleKill(id, std::move(tokens));
}

LOG_ERROR(<< "Did not understand verb '" << verb << '\'');
std::string error{"Did not understand verb '" + verb + '\''};
LOG_ERROR(<< error << " in command with ID " << id);
m_ResponseWriter.writeResponse(id, false, error);
return false;
}

bool CCommandProcessor::handleStart(TStrVec& tokens) {
std::string processPath;
processPath.swap(tokens[0]);
bool CCommandProcessor::handleStart(std::uint32_t id, TStrVec tokens) {
std::string processPath{std::move(tokens[0])};
tokens.erase(tokens.begin());

if (m_Spawner.spawn(processPath, tokens) == false) {
LOG_ERROR(<< "Failed to start process '" << processPath << '\'');
std::string error{"Failed to start process '" + processPath + '\''};
LOG_ERROR(<< error << " in command with ID " << id);
m_ResponseWriter.writeResponse(id, false, error);
return false;
}

m_ResponseWriter.writeResponse(id, true, "Process '" + processPath + "' started");
return true;
}

bool CCommandProcessor::handleKill(TStrVec& tokens) {
core::CProcess::TPid pid = 0;
if (tokens.size() != 1 || core::CStringUtils::stringToType(tokens[0], pid) == false) {
LOG_ERROR(<< "Unexpected arguments for kill command: "
<< core::CContainerPrinter::print(tokens));
bool CCommandProcessor::handleKill(std::uint32_t id, TStrVec tokens) {
core::CProcess::TPid pid{0};
if (tokens.size() != 1 ||
core::CStringUtils::stringToType(tokens[0], pid) == false || pid == 0) {
std::string error{"Unexpected arguments for kill command: " +
core::CContainerPrinter::print(tokens)};
LOG_ERROR(<< error << " in command with ID " << id);
m_ResponseWriter.writeResponse(id, false, error);
return false;
}

if (m_Spawner.terminateChild(pid) == false) {
LOG_ERROR(<< "Failed to kill process with PID " << pid);
std::string error{"Failed to kill process with PID " + tokens[0]};
LOG_ERROR(<< error << " in command with ID " << id);
m_ResponseWriter.writeResponse(id, false, error);
return false;
}

m_ResponseWriter.writeResponse(id, true, "Process with PID " + tokens[0] + " killed");
return true;
}
}
Expand Down
30 changes: 20 additions & 10 deletions bin/controller/CCommandProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

#include <core/CDetachedProcessSpawner.h>

#include "CResponseJsonWriter.h"

#include <cstdint>
#include <iosfwd>
#include <string>
#include <vector>
Expand All @@ -25,7 +28,11 @@ namespace controller {
//! command to be executed.
//!
//! Each command has the following format:
//! verb arguments...
//! ID verb arguments...
//!
//! The ID is expected to be a unique positive integer. This is reported
//! in error messages and in the response objects that are sent when the
//! command is complete.
//!
//! Available verbs are:
//! 1) start - in this case the arguments consist of the process name
Expand All @@ -51,30 +58,33 @@ class CCommandProcessor {
static const std::string KILL;

public:
CCommandProcessor(const TStrVec& permittedProcessPaths);
CCommandProcessor(const TStrVec& permittedProcessPaths, std::ostream& responseStream);

//! Action commands read from the supplied \p stream until end-of-file
//! is reached.
void processCommands(std::istream& stream);
//! Action commands read from the supplied \p commandStream until
//! end-of-file is reached.
void processCommands(std::istream& commandStream);

//! Parse and handle a single command.
bool handleCommand(const std::string& command);

private:
//! Handle a start command.
//! \param tokens Tokens to the command excluding the verb. Passed
//! non-const so that this method can manipulate the
//! tokens without having to copy.
bool handleStart(TStrVec& tokens);
//! \param id The command ID.
//! \param tokens Tokens to the command excluding the command ID and verb.
bool handleStart(std::uint32_t id, TStrVec tokens);

//! Handle a kill command.
//! \param id The command ID.
//! \param tokens Expected to contain one element, namely the process
//! ID of the process to be killed.
bool handleKill(TStrVec& tokens);
bool handleKill(std::uint32_t id, TStrVec tokens);

private:
//! Used to spawn/kill the requested processes.
core::CDetachedProcessSpawner m_Spawner;

//! Used to write responses in JSON format to the response stream.
CResponseJsonWriter m_ResponseWriter;
};
}
}
Expand Down
35 changes: 35 additions & 0 deletions bin/controller/CResponseJsonWriter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

#include "CResponseJsonWriter.h"

namespace ml {
namespace controller {
namespace {

// JSON field names
const std::string ID{"id"};
const std::string SUCCESS{"success"};
const std::string REASON{"reason"};
}

CResponseJsonWriter::CResponseJsonWriter(std::ostream& responseStream)
: m_WriteStream{responseStream}, m_Writer{m_WriteStream} {
}

void CResponseJsonWriter::writeResponse(std::uint32_t id, bool success, const std::string& reason) {
m_Writer.StartObject();
m_Writer.Key(ID);
m_Writer.Uint(id);
m_Writer.Key(SUCCESS);
m_Writer.Bool(success);
m_Writer.Key(REASON);
m_Writer.String(reason);
m_Writer.EndObject();
m_Writer.Flush();
}
}
}
53 changes: 53 additions & 0 deletions bin/controller/CResponseJsonWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
#ifndef INCLUDED_ml_controller_CResponseJsonWriter_h
#define INCLUDED_ml_controller_CResponseJsonWriter_h

#include <core/CRapidJsonLineWriter.h>

#include <rapidjson/ostreamwrapper.h>

#include <iosfwd>
#include <string>

namespace ml {
namespace controller {

//! \brief
//! Write a response to a controller command in JSON format.
//!
//! DESCRIPTION:\n
//! Output documents are of the form:
//!
//! { "id" : 123, "success" : true, "reason" : "message explaining success/failure" }
//!
//! A newline is written after each document, i.e. the output is ND-JSON.
//!
//! IMPLEMENTATION DECISIONS:\n
//! Not using the concurrent line writer, as there's no need for thread
//! safety.
//!
class CResponseJsonWriter {
public:
//! \param[in] responseStream The stream to which to write responses.
CResponseJsonWriter(std::ostream& responseStream);

//! Writes a response in JSON format.
void writeResponse(std::uint32_t id, bool success, const std::string& reason);

private:
//! JSON writer ostream wrapper
rapidjson::OStreamWrapper m_WriteStream;

using TGenericLineWriter = core::CRapidJsonLineWriter<rapidjson::OStreamWrapper>;

//! JSON writer
TGenericLineWriter m_Writer;
};
}
}

#endif // INCLUDED_ml_controller_CResponseJsonWriter_h
Loading

0 comments on commit 0e6268c

Please sign in to comment.