Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Init AD model config from JSON config #1602

Merged
merged 3 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions bin/autodetect/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ bool CCmdLineParser::parse(int argc,
std::string& modelPlotConfigFile,
std::string& logProperties,
std::string& logPipe,
core_t::TTime& bucketSpan,
core_t::TTime& latency,
std::string& summaryCountFieldName,
char& delimiter,
bool& lengthEncodedInput,
std::string& timeField,
Expand All @@ -53,7 +50,6 @@ bool CCmdLineParser::parse(int argc,
bool& isPersistInForeground,
std::size_t& maxAnomalyRecords,
bool& memoryUsage,
bool& multivariateByFields,
bool& stopCategorizationOnWarnStatus,
TStrVec& clauseTokens) {
try {
Expand Down Expand Up @@ -177,15 +173,6 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("logPipe") > 0) {
logPipe = vm["logPipe"].as<std::string>();
}
if (vm.count("bucketspan") > 0) {
bucketSpan = vm["bucketspan"].as<core_t::TTime>();
}
if (vm.count("latency") > 0) {
latency = vm["latency"].as<core_t::TTime>();
}
if (vm.count("summarycountfield") > 0) {
summaryCountFieldName = vm["summarycountfield"].as<std::string>();
}
if (vm.count("delimiter") > 0) {
delimiter = vm["delimiter"].as<char>();
}
Expand Down Expand Up @@ -250,9 +237,6 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("memoryUsage") > 0) {
memoryUsage = true;
}
if (vm.count("multivariateByFields") > 0) {
multivariateByFields = true;
}
if (vm.count("stopCategorizationOnWarnStatus") > 0) {
stopCategorizationOnWarnStatus = true;
}
Expand Down
6 changes: 1 addition & 5 deletions bin/autodetect/CCmdLineParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CCmdLineParser {
using TStrVec = std::vector<std::string>;

public:
//! Parse the arguments and return options if appropriate. Unamed
//! Parse the arguments and return options if appropriate. Unnamed
//! options are placed in a vector for further processing/validation
//! later on by the api::CFieldConfig class.
static bool parse(int argc,
Expand All @@ -41,9 +41,6 @@ class CCmdLineParser {
std::string& modelPlotConfigFile,
std::string& logProperties,
std::string& logPipe,
core_t::TTime& bucketSpan,
core_t::TTime& latency,
std::string& summaryCountFieldName,
char& delimiter,
bool& lengthEncodedInput,
std::string& timeField,
Expand All @@ -65,7 +62,6 @@ class CCmdLineParser {
bool& isPersistInForeground,
std::size_t& maxAnomalyRecords,
bool& memoryUsage,
bool& multivariateByFields,
bool& stopCategorizationOnWarnStatus,
TStrVec& clauseTokens);

Expand Down
26 changes: 15 additions & 11 deletions bin/autodetect/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ int main(int argc, char** argv) {
std::string modelPlotConfigFile;
std::string logProperties;
std::string logPipe;
ml::core_t::TTime bucketSpan{0};
ml::core_t::TTime latency{0};
std::string summaryCountFieldName;
char delimiter{'\t'};
bool lengthEncodedInput{false};
std::string timeField{ml::api::CAnomalyJob::DEFAULT_TIME_FIELD_NAME};
Expand All @@ -116,19 +113,17 @@ int main(int argc, char** argv) {
bool isPersistInForeground{false};
std::size_t maxAnomalyRecords{100};
bool memoryUsage{false};
bool multivariateByFields{false};
bool stopCategorizationOnWarnStatus{false};
TStrVec clauseTokens;
if (ml::autodetect::CCmdLineParser::parse(
argc, argv, configFile, limitConfigFile, modelConfigFile, fieldConfigFile,
modelPlotConfigFile, logProperties, logPipe, bucketSpan, latency,
summaryCountFieldName, delimiter, lengthEncodedInput, timeField,
timeFormat, quantilesStateFile, deleteStateFiles, persistInterval,
modelPlotConfigFile, logProperties, logPipe, delimiter, lengthEncodedInput,
timeField, timeFormat, quantilesStateFile, deleteStateFiles, persistInterval,
bucketPersistInterval, maxQuantileInterval, namedPipeConnectTimeout,
inputFileName, isInputFileNamedPipe, outputFileName, isOutputFileNamedPipe,
restoreFileName, isRestoreFileNamedPipe, persistFileName, isPersistFileNamedPipe,
isPersistInForeground, maxAnomalyRecords, memoryUsage, multivariateByFields,
stopCategorizationOnWarnStatus, clauseTokens) == false) {
restoreFileName, isRestoreFileNamedPipe, persistFileName,
isPersistFileNamedPipe, isPersistInForeground, maxAnomalyRecords,
memoryUsage, stopCategorizationOnWarnStatus, clauseTokens) == false) {
return EXIT_FAILURE;
}

Expand Down Expand Up @@ -212,12 +207,20 @@ int main(int argc, char** argv) {
mutableFields.push_back(ml::api::CFieldDataCategorizer::MLCATEGORY_NAME);
}

// TODO: Encapsulate the logic below into say CAnomalyJobConfig::makeModelConfig
const ml::api::CAnomalyJobConfig::CAnalysisConfig& analysisConfig =
jobConfig.analysisConfig();
const std::string& summaryCountFieldName = analysisConfig.summaryCountFieldName();
ml::core_t::TTime bucketSpan = analysisConfig.bucketSpan();
ml::core_t::TTime latency = analysisConfig.latency();
bool multivariateByFields = analysisConfig.multivariateByFields();

ml::model_t::ESummaryMode summaryMode{
summaryCountFieldName.empty() ? ml::model_t::E_None : ml::model_t::E_Manual};
ml::model::CAnomalyDetectorModelConfig modelConfig{ml::model::CAnomalyDetectorModelConfig::defaultConfig(
bucketSpan, summaryMode, summaryCountFieldName, latency, multivariateByFields)};
modelConfig.detectionRules(ml::model::CAnomalyDetectorModelConfig::TIntDetectionRuleVecUMapCRef(
fieldConfig.detectionRules()));
analysisConfig.detectionRules()));
Comment on lines +213 to +223
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the long term this could be encapsulated in a method of CAnomalyJobConfig, say CAnomalyJobConfig::makeModelConfig(). But since you're doing a series of PRs you can save this for the next one.

modelConfig.scheduledEvents(ml::model::CAnomalyDetectorModelConfig::TStrDetectionRulePrVecCRef(
fieldConfig.scheduledEvents()));

Expand Down Expand Up @@ -295,6 +298,7 @@ int main(int argc, char** argv) {
// The anomaly job knows how to detect anomalies
ml::api::CAnomalyJob job{jobId,
limits,
jobConfig,
fieldConfig,
modelConfig,
wrappedOutputStream,
Expand Down
12 changes: 11 additions & 1 deletion include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class CHierarchicalResults;
class CLimits;
}
namespace api {
class CAnomalyJobConfig;
class CPersistenceManager;
class CModelPlotDataJsonWriter;
class CFieldConfig;
Expand Down Expand Up @@ -142,6 +143,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
public:
CAnomalyJob(const std::string& jobId,
model::CLimits& limits,
CAnomalyJobConfig& jobConfig,
CFieldConfig& fieldConfig,
model::CAnomalyDetectorModelConfig& modelConfig,
core::CJsonOutputStreamWrapper& outputBuffer,
Expand Down Expand Up @@ -424,7 +426,15 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! Object to which the output is passed
CJsonOutputWriter m_JsonOutputWriter;

//! Field names to use for the analysis
//! Configuration settings for the analysis parsed from
//! JSON configuration file.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to say why this is non-const. It's because it gets updated by job updates right? And of course since it's a reference that will also affect everywhere else that has a reference.

//! Note that this is a non-const reference as it needs to be capable of
//! being modified by job updates (and those changes reflected wherever a
//! reference is held).
CAnomalyJobConfig& m_JobConfig;

//! Field names to use for the analysis.
//! This is a non-const reference for the same reasons as for m_JobConfig.
CFieldConfig& m_FieldConfig;

//! The model configuration
Expand Down
40 changes: 32 additions & 8 deletions include/api/CAnomalyJobConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class API_EXPORT CAnomalyJobConfig {
static const std::string OVER_FIELD_NAME;
static const std::string PARTITION_FIELD_NAME;
static const std::string DETECTOR_DESCRIPTION;
static const std::string DETECTOR_INDEX;
static const std::string EXCLUDE_FREQUENT;
static const std::string CUSTOM_RULES;
static const std::string USE_NULL;
Expand All @@ -42,7 +43,8 @@ class API_EXPORT CAnomalyJobConfig {
CDetectorConfig() {}

void parse(const rapidjson::Value& detectorConfig,
const CDetectionRulesJsonParser::TStrPatternSetUMap& ruleFilters);
const CDetectionRulesJsonParser::TStrPatternSetUMap& ruleFilters,
CDetectionRulesJsonParser::TDetectionRuleVec& detectionRules);

std::string function() const { return m_Function; }
std::string fieldName() const { return m_FieldName; }
Expand All @@ -55,9 +57,6 @@ class API_EXPORT CAnomalyJobConfig {
std::string detectorDescription() const {
return m_DetectorDescription;
}
CDetectionRulesJsonParser::TDetectionRuleVec customRules() const {
return m_CustomRules;
}
bool useNull() const { return m_UseNull; }

private:
Expand All @@ -68,7 +67,7 @@ class API_EXPORT CAnomalyJobConfig {
std::string m_PartitionFieldName{};
std::string m_ExcludeFrequent{};
std::string m_DetectorDescription{};
CDetectionRulesJsonParser::TDetectionRuleVec m_CustomRules{};
int m_DetectorIndex{};
bool m_UseNull{false};
};

Expand All @@ -80,16 +79,21 @@ class API_EXPORT CAnomalyJobConfig {
static const std::string DETECTORS;
static const std::string INFLUENCERS;
static const std::string LATENCY;
static const std::string MULTIVARIATE_BY_FIELDS;
static const std::string PER_PARTITION_CATEGORIZATION;
static const std::string ENABLED;
static const std::string STOP_ON_WARN;

static const core_t::TTime DEFAULT_BUCKET_SPAN;
static const core_t::TTime DEFAULT_LATENCY;

public:
using TStrVec = std::vector<std::string>;
using TDetectorConfigVec = std::vector<CDetectorConfig>;

using TIntDetectionRuleVecUMap =
boost::unordered_map<int, CDetectionRulesJsonParser::TDetectionRuleVec>;

public:
//! Default constructor
CAnalysisConfig() {}
Expand All @@ -100,6 +104,10 @@ class API_EXPORT CAnomalyJobConfig {

void parse(const rapidjson::Value& json);

bool processFilter(const std::string& key, const std::string& value);

bool updateFilters(const boost::property_tree::ptree& propTree);

core_t::TTime bucketSpan() const { return m_BucketSpan; }

std::string summaryCountFieldName() const {
Expand All @@ -121,9 +129,20 @@ class API_EXPORT CAnomalyJobConfig {
return m_Detectors;
}
const TStrVec& influencers() const { return m_Influencers; }
std::string latency() const { return m_Latency; }
core_t::TTime latency() const { return m_Latency; }

bool multivariateByFields() const { return m_MultivariateByFields; }

static core_t::TTime bucketSpanSeconds(const std::string& bucketSpanString);
const TIntDetectionRuleVecUMap& detectionRules() const {
return m_DetectorRules;
}

const CDetectionRulesJsonParser::TStrPatternSetUMap& ruleFilters() const {
return m_RuleFilters;
}

static core_t::TTime durationSeconds(const std::string& durationString,
core_t::TTime defaultDuration);

private:
core_t::TTime m_BucketSpan{DEFAULT_BUCKET_SPAN};
Expand All @@ -134,7 +153,11 @@ class API_EXPORT CAnomalyJobConfig {
bool m_PerPartitionCategorizationStopOnWarn{false};
TDetectorConfigVec m_Detectors{};
TStrVec m_Influencers{};
std::string m_Latency{};
core_t::TTime m_Latency{DEFAULT_LATENCY};
bool m_MultivariateByFields{false};

//! The detection rules per detector index.
TIntDetectionRuleVecUMap m_DetectorRules;

//! The filters per id used by categorical rule conditions.
CDetectionRulesJsonParser::TStrPatternSetUMap m_RuleFilters{};
Expand Down Expand Up @@ -234,6 +257,7 @@ class API_EXPORT CAnomalyJobConfig {

std::string jobId() const { return m_JobId; }
std::string jobType() const { return m_JobType; }
CAnalysisConfig& analysisConfig() { return m_AnalysisConfig; }
const CAnalysisConfig& analysisConfig() const { return m_AnalysisConfig; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could provide both const and non-const accessors. In future as this class gets used more widely some places may hold a const reference to the CAnomalyJobConfig, and it's annoying for those places to have to cast away constness just to get a const reference to one of the sub-objects.

const CDataDescription& dataDescription() const {
return m_DataDescription;
Expand Down
6 changes: 5 additions & 1 deletion include/api/CConfigUpdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <model/CAnomalyDetectorModelConfig.h>

#include <api/CAnomalyJobConfig.h>
#include <api/CFieldConfig.h>
#include <api/ImportExport.h>

Expand All @@ -34,7 +35,9 @@ namespace api {
//!
class API_EXPORT CConfigUpdater {
public:
CConfigUpdater(CFieldConfig& fieldConfig, model::CAnomalyDetectorModelConfig& modelConfig);
CConfigUpdater(CAnomalyJobConfig& jobConfig,
CFieldConfig& fieldConfig,
model::CAnomalyDetectorModelConfig& modelConfig);

//! Update from given config changes
//! \param config the requested changes in an ini syntax
Expand All @@ -49,6 +52,7 @@ class API_EXPORT CConfigUpdater {
static const std::string SCHEDULED_EVENTS;

private:
CAnomalyJobConfig& m_JobConfig;
CFieldConfig& m_FieldConfig;
model::CAnomalyDetectorModelConfig& m_ModelConfig;
};
Expand Down
8 changes: 5 additions & 3 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <model/CStringStore.h>

#include <api/CAnnotationJsonWriter.h>
#include <api/CAnomalyJobConfig.h>
#include <api/CConfigUpdater.h>
#include <api/CFieldConfig.h>
#include <api/CHierarchicalResultsWriter.h>
Expand Down Expand Up @@ -127,6 +128,7 @@ const CAnomalyJob::TAnomalyDetectorPtr CAnomalyJob::NULL_DETECTOR;

CAnomalyJob::CAnomalyJob(const std::string& jobId,
model::CLimits& limits,
CAnomalyJobConfig& jobConfig,
CFieldConfig& fieldConfig,
model::CAnomalyDetectorModelConfig& modelConfig,
core::CJsonOutputStreamWrapper& outputStream,
Expand All @@ -139,8 +141,8 @@ CAnomalyJob::CAnomalyJob(const std::string& jobId,
: CDataProcessor{timeFieldName, timeFieldFormat}, m_JobId{jobId}, m_Limits{limits},
m_OutputStream{outputStream}, m_ForecastRunner{m_JobId, m_OutputStream,
limits.resourceMonitor()},
m_JsonOutputWriter{m_JobId, m_OutputStream}, m_FieldConfig{fieldConfig},
m_ModelConfig{modelConfig}, m_NumRecordsHandled{0},
m_JsonOutputWriter{m_JobId, m_OutputStream}, m_JobConfig{jobConfig},
m_FieldConfig{fieldConfig}, m_ModelConfig{modelConfig}, m_NumRecordsHandled{0},
m_LastFinalisedBucketEndTime{0}, m_PersistCompleteFunc{persistCompleteFunc},
m_MaxDetectors{std::numeric_limits<size_t>::max()},
m_PersistenceManager{persistenceManager}, m_MaxQuantileInterval{maxQuantileInterval},
Expand Down Expand Up @@ -455,7 +457,7 @@ void CAnomalyJob::acknowledgeFlush(const std::string& flushId) {

void CAnomalyJob::updateConfig(const std::string& config) {
LOG_DEBUG(<< "Received update config request: " << config);
CConfigUpdater configUpdater(m_FieldConfig, m_ModelConfig);
CConfigUpdater configUpdater(m_JobConfig, m_FieldConfig, m_ModelConfig);
if (configUpdater.update(config) == false) {
LOG_ERROR(<< "Failed to update configuration");
}
Expand Down
Loading