Skip to content

Commit

Permalink
[7.x][ML] Prepare for removal of unneeded arguments (elastic#1648) (e…
Browse files Browse the repository at this point in the history
…lastic#1657)

Prepare the ground for the Java process to stop sending soon to be
redundant command line arguments. In particular:

* Ensure that the persist interval, time field and categorisation stop
on warn properties are obtained from job config
* Perform the calculation of the random 'staggering interval' in C++ code (to be removed from Java in due course)

Backports elastic#1648
Relates elastic#1253
  • Loading branch information
edsavage authored Jan 12, 2021
1 parent 50b0f34 commit fa776dd
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 52 deletions.
21 changes: 0 additions & 21 deletions bin/autodetect/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ bool CCmdLineParser::parse(int argc,
std::string& configFile,
std::string& filtersConfigFile,
std::string& eventsConfigFile,
std::string& limitConfigFile,
std::string& modelConfigFile,
std::string& logProperties,
std::string& logPipe,
Expand All @@ -35,9 +34,7 @@ bool CCmdLineParser::parse(int argc,
std::string& timeFormat,
std::string& quantilesState,
bool& deleteStateFiles,
core_t::TTime& persistInterval,
std::size_t& bucketPersistInterval,
core_t::TTime& maxQuantileInterval,
core_t::TTime& namedPipeConnectTimeout,
std::string& inputFileName,
bool& isInputFileNamedPipe,
Expand Down Expand Up @@ -136,14 +133,6 @@ bool CCmdLineParser::parse(int argc,
.run();
boost::program_options::store(parsed, vm);

auto checkConflictingOptions = [&vm](const std::string& opt1,
const std::string& opt2) {
if (vm.count(opt1) && (vm[opt1].defaulted() == false) &&
vm.count(opt2) && (vm[opt2].defaulted() == false))
throw std::runtime_error("Conflicting options '" + opt1 +
"' and '" + opt2 + "'.");
};

if (vm.count("help") > 0) {
std::cerr << desc << std::endl;
return false;
Expand All @@ -165,9 +154,6 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("eventsconfig") > 0) {
eventsConfigFile = vm["eventsconfig"].as<std::string>();
}
if (vm.count("limitconfig") > 0) {
limitConfigFile = vm["limitconfig"].as<std::string>();
}
if (vm.count("modelconfig") > 0) {
modelConfigFile = vm["modelconfig"].as<std::string>();
}
Expand Down Expand Up @@ -195,16 +181,9 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("deleteStateFiles") > 0) {
deleteStateFiles = true;
}
checkConflictingOptions("persistInterval", "bucketPersistInterval");
if (vm.count("persistInterval") > 0) {
persistInterval = vm["persistInterval"].as<core_t::TTime>();
}
if (vm.count("bucketPersistInterval") > 0) {
bucketPersistInterval = vm["bucketPersistInterval"].as<std::size_t>();
}
if (vm.count("maxQuantileInterval") > 0) {
maxQuantileInterval = vm["maxQuantileInterval"].as<core_t::TTime>();
}
if (vm.count("namedPipeConnectTimeout") > 0) {
namedPipeConnectTimeout = vm["namedPipeConnectTimeout"].as<core_t::TTime>();
}
Expand Down
3 changes: 0 additions & 3 deletions bin/autodetect/CCmdLineParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class CCmdLineParser {
std::string& config,
std::string& filtersConfig,
std::string& eventsConfig,
std::string& limitConfigFile,
std::string& modelPlotConfigFile,
std::string& logProperties,
std::string& logPipe,
Expand All @@ -47,9 +46,7 @@ class CCmdLineParser {
std::string& timeFormat,
std::string& quantilesState,
bool& deleteStateFiles,
core_t::TTime& persistInterval,
std::size_t& bucketPersistInterval,
core_t::TTime& maxQuantileInterval,
core_t::TTime& namedPipeConnectTimeout,
std::string& inputFileName,
bool& isInputFileNamedPipe,
Expand Down
38 changes: 18 additions & 20 deletions bin/autodetect/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ int main(int argc, char** argv) {
std::string configFile;
std::string filtersConfigFile;
std::string eventsConfigFile;
std::string limitConfigFile;
std::string modelConfigFile;
std::string logProperties;
std::string logPipe;
Expand All @@ -97,9 +96,7 @@ int main(int argc, char** argv) {
std::string timeFormat;
std::string quantilesStateFile;
bool deleteStateFiles{false};
ml::core_t::TTime persistInterval{-1};
std::size_t bucketPersistInterval{0};
ml::core_t::TTime maxQuantileInterval{-1};
ml::core_t::TTime namedPipeConnectTimeout{
ml::core::CBlockingCallCancellingTimer::DEFAULT_TIMEOUT_SECONDS};
std::string inputFileName;
Expand All @@ -116,12 +113,11 @@ int main(int argc, char** argv) {
bool stopCategorizationOnWarnStatus{false};
TStrVec clauseTokens;
if (ml::autodetect::CCmdLineParser::parse(
argc, argv, configFile, filtersConfigFile, eventsConfigFile, limitConfigFile,
modelConfigFile, logProperties, logPipe, delimiter, lengthEncodedInput,
timeField, timeFormat, quantilesStateFile, deleteStateFiles, persistInterval,
bucketPersistInterval, maxQuantileInterval, namedPipeConnectTimeout,
inputFileName, isInputFileNamedPipe, outputFileName, isOutputFileNamedPipe,
restoreFileName, isRestoreFileNamedPipe, persistFileName,
argc, argv, configFile, filtersConfigFile, eventsConfigFile, modelConfigFile,
logProperties, logPipe, delimiter, lengthEncodedInput, timeField,
timeFormat, quantilesStateFile, deleteStateFiles, bucketPersistInterval,
namedPipeConnectTimeout, inputFileName, isInputFileNamedPipe, outputFileName,
isOutputFileNamedPipe, restoreFileName, isRestoreFileNamedPipe, persistFileName,
isPersistFileNamedPipe, isPersistInForeground, maxAnomalyRecords,
memoryUsage, stopCategorizationOnWarnStatus, clauseTokens) == false) {
return EXIT_FAILURE;
Expand Down Expand Up @@ -230,6 +226,7 @@ int main(int argc, char** argv) {
return nullptr;
}()};

ml::core_t::TTime persistInterval{jobConfig.persistInterval()};
if ((bucketPersistInterval > 0 || persistInterval >= 0) && persister == nullptr) {
LOG_FATAL(<< "Periodic persistence cannot be enabled using the '"
<< ((persistInterval >= 0) ? "persistInterval" : "bucketPersistInterval")
Expand Down Expand Up @@ -273,8 +270,8 @@ int main(int argc, char** argv) {
std::bind(&ml::api::CModelSnapshotJsonWriter::write,
&modelSnapshotWriter, std::placeholders::_1),
persistenceManager.get(),
maxQuantileInterval,
timeField,
jobConfig.quantilePersistInterval(),
jobConfig.dataDescription().timeField(),
timeFormat,
maxAnomalyRecords};

Expand All @@ -289,15 +286,16 @@ int main(int argc, char** argv) {
}

// The categorizer knows how to assign categories to records
ml::api::CFieldDataCategorizer categorizer{jobId,
analysisConfig,
limits,
timeField,
timeFormat,
&job,
wrappedOutputStream,
persistenceManager.get(),
stopCategorizationOnWarnStatus};
ml::api::CFieldDataCategorizer categorizer{
jobId,
analysisConfig,
limits,
jobConfig.dataDescription().timeField(),
timeFormat,
&job,
wrappedOutputStream,
persistenceManager.get(),
analysisConfig.perPartitionCategorizationStopOnWarn()};

ml::api::CDataProcessor* firstProcessor{nullptr};
if (doingCategorization) {
Expand Down
28 changes: 28 additions & 0 deletions include/api/CAnomalyJobConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ class API_EXPORT CAnomalyJobConfig {
// time field is always specified in seconds since epoch.
static const std::string TIME_FORMAT;

static const std::string DEFAULT_TIME_FIELD;

public:
//! Default constructor
CDataDescription() {}
Expand Down Expand Up @@ -464,11 +466,21 @@ class API_EXPORT CAnomalyJobConfig {
static const std::string JOB_TYPE;
static const std::string ANALYSIS_CONFIG;
static const std::string DATA_DESCRIPTION;
static const std::string BACKGROUND_PERSIST_INTERVAL;
static const std::string MODEL_PLOT_CONFIG;
static const std::string ANALYSIS_LIMITS;
static const std::string FILTERS;
static const std::string EVENTS;

// Roughly how often should the quantiles be output when no
// anomalies are being detected? A staggering factor that varies by job is
// added to this.
static const core_t::TTime BASE_MAX_QUANTILE_INTERVAL;

// Roughly how often should the state be persisted? A staggering
// factor that varies by job is added to this.
static const core_t::TTime DEFAULT_BASE_PERSIST_INTERVAL;

public:
//! Default constructor
CAnomalyJobConfig() {}
Expand All @@ -494,6 +506,12 @@ class API_EXPORT CAnomalyJobConfig {
bool parseFilterConfig(const std::string& json);
bool parseEventConfig(const std::string& json);

// Generate a random time of up to 1 hour to be added to intervals at which we
// perform periodic operations. This means that when there are many jobs
// there is a certain amount of staggering of their periodic operations.
// A given job will always be given the same staggering interval.
core_t::TTime intervalStagger();

void initRuleFilters() { m_AnalysisConfig.initRuleFilters(m_RuleFilters); }

void initScheduledEvents() {
Expand All @@ -519,6 +537,12 @@ class API_EXPORT CAnomalyJobConfig {
const CModelPlotConfig& modelPlotConfig() const { return m_ModelConfig; }
const CAnalysisLimits& analysisLimits() const { return m_AnalysisLimits; }
bool isInitialized() const { return m_IsInitialized; }
core_t::TTime persistInterval() const {
return m_BackgroundPersistInterval;
}
core_t::TTime quantilePersistInterval() const {
return m_MaxQuantilePersistInterval;
}

private:
std::string m_JobId;
Expand All @@ -533,6 +557,10 @@ class API_EXPORT CAnomalyJobConfig {

std::vector<CFilterConfig> m_Filters{};
std::vector<CEventConfig> m_Events{};

core_t::TTime m_BackgroundPersistInterval{DEFAULT_BASE_PERSIST_INTERVAL};

core_t::TTime m_MaxQuantilePersistInterval{BASE_MAX_QUANTILE_INTERVAL};
};
}
}
Expand Down
35 changes: 34 additions & 1 deletion lib/api/CAnomalyJobConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>

#include <random>

#ifdef Windows
// rapidjson::Writer<rapidjson::StringBuffer> gets instantiated in the core
// library, and on Windows it gets exported too, because
Expand All @@ -41,10 +43,14 @@ const std::string CAnomalyJobConfig::JOB_TYPE{"job_type"};
const std::string CAnomalyJobConfig::ANALYSIS_CONFIG{"analysis_config"};
const std::string CAnomalyJobConfig::ANALYSIS_LIMITS{"analysis_limits"};
const std::string CAnomalyJobConfig::DATA_DESCRIPTION{"data_description"};
const std::string CAnomalyJobConfig::BACKGROUND_PERSIST_INTERVAL{"background_persist_interval"};
const std::string CAnomalyJobConfig::MODEL_PLOT_CONFIG{"model_plot_config"};
const std::string CAnomalyJobConfig::FILTERS{"filters"};
const std::string CAnomalyJobConfig::EVENTS{"events"};

const core_t::TTime CAnomalyJobConfig::BASE_MAX_QUANTILE_INTERVAL{21600}; // 6 hours
const core_t::TTime CAnomalyJobConfig::DEFAULT_BASE_PERSIST_INTERVAL{10800}; // 3 hours

const std::string CAnomalyJobConfig::CAnalysisConfig::BUCKET_SPAN{"bucket_span"};
const std::string CAnomalyJobConfig::CAnalysisConfig::SUMMARY_COUNT_FIELD_NAME{
"summary_count_field_name"};
Expand Down Expand Up @@ -207,6 +213,8 @@ const std::size_t CAnomalyJobConfig::CAnalysisLimits::DEFAULT_MEMORY_LIMIT_BYTES
const std::string CAnomalyJobConfig::CDataDescription::TIME_FIELD{"time_field"};
const std::string CAnomalyJobConfig::CDataDescription::TIME_FORMAT{"time_format"};

const std::string CAnomalyJobConfig::CDataDescription::DEFAULT_TIME_FIELD{"time"};

const std::string CAnomalyJobConfig::CEventConfig::DESCRIPTION{"description"};
const std::string CAnomalyJobConfig::CEventConfig::RULES{"rules"};

Expand Down Expand Up @@ -255,6 +263,8 @@ const CAnomalyJobConfigReader CONFIG_READER{[] {
CAnomalyJobConfigReader::E_OptionalParameter);
theReader.addParameter(CAnomalyJobConfig::DATA_DESCRIPTION,
CAnomalyJobConfigReader::E_OptionalParameter);
theReader.addParameter(CAnomalyJobConfig::BACKGROUND_PERSIST_INTERVAL,
CAnomalyJobConfigReader::E_OptionalParameter);
return theReader;
}()};

Expand All @@ -276,6 +286,8 @@ const CAnomalyJobConfigReader ANALYSIS_CONFIG_READER{[] {
CAnomalyJobConfigReader::E_OptionalParameter);
theReader.addParameter(CAnomalyJobConfig::CAnalysisConfig::LATENCY,
CAnomalyJobConfigReader::E_OptionalParameter);
theReader.addParameter(CAnomalyJobConfig::CAnalysisConfig::MULTIVARIATE_BY_FIELDS,
CAnomalyJobConfigReader::E_OptionalParameter);
return theReader;
}()};

Expand Down Expand Up @@ -560,6 +572,20 @@ bool CAnomalyJobConfig::parse(const std::string& json) {
if (modelPlotConfig != nullptr) {
m_ModelConfig.parse(*modelPlotConfig);
}

// We choose to ignore any errors here parsing the time duration string as
// we assume that it has already been validated by ES. In the event that any
// error _does_ occur an error is logged and a default value used.
const std::string& bucketPersistIntervalString{
parameters[BACKGROUND_PERSIST_INTERVAL].fallback(EMPTY_STRING)};

const core_t::TTime defaultBackgroundPersistInterval{
DEFAULT_BASE_PERSIST_INTERVAL + this->intervalStagger()};
m_BackgroundPersistInterval = CAnomalyJobConfig::CAnalysisConfig::durationSeconds(
bucketPersistIntervalString, defaultBackgroundPersistInterval);

m_MaxQuantilePersistInterval = BASE_MAX_QUANTILE_INTERVAL + this->intervalStagger();

} catch (CAnomalyJobConfigReader::CParseError& e) {
LOG_ERROR(<< "Error parsing anomaly job config: " << e.what());
return false;
Expand All @@ -570,6 +596,13 @@ bool CAnomalyJobConfig::parse(const std::string& json) {
return true;
}

core_t::TTime CAnomalyJobConfig::intervalStagger() {
std::seed_seq seed(m_JobId.begin(), m_JobId.end());
std::mt19937 generator{seed};
std::uniform_int_distribution<> distribution{0, core::constants::HOUR - 1};
return distribution(generator);
}

void CAnomalyJobConfig::CModelPlotConfig::parse(const rapidjson::Value& modelPlotConfig) {
auto parameters = MODEL_PLOT_CONFIG_READER.read(modelPlotConfig);

Expand Down Expand Up @@ -611,7 +644,7 @@ std::size_t CAnomalyJobConfig::CAnalysisLimits::modelMemoryLimitMb(const std::st
void CAnomalyJobConfig::CDataDescription::parse(const rapidjson::Value& analysisLimits) {
auto parameters = DATA_DESCRIPTION_READER.read(analysisLimits);

m_TimeField = parameters[TIME_FIELD].as<std::string>();
m_TimeField = parameters[TIME_FIELD].fallback(DEFAULT_TIME_FIELD);
m_TimeFormat = parameters[TIME_FORMAT].fallback(EMPTY_STRING); // Ignore
}

Expand Down
Loading

0 comments on commit fa776dd

Please sign in to comment.