diff --git a/bin/autodetect/CCmdLineParser.cc b/bin/autodetect/CCmdLineParser.cc index e5fe8e69ff..284a3186c8 100644 --- a/bin/autodetect/CCmdLineParser.cc +++ b/bin/autodetect/CCmdLineParser.cc @@ -64,68 +64,68 @@ bool CCmdLineParser::parse(int argc, ("help", "Display this information and exit") ("version", "Display version information and exit") ("config", boost::program_options::value(), - "The job configuration file") + "The job configuration file") ("limitconfig", boost::program_options::value(), - "Optional limit config file") + "Optional limit config file") ("modelconfig", boost::program_options::value(), - "Optional model config file") + "Optional model config file") ("fieldconfig", boost::program_options::value(), - "Optional field config file") + "Optional field config file") ("modelplotconfig", boost::program_options::value(), - "Optional model plot config file") + "Optional model plot config file") ("jobid", boost::program_options::value(), - "ID of the job this process is associated with") + "ID of the job this process is associated with") ("logProperties", boost::program_options::value(), - "Optional logger properties file") + "Optional logger properties file") ("logPipe", boost::program_options::value(), - "Optional log to named pipe") + "Optional log to named pipe") ("bucketspan", boost::program_options::value(), - "Optional aggregation bucket span (in seconds) - default is 300") + "Optional aggregation bucket span (in seconds) - default is 300") ("latency", boost::program_options::value(), - "Optional maximum delay for out-of-order records (in seconds) - default is 0") + "Optional maximum delay for out-of-order records (in seconds) - default is 0") ("summarycountfield", boost::program_options::value(), - "Optional field to that contains counts for pre-summarized input - default is none") + "Optional field to that contains counts for pre-summarized input - default is none") ("delimiter", boost::program_options::value(), - "Optional delimiter character for delimited data formats - default is '\t' (tab separated)") + "Optional delimiter character for delimited data formats - default is '\t' (tab separated)") ("lengthEncodedInput", - "Take input in length encoded binary format - default is delimited") + "Take input in length encoded binary format - default is delimited") ("timefield", boost::program_options::value(), - "Optional name of the field containing the timestamp - default is 'time'") + "Optional name of the field containing the timestamp - default is 'time'") ("timeformat", boost::program_options::value(), - "Optional format of the date in the time field in strptime code - default is the epoch time in seconds") + "Optional format of the date in the time field in strptime code - default is the epoch time in seconds") ("quantilesState", boost::program_options::value(), - "Optional file to quantiles for normalization") + "Optional file to quantiles for normalization") ("deleteStateFiles", - "If the 'quantilesState' option is used and this flag is set then delete the model state files once they have been read") + "If the 'quantilesState' option is used and this flag is set then delete the model state files once they have been read") ("namedPipeConnectTimeout", boost::program_options::value(), - "Optional timeout (in seconds) for connecting named pipes on startup - default is 300 seconds") + "Optional timeout (in seconds) for connecting named pipes on startup - default is 300 seconds") ("input", boost::program_options::value(), - "Optional file to read input from - not present means read from STDIN") + "Optional file to read input from - not present means read from STDIN") ("inputIsPipe", "Specified input file is a named pipe") ("output", boost::program_options::value(), - "Optional file to write output to - not present means write to STDOUT") + "Optional file to write output to - not present means write to STDOUT") ("outputIsPipe", "Specified output file is a named pipe") ("restore", boost::program_options::value(), - "Optional file to restore state from - not present means no state restoration") + "Optional file to restore state from - not present means no state restoration") ("restoreIsPipe", "Specified restore file is a named pipe") ("persist", boost::program_options::value(), - "Optional file to persist state to - not present means no state persistence") + "Optional file to persist state to - not present means no state persistence") ("persistIsPipe", "Specified persist file is a named pipe") ("persistInterval", boost::program_options::value(), - "Optional time interval at which to periodically persist model state (Mutually exclusive with bucketPersistInterval)") + "Optional time interval at which to periodically persist model state (Mutually exclusive with bucketPersistInterval)") ("persistInForeground", "Persistence occurs in the foreground. Defaults to background persistence.") ("bucketPersistInterval", boost::program_options::value(), - "Optional number of buckets after which to periodically persist model state (Mutually exclusive with persistInterval)") + "Optional number of buckets after which to periodically persist model state (Mutually exclusive with persistInterval)") ("maxQuantileInterval", boost::program_options::value(), - "Optional interval at which to periodically output quantiles if they have not been output due to an anomaly - if not specified then quantiles will only be output following a big anomaly") + "Optional interval at which to periodically output quantiles if they have not been output due to an anomaly - if not specified then quantiles will only be output following a big anomaly") ("maxAnomalyRecords", boost::program_options::value(), - "The maximum number of records to be outputted for each bucket. Defaults to 100, a value 0 removes the limit.") + "The maximum number of records to be outputted for each bucket. Defaults to 100, a value 0 removes the limit.") ("memoryUsage", - "Log the model memory usage at the end of the job") + "Log the model memory usage at the end of the job") ("multivariateByFields", - "Optional flag to enable multi-variate analysis of correlated by fields") + "Optional flag to enable multi-variate analysis of correlated by fields") ("stopCategorizationOnWarnStatus", - "Optional flag to stop categorization for partitions where the status is 'warn'.") + "Optional flag to stop categorization for partitions where the status is 'warn'.") ; // clang-format on diff --git a/bin/controller/CCmdLineParser.cc b/bin/controller/CCmdLineParser.cc index 9361598b22..7922edfeb7 100644 --- a/bin/controller/CCmdLineParser.cc +++ b/bin/controller/CCmdLineParser.cc @@ -21,7 +21,8 @@ bool CCmdLineParser::parse(int argc, const char* const* argv, std::string& jvmPidStr, std::string& logPipe, - std::string& commandPipe) { + std::string& commandPipe, + std::string& outputPipe) { try { boost::program_options::options_description desc(DESCRIPTION); // clang-format off @@ -29,11 +30,13 @@ bool CCmdLineParser::parse(int argc, ("help", "Display this information and exit") ("version", "Display version information and exit") ("jvmPid", boost::program_options::value(), - "Process ID of the JVM to communicate with - default is parent process PID") + "Process ID of the JVM to communicate with - default is parent process PID") ("logPipe", boost::program_options::value(), - "Named pipe to log to - default is controller_log_") + "Named pipe to log to - default is controller_log_") ("commandPipe", boost::program_options::value(), - "Named pipe to accept commands from - default is controller_command_") + "Named pipe to accept commands from - default is controller_command_") + ("outputPipe", boost::program_options::value(), + "Named pipe to output responses to - default is controller_output_") ; // clang-format on @@ -59,6 +62,9 @@ bool CCmdLineParser::parse(int argc, if (vm.count("commandPipe") > 0) { commandPipe = vm["commandPipe"].as(); } + if (vm.count("outputPipe") > 0) { + outputPipe = vm["outputPipe"].as(); + } } catch (std::exception& e) { std::cerr << "Error processing command line: " << e.what() << std::endl; return false; diff --git a/bin/controller/CCmdLineParser.h b/bin/controller/CCmdLineParser.h index 9434a2be04..abd41332af 100644 --- a/bin/controller/CCmdLineParser.h +++ b/bin/controller/CCmdLineParser.h @@ -33,7 +33,8 @@ class CCmdLineParser { const char* const* argv, std::string& jvmPidStr, std::string& logPipe, - std::string& commandPipe); + std::string& commandPipe, + std::string& outputPipe); private: static const std::string DESCRIPTION; diff --git a/bin/controller/CCommandProcessor.cc b/bin/controller/CCommandProcessor.cc index 9538a60211..db0b345e7a 100644 --- a/bin/controller/CCommandProcessor.cc +++ b/bin/controller/CCommandProcessor.cc @@ -22,17 +22,18 @@ namespace ml { namespace controller { // Initialise statics -const std::string CCommandProcessor::START("start"); -const std::string CCommandProcessor::KILL("kill"); +const std::string CCommandProcessor::START{"start"}; +const std::string CCommandProcessor::KILL{"kill"}; -CCommandProcessor::CCommandProcessor(const TStrVec& permittedProcessPaths) - : m_Spawner(permittedProcessPaths) { +CCommandProcessor::CCommandProcessor(const TStrVec& permittedProcessPaths, + std::ostream& responseStream) + : m_Spawner{permittedProcessPaths}, m_ResponseWriter{responseStream} { } -void CCommandProcessor::processCommands(std::istream& stream) { +void CCommandProcessor::processCommands(std::istream& commandStream) { std::string command; - while (std::getline(stream, command)) { - if (!command.empty()) { + while (std::getline(commandStream, command)) { + if (command.empty() == false) { this->handleCommand(command); } } @@ -41,61 +42,82 @@ void CCommandProcessor::processCommands(std::istream& stream) { bool CCommandProcessor::handleCommand(const std::string& command) { // Command lines must be tab-separated TStrVec tokens; - std::string remainder; - core::CStringUtils::tokenise(TAB, command, tokens, remainder); - if (!remainder.empty()) { - tokens.push_back(remainder); + { + std::string remainder; + core::CStringUtils::tokenise(TAB, command, tokens, remainder); + if (remainder.empty() == false) { + tokens.emplace_back(std::move(remainder)); + } } // Multiple consecutive tabs might have caused empty tokens tokens.erase(std::remove(tokens.begin(), tokens.end(), EMPTY_STRING), tokens.end()); - if (tokens.empty()) { - LOG_DEBUG(<< "Ignoring empty command"); + if (tokens.size() < 3) { + if (tokens.empty() == false) { + LOG_ERROR(<< "Ignoring command with only " << tokens.size() + << ((tokens.size() == 1) ? " token" : " tokens")); + } return false; } - // Split into verb and other tokens - std::string verb(tokens[0]); - tokens.erase(tokens.begin()); + // Split into ID, verb and other tokens + std::uint32_t id{0}; + if (core::CStringUtils::stringToType(tokens[0], id) == false || id == 0) { + LOG_ERROR(<< "Invalid command ID in " << core::CContainerPrinter::print(tokens)); + return false; + } + + std::string verb{std::move(tokens[1])}; + tokens.erase(tokens.begin(), tokens.begin() + 2); if (verb == START) { - return this->handleStart(tokens); + return this->handleStart(id, std::move(tokens)); } if (verb == KILL) { - return this->handleKill(tokens); + return this->handleKill(id, std::move(tokens)); } - LOG_ERROR(<< "Did not understand verb '" << verb << '\''); + std::string error{"Did not understand verb '" + verb + '\''}; + LOG_ERROR(<< error << " in command with ID " << id); + m_ResponseWriter.writeResponse(id, false, error); return false; } -bool CCommandProcessor::handleStart(TStrVec& tokens) { - std::string processPath; - processPath.swap(tokens[0]); +bool CCommandProcessor::handleStart(std::uint32_t id, TStrVec tokens) { + std::string processPath{std::move(tokens[0])}; tokens.erase(tokens.begin()); if (m_Spawner.spawn(processPath, tokens) == false) { - LOG_ERROR(<< "Failed to start process '" << processPath << '\''); + std::string error{"Failed to start process '" + processPath + '\''}; + LOG_ERROR(<< error << " in command with ID " << id); + m_ResponseWriter.writeResponse(id, false, error); return false; } + m_ResponseWriter.writeResponse(id, true, "Process '" + processPath + "' started"); return true; } -bool CCommandProcessor::handleKill(TStrVec& tokens) { - core::CProcess::TPid pid = 0; - if (tokens.size() != 1 || core::CStringUtils::stringToType(tokens[0], pid) == false) { - LOG_ERROR(<< "Unexpected arguments for kill command: " - << core::CContainerPrinter::print(tokens)); +bool CCommandProcessor::handleKill(std::uint32_t id, TStrVec tokens) { + core::CProcess::TPid pid{0}; + if (tokens.size() != 1 || + core::CStringUtils::stringToType(tokens[0], pid) == false || pid == 0) { + std::string error{"Unexpected arguments for kill command: " + + core::CContainerPrinter::print(tokens)}; + LOG_ERROR(<< error << " in command with ID " << id); + m_ResponseWriter.writeResponse(id, false, error); return false; } if (m_Spawner.terminateChild(pid) == false) { - LOG_ERROR(<< "Failed to kill process with PID " << pid); + std::string error{"Failed to kill process with PID " + tokens[0]}; + LOG_ERROR(<< error << " in command with ID " << id); + m_ResponseWriter.writeResponse(id, false, error); return false; } + m_ResponseWriter.writeResponse(id, true, "Process with PID " + tokens[0] + " killed"); return true; } } diff --git a/bin/controller/CCommandProcessor.h b/bin/controller/CCommandProcessor.h index 644869b5a7..3f37ec7f86 100644 --- a/bin/controller/CCommandProcessor.h +++ b/bin/controller/CCommandProcessor.h @@ -8,6 +8,9 @@ #include +#include "CResponseJsonWriter.h" + +#include #include #include #include @@ -25,7 +28,11 @@ namespace controller { //! command to be executed. //! //! Each command has the following format: -//! verb arguments... +//! ID verb arguments... +//! +//! The ID is expected to be a unique positive integer. This is reported +//! in error messages and in the response objects that are sent when the +//! command is complete. //! //! Available verbs are: //! 1) start - in this case the arguments consist of the process name @@ -51,30 +58,33 @@ class CCommandProcessor { static const std::string KILL; public: - CCommandProcessor(const TStrVec& permittedProcessPaths); + CCommandProcessor(const TStrVec& permittedProcessPaths, std::ostream& responseStream); - //! Action commands read from the supplied \p stream until end-of-file - //! is reached. - void processCommands(std::istream& stream); + //! Action commands read from the supplied \p commandStream until + //! end-of-file is reached. + void processCommands(std::istream& commandStream); //! Parse and handle a single command. bool handleCommand(const std::string& command); private: //! Handle a start command. - //! \param tokens Tokens to the command excluding the verb. Passed - //! non-const so that this method can manipulate the - //! tokens without having to copy. - bool handleStart(TStrVec& tokens); + //! \param id The command ID. + //! \param tokens Tokens to the command excluding the command ID and verb. + bool handleStart(std::uint32_t id, TStrVec tokens); //! Handle a kill command. + //! \param id The command ID. //! \param tokens Expected to contain one element, namely the process //! ID of the process to be killed. - bool handleKill(TStrVec& tokens); + bool handleKill(std::uint32_t id, TStrVec tokens); private: //! Used to spawn/kill the requested processes. core::CDetachedProcessSpawner m_Spawner; + + //! Used to write responses in JSON format to the response stream. + CResponseJsonWriter m_ResponseWriter; }; } } diff --git a/bin/controller/CResponseJsonWriter.cc b/bin/controller/CResponseJsonWriter.cc new file mode 100644 index 0000000000..cad9418bfe --- /dev/null +++ b/bin/controller/CResponseJsonWriter.cc @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +#include "CResponseJsonWriter.h" + +#include + +#include + +namespace ml { +namespace controller { +namespace { + +// JSON field names +const std::string ID{"id"}; +const std::string SUCCESS{"success"}; +const std::string REASON{"reason"}; +} + +CResponseJsonWriter::CResponseJsonWriter(std::ostream& responseStream) + : m_WrappedOutputStream(responseStream), m_Writer{m_WrappedOutputStream} { +} + +void CResponseJsonWriter::writeResponse(std::uint32_t id, bool success, const std::string& reason) { + m_Writer.StartObject(); + m_Writer.Key(ID); + m_Writer.Uint(id); + m_Writer.Key(SUCCESS); + m_Writer.Bool(success); + m_Writer.Key(REASON); + m_Writer.String(reason); + m_Writer.EndObject(); + m_Writer.flush(); + LOG_DEBUG(<< "Wrote controller response - id: " << id + << " success: " << std::boolalpha << success << " reason: " << reason); +} +} +} diff --git a/bin/controller/CResponseJsonWriter.h b/bin/controller/CResponseJsonWriter.h new file mode 100644 index 0000000000..1ba63428f1 --- /dev/null +++ b/bin/controller/CResponseJsonWriter.h @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +#ifndef INCLUDED_ml_controller_CResponseJsonWriter_h +#define INCLUDED_ml_controller_CResponseJsonWriter_h + +#include +#include + +#include +#include + +namespace ml { +namespace controller { + +//! \brief +//! Write a response to a controller command in JSON format. +//! +//! DESCRIPTION:\n +//! Output documents are of the form: +//! +//! { "id" : 123, "success" : true, "reason" : "message explaining success/failure" } +//! +//! They are written into a JSON array, i.e. the overall output looks +//! something like this: +//! +//! [{ "id" : 1, "success" : true, "reason" : "all ok" } +//! ,{ "id" : 2, "success" : false, "reason" : "something went wrong" } +//! ,{ "id" : 3, "success" : true, "reason" : "ok again" } +//! ] +//! +//! IMPLEMENTATION DECISIONS:\n +//! Uses the concurrent line writer. There's no need for thread safety +//! with the current design, but in future commands might be processed +//! concurrently. +//! +class CResponseJsonWriter { +public: + //! \param[in] responseStream The stream to which to write responses. + CResponseJsonWriter(std::ostream& responseStream); + + //! Writes a response in JSON format. + void writeResponse(std::uint32_t id, bool success, const std::string& reason); + +private: + //! Wrapped output stream + core::CJsonOutputStreamWrapper m_WrappedOutputStream; + + //! JSON line writer + core::CRapidJsonConcurrentLineWriter m_Writer; +}; +} +} + +#endif // INCLUDED_ml_controller_CResponseJsonWriter_h diff --git a/bin/controller/Main.cc b/bin/controller/Main.cc index 72cd07c208..59f2ad60c8 100644 --- a/bin/controller/Main.cc +++ b/bin/controller/Main.cc @@ -51,13 +51,12 @@ #include "CCmdLineParser.h" #include "CCommandProcessor.h" +#include +#include +#include #include #include -#include -#include -#include - int main(int argc, char** argv) { const std::string& defaultNamedPipePath{ml::core::CNamedPipeFactory::defaultPath()}; const std::string& progName{ml::core::CProgName::progName()}; @@ -67,8 +66,9 @@ int main(int argc, char** argv) { ml::core::CProcess::instance().parentId())}; std::string logPipe; std::string commandPipe; + std::string outputPipe; if (ml::controller::CCmdLineParser::parse(argc, argv, jvmPidStr, logPipe, - commandPipe) == false) { + commandPipe, outputPipe) == false) { return EXIT_FAILURE; } @@ -78,6 +78,9 @@ int main(int argc, char** argv) { if (commandPipe.empty()) { commandPipe = defaultNamedPipePath + progName + "_command_" + jvmPidStr; } + if (outputPipe.empty()) { + outputPipe = defaultNamedPipePath + progName + "_output_" + jvmPidStr; + } // This needs to be started before reconfiguring logging just in case // nothing connects to the other end of the logging pipe. This could @@ -129,13 +132,25 @@ int main(int argc, char** argv) { return EXIT_FAILURE; } + ml::core::CNamedPipeFactory::TOStreamP outputStream{ml::core::CNamedPipeFactory::openPipeStreamWrite( + outputPipe, cancellerThread.hasCancelledBlockingCall())}; + if (outputStream == nullptr) { + if (cancellerThread.hasCancelledBlockingCall().load()) { + LOG_INFO(<< "Parent process died - ML controller exiting"); + } else { + LOG_FATAL(<< "Could not open output pipe"); + } + cancellerThread.stop(); + return EXIT_FAILURE; + } + // Change directory to the directory containing this program, because the // permitted paths all assume the current working directory contains the // permitted programs const std::string& progDir{ml::core::CProgName::progDir()}; if (ml::core::COsFileFuncs::chdir(progDir.c_str()) == -1) { LOG_FATAL(<< "Could not change directory to '" << progDir - << "': " << ::strerror(errno)); + << "': " << std::strerror(errno)); cancellerThread.stop(); return EXIT_FAILURE; } @@ -143,7 +158,7 @@ int main(int argc, char** argv) { ml::controller::CCommandProcessor::TStrVec permittedProcessPaths{ "./autodetect", "./categorize", "./data_frame_analyzer", "./normalize"}; - ml::controller::CCommandProcessor processor{permittedProcessPaths}; + ml::controller::CCommandProcessor processor{permittedProcessPaths, *outputStream}; processor.processCommands(*commandStream); cancellerThread.stop(); diff --git a/bin/controller/Makefile b/bin/controller/Makefile index d417395309..58b724747f 100644 --- a/bin/controller/Makefile +++ b/bin/controller/Makefile @@ -13,6 +13,7 @@ ML_LIBS=$(LIB_ML_CORE) USE_BOOST=1 USE_BOOST_PROGRAMOPTIONS_LIBS=1 +USE_RAPIDJSON=1 LIBS=$(ML_LIBS) @@ -23,6 +24,7 @@ SRCS= \ CBlockingCallCancellingStreamMonitor.cc \ CCmdLineParser.cc \ CCommandProcessor.cc \ + CResponseJsonWriter.cc \ ifneq ($(PLIST_FILE),) all:: $(CPP_SRC_HOME)/gradle.properties $(CPP_SRC_HOME)/mk/make_info_plist.sh diff --git a/bin/controller/unittest/CCommandProcessorTest.cc b/bin/controller/unittest/CCommandProcessorTest.cc index c28a2a9a3c..4ea342c92d 100644 --- a/bin/controller/unittest/CCommandProcessorTest.cc +++ b/bin/controller/unittest/CCommandProcessorTest.cc @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -22,26 +23,27 @@ BOOST_AUTO_TEST_SUITE(CCommandProcessorTest) namespace { -const std::string OUTPUT_FILE("slogan1.txt"); +const std::string OUTPUT_FILE{"slogan1.txt"}; #ifdef Windows // Unlike Windows NT system calls, copy's command line cannot cope with // forward slash path separators -const std::string INPUT_FILE1("testfiles\\slogan1.txt"); -const std::string INPUT_FILE2("testfiles\\slogan2.txt"); -const char* winDir(::getenv("windir")); -const std::string PROCESS_PATH(winDir != 0 ? std::string(winDir) + "\\System32\\cmd" - : std::string("C:\\Windows\\System32\\cmd")); -const std::string PROCESS_ARGS1[] = {"/C", "copy " + INPUT_FILE1 + " ."}; -const std::string PROCESS_ARGS2[] = {"/C", "del " + INPUT_FILE2}; +const std::string INPUT_FILE1{"testfiles\\slogan1.txt"}; +const std::string INPUT_FILE2{"testfiles\\slogan2.txt"}; +const char* winDir{std::getenv("windir")}; +const std::string PROCESS_PATH{winDir != nullptr + ? std::string{winDir} + "\\System32\\cmd" + : std::string{"C:\\Windows\\System32\\cmd"}}; +const std::string PROCESS_ARGS1[]{"/C", "copy " + INPUT_FILE1 + " ."}; +const std::string PROCESS_ARGS2[]{"/C", "del " + INPUT_FILE2}; #else -const std::string INPUT_FILE1("testfiles/slogan1.txt"); -const std::string INPUT_FILE2("testfiles/slogan2.txt"); -const std::string PROCESS_PATH("/bin/sh"); -const std::string PROCESS_ARGS1[] = {"-c", "cp " + INPUT_FILE1 + " ."}; -const std::string PROCESS_ARGS2[] = {"-c", "rm " + INPUT_FILE2}; +const std::string INPUT_FILE1{"testfiles/slogan1.txt"}; +const std::string INPUT_FILE2{"testfiles/slogan2.txt"}; +const std::string PROCESS_PATH{"/bin/sh"}; +const std::string PROCESS_ARGS1[]{"-c", "cp " + INPUT_FILE1 + " ."}; +const std::string PROCESS_ARGS2[]{"-c", "rm " + INPUT_FILE2}; #endif -const std::string SLOGAN1("Elastic is great!"); -const std::string SLOGAN2("You know, for search!"); +const std::string SLOGAN1{"Elastic is great!"}; +const std::string SLOGAN2{"You know, for search!"}; } BOOST_AUTO_TEST_CASE(testStartPermitted) { @@ -49,94 +51,162 @@ BOOST_AUTO_TEST_CASE(testStartPermitted) { // check the return code as this will usually fail std::remove(OUTPUT_FILE.c_str()); - ml::controller::CCommandProcessor::TStrVec permittedPaths(1, PROCESS_PATH); - ml::controller::CCommandProcessor processor(permittedPaths); + std::ostringstream responseStream; + { + ml::controller::CCommandProcessor::TStrVec permittedPaths{PROCESS_PATH}; + ml::controller::CCommandProcessor processor{permittedPaths, responseStream}; - std::string command(ml::controller::CCommandProcessor::START); - command += '\t'; - command += PROCESS_PATH; - for (size_t index = 0; index < boost::size(PROCESS_ARGS1); ++index) { - command += '\t'; - command += PROCESS_ARGS1[index]; - } + std::string command{"1\t" + ml::controller::CCommandProcessor::START + '\t' + PROCESS_PATH}; + for (std::size_t index = 0; index < boost::size(PROCESS_ARGS1); ++index) { + command += '\t'; + command += PROCESS_ARGS1[index]; + } + + std::istringstream commandStream{command + '\n'}; + processor.processCommands(commandStream); - std::istringstream commandStream(command + '\n'); - processor.processCommands(commandStream); + // Expect the copy to complete in less than 1 second + std::this_thread::sleep_for(std::chrono::seconds{1}); - // Expect the copy to complete in less than 1 second - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::ifstream ifs{OUTPUT_FILE}; + BOOST_TEST_REQUIRE(ifs.is_open()); + std::string content; + std::getline(ifs, content); + ifs.close(); - std::ifstream ifs(OUTPUT_FILE.c_str()); - BOOST_TEST_REQUIRE(ifs.is_open()); - std::string content; - std::getline(ifs, content); - ifs.close(); + BOOST_REQUIRE_EQUAL(SLOGAN1, content); + } - BOOST_REQUIRE_EQUAL(SLOGAN1, content); + std::string jsonEscapedProcessPath{PROCESS_PATH}; + ml::core::CStringUtils::replace("\\", "\\\\", jsonEscapedProcessPath); + BOOST_REQUIRE_EQUAL("[{\"id\":1,\"success\":true,\"reason\":\"Process '" + jsonEscapedProcessPath + + "' started\"}\n" + "]", + responseStream.str()); BOOST_REQUIRE_EQUAL(0, std::remove(OUTPUT_FILE.c_str())); } BOOST_AUTO_TEST_CASE(testStartNonPermitted) { - ml::controller::CCommandProcessor::TStrVec permittedPaths(1, "some other process"); - ml::controller::CCommandProcessor processor(permittedPaths); - - std::string command(ml::controller::CCommandProcessor::START); - command += '\t'; - command += PROCESS_PATH; - for (size_t index = 0; index < boost::size(PROCESS_ARGS2); ++index) { - command += '\t'; - command += PROCESS_ARGS2[index]; + std::ostringstream responseStream; + { + ml::controller::CCommandProcessor::TStrVec permittedPaths{"some other process"}; + ml::controller::CCommandProcessor processor{permittedPaths, responseStream}; + + std::string command{"2\t" + ml::controller::CCommandProcessor::START + '\t' + PROCESS_PATH}; + for (std::size_t index = 0; index < boost::size(PROCESS_ARGS2); ++index) { + command += '\t'; + command += PROCESS_ARGS2[index]; + } + + std::istringstream commandStream{command + '\n'}; + processor.processCommands(commandStream); + + // The delete should have been rejected, so the second input file should + // still exist and have the expected contents + + std::ifstream ifs{INPUT_FILE2}; + BOOST_TEST_REQUIRE(ifs.is_open()); + std::string content; + std::getline(ifs, content); + ifs.close(); + + BOOST_REQUIRE_EQUAL(SLOGAN2, content); } - std::istringstream commandStream(command + '\n'); - processor.processCommands(commandStream); - - // The delete should have been rejected, so the second input file should - // still exist and have the expected contents - - std::ifstream ifs(INPUT_FILE2.c_str()); - BOOST_TEST_REQUIRE(ifs.is_open()); - std::string content; - std::getline(ifs, content); - ifs.close(); - - BOOST_REQUIRE_EQUAL(SLOGAN2, content); + std::string jsonEscapedProcessPath{PROCESS_PATH}; + ml::core::CStringUtils::replace("\\", "\\\\", jsonEscapedProcessPath); + BOOST_REQUIRE_EQUAL("[{\"id\":2,\"success\":false,\"reason\":\"Failed to start process '" + + jsonEscapedProcessPath + + "'\"}\n" + "]", + responseStream.str()); } BOOST_AUTO_TEST_CASE(testStartNonExistent) { - ml::controller::CCommandProcessor::TStrVec permittedPaths(1, "some other process"); - ml::controller::CCommandProcessor processor(permittedPaths); + std::ostringstream responseStream; + { + ml::controller::CCommandProcessor::TStrVec permittedPaths{"some other process"}; + ml::controller::CCommandProcessor processor{permittedPaths, responseStream}; - std::string command(ml::controller::CCommandProcessor::START); - command += "\tsome other process"; + std::string command{"3\t" + ml::controller::CCommandProcessor::START + "\tsome other process"}; - BOOST_TEST_REQUIRE(!processor.handleCommand(command)); + BOOST_REQUIRE_EQUAL(false, processor.handleCommand(command)); + } + + BOOST_REQUIRE_EQUAL("[{\"id\":3,\"success\":false,\"reason\":\"Failed to start process 'some other process'\"}\n" + "]", + responseStream.str()); } BOOST_AUTO_TEST_CASE(testKillDisallowed) { // Attempt to kill a process that exists but isn't allowed to be killed, // namely the unit test program + std::string pidStr{ + ml::core::CStringUtils::typeToString(ml::core::CProcess::instance().id())}; + + std::ostringstream responseStream; + { + ml::controller::CCommandProcessor::TStrVec permittedPaths{PROCESS_PATH}; + ml::controller::CCommandProcessor processor{permittedPaths, responseStream}; - ml::controller::CCommandProcessor::TStrVec permittedPaths(1, PROCESS_PATH); - ml::controller::CCommandProcessor processor(permittedPaths); + std::string command{"4\t" + ml::controller::CCommandProcessor::KILL + '\t' + pidStr}; - std::string command(ml::controller::CCommandProcessor::KILL); - command += '\t'; - command += - ml::core::CStringUtils::typeToString(ml::core::CProcess::instance().id()); + BOOST_REQUIRE_EQUAL(false, processor.handleCommand(command)); + } - BOOST_TEST_REQUIRE(!processor.handleCommand(command)); + BOOST_REQUIRE_EQUAL("[{\"id\":4,\"success\":false,\"reason\":\"Failed to kill process with PID " + + pidStr + + "\"}\n" + "]", + responseStream.str()); } BOOST_AUTO_TEST_CASE(testInvalidVerb) { - ml::controller::CCommandProcessor::TStrVec permittedPaths(1, "some other process"); - ml::controller::CCommandProcessor processor(permittedPaths); + std::ostringstream responseStream; + { + ml::controller::CCommandProcessor::TStrVec permittedPaths{"some other process"}; + ml::controller::CCommandProcessor processor{permittedPaths, responseStream}; + + std::string command{"5\tdrive\tsome other process"}; + + BOOST_REQUIRE_EQUAL(false, processor.handleCommand(command)); + } + + BOOST_REQUIRE_EQUAL("[{\"id\":5,\"success\":false,\"reason\":\"Did not understand verb 'drive'\"}\n" + "]", + responseStream.str()); +} + +BOOST_AUTO_TEST_CASE(testTooFewTokens) { + std::ostringstream responseStream; + { + ml::controller::CCommandProcessor::TStrVec permittedPaths{"some other process"}; + ml::controller::CCommandProcessor processor{permittedPaths, responseStream}; + + std::string command{ml::controller::CCommandProcessor::START + "\tsome other process"}; + + BOOST_REQUIRE_EQUAL(false, processor.handleCommand(command)); + } + + // It's not possible to respond without an ID + BOOST_REQUIRE_EQUAL("[]", responseStream.str()); +} - std::string command("drive"); - command += "\tsome other process"; +BOOST_AUTO_TEST_CASE(testMissingId) { + std::ostringstream responseStream; + { + ml::controller::CCommandProcessor::TStrVec permittedPaths{"some other process"}; + ml::controller::CCommandProcessor processor{permittedPaths, responseStream}; + + std::string command{ml::controller::CCommandProcessor::START + + "\tsome other process\targ1\targ2"}; + + BOOST_REQUIRE_EQUAL(false, processor.handleCommand(command)); + } - BOOST_TEST_REQUIRE(!processor.handleCommand(command)); + // It's not possible to respond without an ID + BOOST_REQUIRE_EQUAL("[]", responseStream.str()); } BOOST_AUTO_TEST_SUITE_END() diff --git a/bin/controller/unittest/CResponseJsonWriterTest.cc b/bin/controller/unittest/CResponseJsonWriterTest.cc new file mode 100644 index 0000000000..464a471c3e --- /dev/null +++ b/bin/controller/unittest/CResponseJsonWriterTest.cc @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +#include "../CResponseJsonWriter.h" + +#include + +#include + +BOOST_AUTO_TEST_SUITE(CResponseJsonWriterTest) + +BOOST_AUTO_TEST_CASE(testResponseWriter) { + std::ostringstream responseStream; + { + ml::controller::CResponseJsonWriter responseWriter{responseStream}; + responseWriter.writeResponse(1, true, "reason a"); + responseWriter.writeResponse(3, false, "reason b"); + responseWriter.writeResponse(2, true, "reason c"); + } + + BOOST_REQUIRE_EQUAL("[{\"id\":1,\"success\":true,\"reason\":\"reason a\"}\n" + ",{\"id\":3,\"success\":false,\"reason\":\"reason b\"}\n" + ",{\"id\":2,\"success\":true,\"reason\":\"reason c\"}\n" + "]", + responseStream.str()); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/bin/controller/unittest/Makefile b/bin/controller/unittest/Makefile index 95f50b4057..fd0c6356df 100644 --- a/bin/controller/unittest/Makefile +++ b/bin/controller/unittest/Makefile @@ -10,6 +10,7 @@ TARGET=ml_test$(EXE_EXT) USE_BOOST=1 USE_BOOST_PROGRAMOPTIONS_LIBS=1 USE_BOOST_TEST_LIBS=1 +USE_RAPIDJSON=1 all: build @@ -20,6 +21,7 @@ SRCS=\ Main.cc \ CBlockingCallCancellingStreamMonitorTest.cc \ CCommandProcessorTest.cc \ + CResponseJsonWriterTest.cc \ include $(CPP_SRC_HOME)/mk/stdboosttest.mk diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index d99c21de20..901b454313 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -45,6 +45,8 @@ * During regression and classification training prefer smaller models if performance is similar (See {ml-pull}1516[#1516].) +* Add a response mechanism for commands sent to the native controller. (See + {ml-pull}1520[#1520], {es-pull}63542[#63542], issue: {es-issue}62823[#62823].) == {es} version 7.10.0