Skip to content

Commit

Permalink
[7.x][ML] Add timeouts to named pipe connections (elastic#1515)
Browse files Browse the repository at this point in the history
This PR adds timeouts to the named pipe connections of the
autodetect, normalize and data_frame_analyzer processes.
(The controller process already had a different mechanism,
tied to the ES JVM lifetime.)

Before this change, if the ES JVM didn't connect to one of
the named pipes of an autodetect, normalize or
data_frame_analyzer process, then that process would hang forever.

After this change if nothing connects to the other end of
a named pipe that the C++ process has created within the
timeout period then it will delete the named pipe and exit.

The timeout will be supplied on the command line by a
followup Java PR.  In the meantime, (unreleased snapshot)
builds that have the C++ change but not the Java change will
use a timeout of 5 minutes, which is better than nothing and
highly unlikely to be less than the configured process
connect timeout in the ES settings.

Backport of elastic#1514
  • Loading branch information
droberts195 authored Sep 29, 2020
1 parent e8a3967 commit 68feffd
Show file tree
Hide file tree
Showing 31 changed files with 643 additions and 231 deletions.
6 changes: 6 additions & 0 deletions bin/autodetect/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ bool CCmdLineParser::parse(int argc,
core_t::TTime& persistInterval,
std::size_t& bucketPersistInterval,
core_t::TTime& maxQuantileInterval,
core_t::TTime& namedPipeConnectTimeout,
std::string& inputFileName,
bool& isInputFileNamedPipe,
std::string& outputFileName,
Expand Down Expand Up @@ -93,6 +94,8 @@ bool CCmdLineParser::parse(int argc,
"Optional file to quantiles for normalization")
("deleteStateFiles",
"If the 'quantilesState' option is used and this flag is set then delete the model state files once they have been read")
("namedPipeConnectTimeout", boost::program_options::value<core_t::TTime>(),
"Optional timeout (in seconds) for connecting named pipes on startup - default is 300 seconds")
("input", boost::program_options::value<std::string>(),
"Optional file to read input from - not present means read from STDIN")
("inputIsPipe", "Specified input file is a named pipe")
Expand Down Expand Up @@ -209,6 +212,9 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("maxQuantileInterval") > 0) {
maxQuantileInterval = vm["maxQuantileInterval"].as<core_t::TTime>();
}
if (vm.count("namedPipeConnectTimeout") > 0) {
namedPipeConnectTimeout = vm["namedPipeConnectTimeout"].as<core_t::TTime>();
}
if (vm.count("input") > 0) {
inputFileName = vm["input"].as<std::string>();
}
Expand Down
1 change: 1 addition & 0 deletions bin/autodetect/CCmdLineParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class CCmdLineParser {
core_t::TTime& persistInterval,
std::size_t& bucketPersistInterval,
core_t::TTime& maxQuantileInterval,
core_t::TTime& namedPipeConnectTimeout,
std::string& inputFileName,
bool& isInputFileNamedPipe,
std::string& outputFileName,
Expand Down
34 changes: 25 additions & 9 deletions bin/autodetect/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//! IMPLEMENTATION DECISIONS:\n
//! Standalone program.
//!
#include <core/CBlockingCallCancellingTimer.h>
#include <core/CDataAdder.h>
#include <core/CDataSearcher.h>
#include <core/CJsonOutputStreamWrapper.h>
Expand Down Expand Up @@ -45,6 +46,7 @@

#include "CCmdLineParser.h"

#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <functional>
Expand Down Expand Up @@ -99,6 +101,8 @@ int main(int argc, char** argv) {
ml::core_t::TTime persistInterval{-1};
std::size_t bucketPersistInterval{0};
ml::core_t::TTime maxQuantileInterval{-1};
ml::core_t::TTime namedPipeConnectTimeout{
ml::core::CBlockingCallCancellingTimer::DEFAULT_TIMEOUT_SECONDS};
std::string inputFileName;
bool isInputFileNamedPipe{false};
std::string outputFileName;
Expand All @@ -118,25 +122,37 @@ int main(int argc, char** argv) {
modelPlotConfigFile, jobId, logProperties, logPipe, bucketSpan, latency,
summaryCountFieldName, delimiter, lengthEncodedInput, timeField,
timeFormat, quantilesStateFile, deleteStateFiles, persistInterval,
bucketPersistInterval, maxQuantileInterval, inputFileName,
isInputFileNamedPipe, outputFileName, isOutputFileNamedPipe, restoreFileName,
isRestoreFileNamedPipe, persistFileName, isPersistFileNamedPipe,
bucketPersistInterval, maxQuantileInterval, namedPipeConnectTimeout,
inputFileName, isInputFileNamedPipe, outputFileName, isOutputFileNamedPipe,
restoreFileName, isRestoreFileNamedPipe, persistFileName, isPersistFileNamedPipe,
isPersistInForeground, maxAnomalyRecords, memoryUsage, multivariateByFields,
stopCategorizationOnWarnStatus, clauseTokens) == false) {
return EXIT_FAILURE;
}

ml::core::CBlockingCallCancellingTimer cancellerThread{
ml::core::CThread::currentThreadId(), std::chrono::seconds{namedPipeConnectTimeout}};

// Construct the IO manager before reconfiguring the logger, as it performs
// std::ios actions that only work before first use
ml::api::CIoManager ioMgr{inputFileName, isInputFileNamedPipe,
outputFileName, isOutputFileNamedPipe,
restoreFileName, isRestoreFileNamedPipe,
persistFileName, isPersistFileNamedPipe};

if (ml::core::CLogger::instance().reconfigure(logPipe, logProperties) == false) {
ml::api::CIoManager ioMgr{
cancellerThread, inputFileName, isInputFileNamedPipe,
outputFileName, isOutputFileNamedPipe, restoreFileName,
isRestoreFileNamedPipe, persistFileName, isPersistFileNamedPipe};

if (cancellerThread.start() == false) {
// This log message will probably never be seen as it will go to the
// real stderr of this process rather than the log pipe...
LOG_FATAL(<< "Could not start blocking call canceller thread");
return EXIT_FAILURE;
}
if (ml::core::CLogger::instance().reconfigure(
logPipe, logProperties, cancellerThread.hasCancelledBlockingCall()) == false) {
LOG_FATAL(<< "Could not reconfigure logging");
cancellerThread.stop();
return EXIT_FAILURE;
}
cancellerThread.stop();

// Log the program version immediately after reconfiguring the logger. This
// must be done from the program, and NOT a shared library, as each program
Expand Down
30 changes: 18 additions & 12 deletions bin/categorize/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ bool CCmdLineParser::parse(int argc,
char& delimiter,
bool& lengthEncodedInput,
core_t::TTime& persistInterval,
core_t::TTime& namedPipeConnectTimeout,
std::string& inputFileName,
bool& isInputFileNamedPipe,
std::string& outputFileName,
Expand All @@ -43,34 +44,36 @@ bool CCmdLineParser::parse(int argc,
("help", "Display this information and exit")
("version", "Display version information and exit")
("limitconfig", boost::program_options::value<std::string>(),
"Optional limit config file")
"Optional limit config file")
("jobid", boost::program_options::value<std::string>(),
"ID of the job this process is associated with")
"ID of the job this process is associated with")
("logProperties", boost::program_options::value<std::string>(),
"Optional logger properties file")
"Optional logger properties file")
("logPipe", boost::program_options::value<std::string>(),
"Optional log to named pipe")
"Optional log to named pipe")
("delimiter", boost::program_options::value<char>(),
"Optional delimiter character for delimited data formats - default is '\t' (tab separated)")
"Optional delimiter character for delimited data formats - default is '\t' (tab separated)")
("lengthEncodedInput",
"Take input in length encoded binary format - default is delimited")
"Take input in length encoded binary format - default is delimited")
("namedPipeConnectTimeout", boost::program_options::value<core_t::TTime>(),
"Optional timeout (in seconds) for connecting named pipes on startup - default is 300 seconds")
("input", boost::program_options::value<std::string>(),
"Optional file to read input from - not present means read from STDIN")
"Optional file to read input from - not present means read from STDIN")
("inputIsPipe", "Specified input file is a named pipe")
("output", boost::program_options::value<std::string>(),
"Optional file to write output to - not present means write to STDOUT")
"Optional file to write output to - not present means write to STDOUT")
("outputIsPipe", "Specified output file is a named pipe")
("restore", boost::program_options::value<std::string>(),
"Optional file to restore state from - not present means no state restoration")
"Optional file to restore state from - not present means no state restoration")
("restoreIsPipe", "Specified restore file is a named pipe")
("persist", boost::program_options::value<std::string>(),
"Optional file to persist state to - not present means no state persistence")
"Optional file to persist state to - not present means no state persistence")
("persistIsPipe", "Specified persist file is a named pipe")
("persistInterval", boost::program_options::value<core_t::TTime>(),
"Optional interval at which to periodically persist model state - if not specified then models will only be persisted at program exit")
"Optional interval at which to periodically persist model state - if not specified then models will only be persisted at program exit")
("persistInForeground", "Persistence occurs in the foreground. Defaults to background persistence.")
("categorizationfield", boost::program_options::value<std::string>(),
"Field to compute mlcategory from")
"Field to compute mlcategory from")
;
// clang-format on

Expand Down Expand Up @@ -108,6 +111,9 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("persistInterval") > 0) {
persistInterval = vm["persistInterval"].as<core_t::TTime>();
}
if (vm.count("namedPipeConnectTimeout") > 0) {
namedPipeConnectTimeout = vm["namedPipeConnectTimeout"].as<core_t::TTime>();
}
if (vm.count("input") > 0) {
inputFileName = vm["input"].as<std::string>();
}
Expand Down
1 change: 1 addition & 0 deletions bin/categorize/CCmdLineParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class CCmdLineParser {
char& delimiter,
bool& lengthEncodedInput,
core_t::TTime& persistInterval,
core_t::TTime& namedPipeConnectTimeout,
std::string& inputFileName,
bool& isInputFileNamedPipe,
std::string& outputFileName,
Expand Down
44 changes: 30 additions & 14 deletions bin/categorize/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//! IMPLEMENTATION DECISIONS:\n
//! Standalone program.
//!
#include <core/CBlockingCallCancellingTimer.h>
#include <core/CDataAdder.h>
#include <core/CDataSearcher.h>
#include <core/CJsonOutputStreamWrapper.h>
Expand Down Expand Up @@ -40,12 +41,12 @@

#include "CCmdLineParser.h"

#include <chrono>
#include <cstdlib>
#include <functional>
#include <memory>
#include <string>

#include <stdlib.h>

int main(int argc, char** argv) {
// Read command line options
std::string limitConfigFile;
Expand All @@ -61,6 +62,8 @@ int main(int argc, char** argv) {
std::string timeFormat;
bool stopCategorizationOnWarnStatus{false};
ml::core_t::TTime persistInterval{-1};
ml::core_t::TTime namedPipeConnectTimeout{
ml::core::CBlockingCallCancellingTimer::DEFAULT_TIMEOUT_SECONDS};
std::string inputFileName;
bool isInputFileNamedPipe{false};
std::string outputFileName;
Expand All @@ -73,23 +76,36 @@ int main(int argc, char** argv) {
std::string categorizationFieldName;
if (ml::categorize::CCmdLineParser::parse(
argc, argv, limitConfigFile, jobId, logProperties, logPipe, delimiter,
lengthEncodedInput, persistInterval, inputFileName, isInputFileNamedPipe,
outputFileName, isOutputFileNamedPipe, restoreFileName,
lengthEncodedInput, persistInterval, namedPipeConnectTimeout, inputFileName,
isInputFileNamedPipe, outputFileName, isOutputFileNamedPipe, restoreFileName,
isRestoreFileNamedPipe, persistFileName, isPersistFileNamedPipe,
isPersistInForeground, categorizationFieldName) == false) {
return EXIT_FAILURE;
}

ml::core::CBlockingCallCancellingTimer cancellerThread{
ml::core::CThread::currentThreadId(), std::chrono::seconds{namedPipeConnectTimeout}};

// Construct the IO manager before reconfiguring the logger, as it performs
// std::ios actions that only work before first use
ml::api::CIoManager ioMgr(inputFileName, isInputFileNamedPipe, outputFileName,
isOutputFileNamedPipe, restoreFileName, isRestoreFileNamedPipe,
persistFileName, isPersistFileNamedPipe);

if (ml::core::CLogger::instance().reconfigure(logPipe, logProperties) == false) {
ml::api::CIoManager ioMgr{
cancellerThread, inputFileName, isInputFileNamedPipe,
outputFileName, isOutputFileNamedPipe, restoreFileName,
isRestoreFileNamedPipe, persistFileName, isPersistFileNamedPipe};

if (cancellerThread.start() == false) {
// This log message will probably never be seen as it will go to the
// real stderr of this process rather than the log pipe...
LOG_FATAL(<< "Could not start blocking call canceller thread");
return EXIT_FAILURE;
}
if (ml::core::CLogger::instance().reconfigure(
logPipe, logProperties, cancellerThread.hasCancelledBlockingCall()) == false) {
LOG_FATAL(<< "Could not reconfigure logging");
cancellerThread.stop();
return EXIT_FAILURE;
}
cancellerThread.stop();

// Log the program version immediately after reconfiguring the logger. This
// must be done from the program, and NOT a shared library, as each program
Expand All @@ -116,7 +132,7 @@ int main(int argc, char** argv) {
return EXIT_FAILURE;
}

ml::model::CLimits limits(isPersistInForeground);
ml::model::CLimits limits{isPersistInForeground};
if (!limitConfigFile.empty() && limits.init(limitConfigFile) == false) {
LOG_FATAL(<< "ML limit config file '" << limitConfigFile << "' could not be loaded");
return EXIT_FAILURE;
Expand All @@ -126,7 +142,7 @@ int main(int argc, char** argv) {
LOG_FATAL(<< "No categorization field name specified");
return EXIT_FAILURE;
}
ml::api::CFieldConfig fieldConfig(categorizationFieldName);
ml::api::CFieldConfig fieldConfig{categorizationFieldName};

using TDataSearcherUPtr = std::unique_ptr<ml::core::CDataSearcher>;
const TDataSearcherUPtr restoreSearcher{[isRestoreFileNamedPipe, &ioMgr]() -> TDataSearcherUPtr {
Expand Down Expand Up @@ -176,7 +192,7 @@ int main(int argc, char** argv) {
return std::make_unique<ml::api::CCsvInputParser>(ioMgr.inputStream(), delimiter);
}()};

ml::core::CJsonOutputStreamWrapper wrappedOutputStream(ioMgr.outputStream());
ml::core::CJsonOutputStreamWrapper wrappedOutputStream{ioMgr.outputStream()};

// The categorizer knows how to assign categories to records
ml::api::CFieldDataCategorizer categorizer{jobId,
Expand All @@ -198,8 +214,8 @@ int main(int argc, char** argv) {
}

// The skeleton avoids the need to duplicate a lot of boilerplate code
ml::api::CCmdSkeleton skeleton(restoreSearcher.get(), persister.get(),
*inputParser, categorizer);
ml::api::CCmdSkeleton skeleton{restoreSearcher.get(), persister.get(),
*inputParser, categorizer};
if (skeleton.ioLoop() == false) {
LOG_FATAL(<< "ML categorization job failed");
return EXIT_FAILURE;
Expand Down
50 changes: 0 additions & 50 deletions bin/controller/CBlockingCallCancellerThread.cc

This file was deleted.

40 changes: 40 additions & 0 deletions bin/controller/CBlockingCallCancellingStreamMonitor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
#include "CBlockingCallCancellingStreamMonitor.h"

#include <core/CLogger.h>

#include <istream>

namespace ml {
namespace controller {

//! Construct a stream monitor.
//!
//! \param potentiallyBlockedThreadId Forwarded unchanged to the
//! CBlockingCallCancellerThread base class constructor.
//! \param monitorStream The input stream that waitForCondition() reads
//! from; m_MonitorStream is initialised from it.
CBlockingCallCancellingStreamMonitor::CBlockingCallCancellingStreamMonitor(
core::CThread::TThreadId potentiallyBlockedThreadId,
std::istream& monitorStream)
: CBlockingCallCancellerThread{potentiallyBlockedThreadId}, m_MonitorStream{monitorStream} {
}

//! Consume characters from the monitored stream until either an extraction
//! fails (the stream converts to false) or, following a successful
//! extraction, isShutdown() reports true.  The characters read are
//! discarded.
void CBlockingCallCancellingStreamMonitor::waitForCondition() {
    char discarded;
    // The shutdown flag is only consulted after a successful extraction, so
    // a read that never completes is not interrupted by this check alone
    while ((m_MonitorStream >> discarded) && this->isShutdown() == false) {
    }
}

//! Interrupt the stream read performed by waitForCondition() by asking for
//! blocked IO to be cancelled.  A warning is logged if the cancellation
//! request reports failure.
void CBlockingCallCancellingStreamMonitor::stopWaitForCondition() {
    // Cancelling is what wakes up waitForCondition() when it is stuck reading
    // the stream.  If the cancellation actually has an effect then the
    // assumption is that the program is exiting for some reason other than
    // the monitored stream having ended.
    const bool cancelFailed = (this->cancelBlockedIo() == false);
    if (cancelFailed == false) {
        return;
    }

    // NOTE(review): if currentThreadId() reports the ID of the calling thread
    // then the ID logged here is that of the thread invoking this method,
    // not necessarily the thread whose IO was blocked - confirm intent.
    LOG_WARN(<< "Failed to cancel blocked IO in thread " << this->currentThreadId());
}
}
}
Loading

0 comments on commit 68feffd

Please sign in to comment.