Skip to content

Commit

Permalink
[7.x][ML] Init AD model config from JSON config (#1602) (#1605)
Browse files Browse the repository at this point in the history
Initialise CAnomalyDetectorModelConfig with values from
the JSON config file parsed by CAnomalyJobConfig.

Relates to #1253
Backports #1602
  • Loading branch information
edsavage authored Dec 3, 2020
1 parent 3176441 commit 18dc4af
Show file tree
Hide file tree
Showing 22 changed files with 285 additions and 158 deletions.
16 changes: 0 additions & 16 deletions bin/autodetect/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ bool CCmdLineParser::parse(int argc,
std::string& modelPlotConfigFile,
std::string& logProperties,
std::string& logPipe,
core_t::TTime& bucketSpan,
core_t::TTime& latency,
std::string& summaryCountFieldName,
char& delimiter,
bool& lengthEncodedInput,
std::string& timeField,
Expand All @@ -53,7 +50,6 @@ bool CCmdLineParser::parse(int argc,
bool& isPersistInForeground,
std::size_t& maxAnomalyRecords,
bool& memoryUsage,
bool& multivariateByFields,
bool& stopCategorizationOnWarnStatus,
TStrVec& clauseTokens) {
try {
Expand Down Expand Up @@ -177,15 +173,6 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("logPipe") > 0) {
logPipe = vm["logPipe"].as<std::string>();
}
if (vm.count("bucketspan") > 0) {
bucketSpan = vm["bucketspan"].as<core_t::TTime>();
}
if (vm.count("latency") > 0) {
latency = vm["latency"].as<core_t::TTime>();
}
if (vm.count("summarycountfield") > 0) {
summaryCountFieldName = vm["summarycountfield"].as<std::string>();
}
if (vm.count("delimiter") > 0) {
delimiter = vm["delimiter"].as<char>();
}
Expand Down Expand Up @@ -250,9 +237,6 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("memoryUsage") > 0) {
memoryUsage = true;
}
if (vm.count("multivariateByFields") > 0) {
multivariateByFields = true;
}
if (vm.count("stopCategorizationOnWarnStatus") > 0) {
stopCategorizationOnWarnStatus = true;
}
Expand Down
6 changes: 1 addition & 5 deletions bin/autodetect/CCmdLineParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CCmdLineParser {
using TStrVec = std::vector<std::string>;

public:
//! Parse the arguments and return options if appropriate. Unamed
//! Parse the arguments and return options if appropriate. Unnamed
//! options are placed in a vector for further processing/validation
//! later on by the api::CFieldConfig class.
static bool parse(int argc,
Expand All @@ -41,9 +41,6 @@ class CCmdLineParser {
std::string& modelPlotConfigFile,
std::string& logProperties,
std::string& logPipe,
core_t::TTime& bucketSpan,
core_t::TTime& latency,
std::string& summaryCountFieldName,
char& delimiter,
bool& lengthEncodedInput,
std::string& timeField,
Expand All @@ -65,7 +62,6 @@ class CCmdLineParser {
bool& isPersistInForeground,
std::size_t& maxAnomalyRecords,
bool& memoryUsage,
bool& multivariateByFields,
bool& stopCategorizationOnWarnStatus,
TStrVec& clauseTokens);

Expand Down
26 changes: 15 additions & 11 deletions bin/autodetect/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ int main(int argc, char** argv) {
std::string modelPlotConfigFile;
std::string logProperties;
std::string logPipe;
ml::core_t::TTime bucketSpan{0};
ml::core_t::TTime latency{0};
std::string summaryCountFieldName;
char delimiter{'\t'};
bool lengthEncodedInput{false};
std::string timeField{ml::api::CAnomalyJob::DEFAULT_TIME_FIELD_NAME};
Expand All @@ -116,19 +113,17 @@ int main(int argc, char** argv) {
bool isPersistInForeground{false};
std::size_t maxAnomalyRecords{100};
bool memoryUsage{false};
bool multivariateByFields{false};
bool stopCategorizationOnWarnStatus{false};
TStrVec clauseTokens;
if (ml::autodetect::CCmdLineParser::parse(
argc, argv, configFile, limitConfigFile, modelConfigFile, fieldConfigFile,
modelPlotConfigFile, logProperties, logPipe, bucketSpan, latency,
summaryCountFieldName, delimiter, lengthEncodedInput, timeField,
timeFormat, quantilesStateFile, deleteStateFiles, persistInterval,
modelPlotConfigFile, logProperties, logPipe, delimiter, lengthEncodedInput,
timeField, timeFormat, quantilesStateFile, deleteStateFiles, persistInterval,
bucketPersistInterval, maxQuantileInterval, namedPipeConnectTimeout,
inputFileName, isInputFileNamedPipe, outputFileName, isOutputFileNamedPipe,
restoreFileName, isRestoreFileNamedPipe, persistFileName, isPersistFileNamedPipe,
isPersistInForeground, maxAnomalyRecords, memoryUsage, multivariateByFields,
stopCategorizationOnWarnStatus, clauseTokens) == false) {
restoreFileName, isRestoreFileNamedPipe, persistFileName,
isPersistFileNamedPipe, isPersistInForeground, maxAnomalyRecords,
memoryUsage, stopCategorizationOnWarnStatus, clauseTokens) == false) {
return EXIT_FAILURE;
}

Expand Down Expand Up @@ -212,12 +207,20 @@ int main(int argc, char** argv) {
mutableFields.push_back(ml::api::CFieldDataCategorizer::MLCATEGORY_NAME);
}

// TODO: Encapsulate the logic below into say CAnomalyJobConfig::makeModelConfig
const ml::api::CAnomalyJobConfig::CAnalysisConfig& analysisConfig =
jobConfig.analysisConfig();
const std::string& summaryCountFieldName = analysisConfig.summaryCountFieldName();
ml::core_t::TTime bucketSpan = analysisConfig.bucketSpan();
ml::core_t::TTime latency = analysisConfig.latency();
bool multivariateByFields = analysisConfig.multivariateByFields();

ml::model_t::ESummaryMode summaryMode{
summaryCountFieldName.empty() ? ml::model_t::E_None : ml::model_t::E_Manual};
ml::model::CAnomalyDetectorModelConfig modelConfig{ml::model::CAnomalyDetectorModelConfig::defaultConfig(
bucketSpan, summaryMode, summaryCountFieldName, latency, multivariateByFields)};
modelConfig.detectionRules(ml::model::CAnomalyDetectorModelConfig::TIntDetectionRuleVecUMapCRef(
fieldConfig.detectionRules()));
analysisConfig.detectionRules()));
modelConfig.scheduledEvents(ml::model::CAnomalyDetectorModelConfig::TStrDetectionRulePrVecCRef(
fieldConfig.scheduledEvents()));

Expand Down Expand Up @@ -295,6 +298,7 @@ int main(int argc, char** argv) {
// The anomaly job knows how to detect anomalies
ml::api::CAnomalyJob job{jobId,
limits,
jobConfig,
fieldConfig,
modelConfig,
wrappedOutputStream,
Expand Down
12 changes: 11 additions & 1 deletion include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class CHierarchicalResults;
class CLimits;
}
namespace api {
class CAnomalyJobConfig;
class CPersistenceManager;
class CModelPlotDataJsonWriter;
class CFieldConfig;
Expand Down Expand Up @@ -142,6 +143,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
public:
CAnomalyJob(const std::string& jobId,
model::CLimits& limits,
CAnomalyJobConfig& jobConfig,
CFieldConfig& fieldConfig,
model::CAnomalyDetectorModelConfig& modelConfig,
core::CJsonOutputStreamWrapper& outputBuffer,
Expand Down Expand Up @@ -424,7 +426,15 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! Object to which the output is passed
CJsonOutputWriter m_JsonOutputWriter;

//! Field names to use for the analysis
//! Configuration settings for the analysis parsed from
//! JSON configuration file.
//! Note that this is a non-const reference as it needs to be capable of
//! being modified by job updates (and those changes reflected wherever a
//! reference is held).
CAnomalyJobConfig& m_JobConfig;

//! Field names to use for the analysis.
//! This is a non-const reference for the same reasons as for m_JobConfig.
CFieldConfig& m_FieldConfig;

//! The model configuration
Expand Down
40 changes: 32 additions & 8 deletions include/api/CAnomalyJobConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class API_EXPORT CAnomalyJobConfig {
static const std::string OVER_FIELD_NAME;
static const std::string PARTITION_FIELD_NAME;
static const std::string DETECTOR_DESCRIPTION;
static const std::string DETECTOR_INDEX;
static const std::string EXCLUDE_FREQUENT;
static const std::string CUSTOM_RULES;
static const std::string USE_NULL;
Expand All @@ -42,7 +43,8 @@ class API_EXPORT CAnomalyJobConfig {
CDetectorConfig() {}

void parse(const rapidjson::Value& detectorConfig,
const CDetectionRulesJsonParser::TStrPatternSetUMap& ruleFilters);
const CDetectionRulesJsonParser::TStrPatternSetUMap& ruleFilters,
CDetectionRulesJsonParser::TDetectionRuleVec& detectionRules);

std::string function() const { return m_Function; }
std::string fieldName() const { return m_FieldName; }
Expand All @@ -55,9 +57,6 @@ class API_EXPORT CAnomalyJobConfig {
std::string detectorDescription() const {
return m_DetectorDescription;
}
CDetectionRulesJsonParser::TDetectionRuleVec customRules() const {
return m_CustomRules;
}
bool useNull() const { return m_UseNull; }

private:
Expand All @@ -68,7 +67,7 @@ class API_EXPORT CAnomalyJobConfig {
std::string m_PartitionFieldName{};
std::string m_ExcludeFrequent{};
std::string m_DetectorDescription{};
CDetectionRulesJsonParser::TDetectionRuleVec m_CustomRules{};
int m_DetectorIndex{};
bool m_UseNull{false};
};

Expand All @@ -80,16 +79,21 @@ class API_EXPORT CAnomalyJobConfig {
static const std::string DETECTORS;
static const std::string INFLUENCERS;
static const std::string LATENCY;
static const std::string MULTIVARIATE_BY_FIELDS;
static const std::string PER_PARTITION_CATEGORIZATION;
static const std::string ENABLED;
static const std::string STOP_ON_WARN;

static const core_t::TTime DEFAULT_BUCKET_SPAN;
static const core_t::TTime DEFAULT_LATENCY;

public:
using TStrVec = std::vector<std::string>;
using TDetectorConfigVec = std::vector<CDetectorConfig>;

using TIntDetectionRuleVecUMap =
boost::unordered_map<int, CDetectionRulesJsonParser::TDetectionRuleVec>;

public:
//! Default constructor
CAnalysisConfig() {}
Expand All @@ -100,6 +104,10 @@ class API_EXPORT CAnomalyJobConfig {

void parse(const rapidjson::Value& json);

bool processFilter(const std::string& key, const std::string& value);

bool updateFilters(const boost::property_tree::ptree& propTree);

core_t::TTime bucketSpan() const { return m_BucketSpan; }

std::string summaryCountFieldName() const {
Expand All @@ -121,9 +129,20 @@ class API_EXPORT CAnomalyJobConfig {
return m_Detectors;
}
const TStrVec& influencers() const { return m_Influencers; }
std::string latency() const { return m_Latency; }
core_t::TTime latency() const { return m_Latency; }

bool multivariateByFields() const { return m_MultivariateByFields; }

static core_t::TTime bucketSpanSeconds(const std::string& bucketSpanString);
const TIntDetectionRuleVecUMap& detectionRules() const {
return m_DetectorRules;
}

const CDetectionRulesJsonParser::TStrPatternSetUMap& ruleFilters() const {
return m_RuleFilters;
}

static core_t::TTime durationSeconds(const std::string& durationString,
core_t::TTime defaultDuration);

private:
core_t::TTime m_BucketSpan{DEFAULT_BUCKET_SPAN};
Expand All @@ -134,7 +153,11 @@ class API_EXPORT CAnomalyJobConfig {
bool m_PerPartitionCategorizationStopOnWarn{false};
TDetectorConfigVec m_Detectors{};
TStrVec m_Influencers{};
std::string m_Latency{};
core_t::TTime m_Latency{DEFAULT_LATENCY};
bool m_MultivariateByFields{false};

//! The detection rules per detector index.
TIntDetectionRuleVecUMap m_DetectorRules;

//! The filters per id used by categorical rule conditions.
CDetectionRulesJsonParser::TStrPatternSetUMap m_RuleFilters{};
Expand Down Expand Up @@ -234,6 +257,7 @@ class API_EXPORT CAnomalyJobConfig {

std::string jobId() const { return m_JobId; }
std::string jobType() const { return m_JobType; }
CAnalysisConfig& analysisConfig() { return m_AnalysisConfig; }
const CAnalysisConfig& analysisConfig() const { return m_AnalysisConfig; }
const CDataDescription& dataDescription() const {
return m_DataDescription;
Expand Down
6 changes: 5 additions & 1 deletion include/api/CConfigUpdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <model/CAnomalyDetectorModelConfig.h>

#include <api/CAnomalyJobConfig.h>
#include <api/CFieldConfig.h>
#include <api/ImportExport.h>

Expand All @@ -34,7 +35,9 @@ namespace api {
//!
class API_EXPORT CConfigUpdater {
public:
CConfigUpdater(CFieldConfig& fieldConfig, model::CAnomalyDetectorModelConfig& modelConfig);
CConfigUpdater(CAnomalyJobConfig& jobConfig,
CFieldConfig& fieldConfig,
model::CAnomalyDetectorModelConfig& modelConfig);

//! Update from given config changes
//! \param config the requested changes in an ini syntax
Expand All @@ -49,6 +52,7 @@ class API_EXPORT CConfigUpdater {
static const std::string SCHEDULED_EVENTS;

private:
CAnomalyJobConfig& m_JobConfig;
CFieldConfig& m_FieldConfig;
model::CAnomalyDetectorModelConfig& m_ModelConfig;
};
Expand Down
8 changes: 5 additions & 3 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <model/CStringStore.h>

#include <api/CAnnotationJsonWriter.h>
#include <api/CAnomalyJobConfig.h>
#include <api/CConfigUpdater.h>
#include <api/CFieldConfig.h>
#include <api/CHierarchicalResultsWriter.h>
Expand Down Expand Up @@ -127,6 +128,7 @@ const CAnomalyJob::TAnomalyDetectorPtr CAnomalyJob::NULL_DETECTOR;

CAnomalyJob::CAnomalyJob(const std::string& jobId,
model::CLimits& limits,
CAnomalyJobConfig& jobConfig,
CFieldConfig& fieldConfig,
model::CAnomalyDetectorModelConfig& modelConfig,
core::CJsonOutputStreamWrapper& outputStream,
Expand All @@ -139,8 +141,8 @@ CAnomalyJob::CAnomalyJob(const std::string& jobId,
: CDataProcessor{timeFieldName, timeFieldFormat}, m_JobId{jobId}, m_Limits{limits},
m_OutputStream{outputStream}, m_ForecastRunner{m_JobId, m_OutputStream,
limits.resourceMonitor()},
m_JsonOutputWriter{m_JobId, m_OutputStream}, m_FieldConfig{fieldConfig},
m_ModelConfig{modelConfig}, m_NumRecordsHandled{0},
m_JsonOutputWriter{m_JobId, m_OutputStream}, m_JobConfig{jobConfig},
m_FieldConfig{fieldConfig}, m_ModelConfig{modelConfig}, m_NumRecordsHandled{0},
m_LastFinalisedBucketEndTime{0}, m_PersistCompleteFunc{persistCompleteFunc},
m_MaxDetectors{std::numeric_limits<size_t>::max()},
m_PersistenceManager{persistenceManager}, m_MaxQuantileInterval{maxQuantileInterval},
Expand Down Expand Up @@ -455,7 +457,7 @@ void CAnomalyJob::acknowledgeFlush(const std::string& flushId) {

void CAnomalyJob::updateConfig(const std::string& config) {
LOG_DEBUG(<< "Received update config request: " << config);
CConfigUpdater configUpdater(m_FieldConfig, m_ModelConfig);
CConfigUpdater configUpdater(m_JobConfig, m_FieldConfig, m_ModelConfig);
if (configUpdater.update(config) == false) {
LOG_ERROR(<< "Failed to update configuration");
}
Expand Down
Loading

0 comments on commit 18dc4af

Please sign in to comment.