Skip to content

Commit

Permalink
[7.x][ML] Parse JSON format config updates (#1682) (#1686)
Browse files Browse the repository at this point in the history
* Expect configuration update requests to be received in JSON formatted
strings. 
* Remove support for old, ini file parsing.

Relates elastic/elasticsearch#67721, #1253
Backports #1682
  • Loading branch information
edsavage authored Jan 21, 2021
1 parent 5ed5547 commit 0e7681a
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 187 deletions.
16 changes: 9 additions & 7 deletions include/api/CAnomalyJobConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class API_EXPORT CAnomalyJobConfig {
public:
class API_EXPORT CDetectorConfig {
public:
static const std::string DETECTOR_RULES;

static const std::string FUNCTION;
static const std::string FIELD_NAME;
static const std::string BY_FIELD_NAME;
Expand Down Expand Up @@ -287,6 +289,10 @@ class API_EXPORT CAnomalyJobConfig {

bool parseRules(int detectorIndex, const std::string& rules);

bool parseRules(int detectorIndex, const rapidjson::Value& rules);

bool parseRulesUpdate(const rapidjson::Value& rulesUpdateConfig);

private:
// Convenience method intended for use by the unit tests only
void addDetector(const std::string& functionName,
Expand All @@ -302,13 +308,8 @@ class API_EXPORT CAnomalyJobConfig {
overFieldName, partitionFieldName);
}

bool processFilter(const std::string& key, const std::string& value);

//! Process and store a scheduled event
bool processScheduledEvent(const boost::property_tree::ptree& propTree,
const std::string& key,
const std::string& value,
TIntSet& handledScheduledEvents);
bool parseRules(CDetectionRulesJsonParser::TDetectionRuleVec& detectionRules,
const rapidjson::Value& rules);

bool parseRules(CDetectionRulesJsonParser::TDetectionRuleVec& detectionRules,
const std::string& rules);
Expand Down Expand Up @@ -523,6 +524,7 @@ class API_EXPORT CAnomalyJobConfig {
return m_DataDescription;
}
const CModelPlotConfig& modelPlotConfig() const { return m_ModelConfig; }
CModelPlotConfig& modelPlotConfig() { return m_ModelConfig; }
const CAnalysisLimits& analysisLimits() const { return m_AnalysisLimits; }
bool isInitialized() const { return m_IsInitialized; }
core_t::TTime persistInterval() const {
Expand Down
12 changes: 2 additions & 10 deletions include/api/CConfigUpdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ namespace api {
//! update, a control message is being sent with the requested
//! configuration changes. This class is responsible for parsing
//! text with the requested configuration changes and apply them.
//! The changes are expected in an ini type of syntax.
//! The changes are expected in a JSON document.
//!
//! IMPLEMENTATION DECISIONS:\n
//! As long as the parsing of the configuration changes is
//! done successfully, the updater tries to apply as many
//! changes as possible even if it fails on a particular
//! change (e.g. unknown stanza name).
//! change.
//!
class API_EXPORT CConfigUpdater {
public:
Expand All @@ -41,14 +41,6 @@ class API_EXPORT CConfigUpdater {
//! \param config the requested changes in an ini syntax
bool update(const std::string& config);

private:
static const std::string MODEL_DEBUG_CONFIG;
static const std::string DETECTOR_RULES;
static const std::string DETECTOR_INDEX;
static const std::string RULES_JSON;
static const std::string FILTERS;
static const std::string SCHEDULED_EVENTS;

private:
CAnomalyJobConfig& m_JobConfig;
model::CAnomalyDetectorModelConfig& m_ModelConfig;
Expand Down
6 changes: 6 additions & 0 deletions include/model/CAnomalyDetectorModelConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
//! Get the central confidence interval for the model debug plot.
double modelPlotBoundsPercentile() const;

//! Is model plot enabled?
bool modelPlotEnabled() const;

//! Are annotations enabled for each of the models?
bool modelPlotAnnotationsEnabled() const;

Expand Down Expand Up @@ -451,6 +454,9 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
//! A cache of customized factories requested from this config.
mutable TSearchKeyFactoryCPtrMap m_FactoryCache;

//! Is model plot enabled?
bool m_ModelPlotEnabled{false};

//! Are annotations enabled for each of the models?
bool m_ModelPlotAnnotationsEnabled{false};

Expand Down
137 changes: 42 additions & 95 deletions lib/api/CAnomalyJobConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ const std::string CAnomalyJobConfig::CAnalysisConfig::SCHEDULED_EVENT_PREFIX("sc
const std::string CAnomalyJobConfig::CAnalysisConfig::DESCRIPTION_SUFFIX(".description");
const std::string CAnomalyJobConfig::CAnalysisConfig::RULES_SUFFIX(".rules");

const std::string CAnomalyJobConfig::CAnalysisConfig::CDetectorConfig::DETECTOR_RULES{"detector_rules"};

const std::string CAnomalyJobConfig::CAnalysisConfig::CDetectorConfig::FUNCTION{"function"};
const std::string CAnomalyJobConfig::CAnalysisConfig::CDetectorConfig::FIELD_NAME{"field_name"};
const std::string CAnomalyJobConfig::CAnalysisConfig::CDetectorConfig::BY_FIELD_NAME{"by_field_name"};
Expand Down Expand Up @@ -316,6 +318,15 @@ const CAnomalyJobConfigReader DETECTOR_CONFIG_READER{[] {
return theReader;
}()};

const CAnomalyJobConfigReader CUSTOM_RULES_UPDATE_CONFIG_READER{[] {
CAnomalyJobConfigReader theReader;
theReader.addParameter(CAnomalyJobConfig::CAnalysisConfig::CDetectorConfig::DETECTOR_INDEX,
CAnomalyJobConfigReader::E_RequiredParameter);
theReader.addParameter(CAnomalyJobConfig::CAnalysisConfig::CDetectorConfig::CUSTOM_RULES,
CAnomalyJobConfigReader::E_RequiredParameter);
return theReader;
}()};

const CAnomalyJobConfigReader PPC_CONFIG_READER{[] {
CAnomalyJobConfigReader theReader;
theReader.addParameter(CAnomalyJobConfig::CAnalysisConfig::ENABLED,
Expand Down Expand Up @@ -431,6 +442,8 @@ bool CAnomalyJobConfig::parseEventConfig(const std::string& json) {
return false;
}

m_ScheduledEvents.clear();

if (doc.ObjectEmpty()) {
return true;
}
Expand All @@ -443,6 +456,7 @@ bool CAnomalyJobConfig::parseEventConfig(const std::string& json) {

const rapidjson::Value& value = doc[EVENTS];

m_Events.clear();
m_Events.resize(value.Size());
for (unsigned int i = 0; i < value.Size(); ++i) {
if (value[i].IsObject() == false) {
Expand Down Expand Up @@ -488,9 +502,10 @@ void CAnomalyJobConfig::CEventConfig::parse(const rapidjson::Value& filterConfig
scheduledEvents.emplace_back(m_Description, m_DetectionRules[0]);
}

bool CAnomalyJobConfig::parseFilterConfig(const std::string& json) {
bool CAnomalyJobConfig::parseFilterConfig(const std::string& jsonString) {

rapidjson::Document doc;
if (doc.Parse<0>(json).HasParseError()) {
if (doc.Parse<0>(jsonString).HasParseError()) {
LOG_ERROR(<< "An error occurred while parsing filter config from JSON: "
<< doc.GetParseError());
return false;
Expand All @@ -502,7 +517,8 @@ bool CAnomalyJobConfig::parseFilterConfig(const std::string& json) {

try {
if (doc.HasMember(FILTERS) == false || doc[FILTERS].IsArray() == false) {
LOG_ERROR(<< "Missing expected array field '" << FILTERS << "'. JSON: " << json);
LOG_ERROR(<< "Missing expected array field '" << FILTERS
<< "'. JSON: " << jsonString);
return false;
}

Expand All @@ -514,7 +530,6 @@ bool CAnomalyJobConfig::parseFilterConfig(const std::string& json) {
<< toString(value[i]));
return false;
}

m_Filters[i].parse(value[i], m_RuleFilters);
}
} catch (CAnomalyJobConfigReader::CParseError& e) {
Expand Down Expand Up @@ -717,114 +732,47 @@ void CAnomalyJobConfig::CAnalysisConfig::parse(const rapidjson::Value& analysisC
m_MultivariateByFields = parameters[MULTIVARIATE_BY_FIELDS].fallback(false);
}

// TODO: Process updates as JSON
bool CAnomalyJobConfig::CAnalysisConfig::updateFilters(const boost::property_tree::ptree& propTree) {
for (const auto& filterEntry : propTree) {
const std::string& key = filterEntry.first;
const std::string& value = filterEntry.second.data();
if (this->processFilter(key, value) == false) {
return false;
bool CAnomalyJobConfig::CAnalysisConfig::parseRulesUpdate(const rapidjson::Value& rulesUpdateConfig) {
try {
auto parameters = CUSTOM_RULES_UPDATE_CONFIG_READER.read(rulesUpdateConfig);
int detectorIndex = parameters[CDetectorConfig::DETECTOR_INDEX].as<int>();
auto customRules = parameters[CDetectorConfig::CUSTOM_RULES].jsonObject();
if (customRules != nullptr) {
m_DetectorRules[detectorIndex].clear();
if (this->parseRules(detectorIndex, *customRules) == false) {
LOG_ERROR(<< "Failed to update detector rules for detector: " << detectorIndex);
return false;
}
}
}
return true;
}

// TODO: Process updates as JSON
bool CAnomalyJobConfig::CAnalysisConfig::processFilter(const std::string& key,
const std::string& value) {
// expected format is filter.<filterId>=[json, array]
std::size_t sepPos{key.find(SUFFIX_SEPARATOR)};
if (sepPos == std::string::npos) {
LOG_ERROR(<< "Unrecognised filter key: " + key);
} catch (CAnomalyJobConfigReader::CParseError& e) {
LOG_ERROR(<< "Error parsing events config: " << e.what());
return false;
}
std::string filterId = key.substr(sepPos + 1);
core::CPatternSet& filter = m_RuleFilters[filterId];
return filter.initFromJson(value);
}

// TODO: Process updates as JSON
bool CAnomalyJobConfig::CAnalysisConfig::updateScheduledEvents(const boost::property_tree::ptree& propTree) {
m_ScheduledEvents.clear();

bool isClear = propTree.get(CLEAR, false);
if (isClear) {
return true;
}

TIntSet handledScheduledEvents;

for (const auto& scheduledEventEntry : propTree) {
const std::string& key = scheduledEventEntry.first;
const std::string& value = scheduledEventEntry.second.data();
if (this->processScheduledEvent(propTree, key, value, handledScheduledEvents) == false) {
return false;
}
}
return true;
}

// TODO: Process updates as JSON
bool CAnomalyJobConfig::CAnalysisConfig::processScheduledEvent(
const boost::property_tree::ptree& propTree,
const std::string& key,
const std::string& value,
TIntSet& handledScheduledEvents) {
// Here we pull out the "1" in "scheduledevent.1.description"
// description may contain a '.'
std::size_t sepPos{key.find(SUFFIX_SEPARATOR, SCHEDULED_EVENT_PREFIX.length() + 1)};
if (sepPos == std::string::npos || sepPos == key.length() - 1) {
LOG_ERROR(<< "Unrecognised configuration option " << key << " = " << value);
return false;
}

std::string indexString{key, SCHEDULED_EVENT_PREFIX.length(),
sepPos - SCHEDULED_EVENT_PREFIX.length()};
int indexKey;
if (core::CStringUtils::stringToType(indexString, indexKey) == false) {
LOG_ERROR(<< "Cannot convert config key to integer: " << indexString);
return false;
}

// Check if we've already seen this key
if (handledScheduledEvents.insert(indexKey).second == false) {
// Not an error
return true;
}

std::string description{propTree.get(
boost::property_tree::ptree::path_type{
SCHEDULED_EVENT_PREFIX + indexString + DESCRIPTION_SUFFIX, '\t'},
EMPTY_STRING)};

std::string rules{propTree.get(
boost::property_tree::ptree::path_type{
SCHEDULED_EVENT_PREFIX + indexString + RULES_SUFFIX, '\t'},
EMPTY_STRING)};

CDetectionRulesJsonParser::TDetectionRuleVec detectionRules;
if (this->parseRules(detectionRules, rules) == false) {
// parseRules() will have logged the error
return false;
}
bool CAnomalyJobConfig::CAnalysisConfig::parseRules(int detectorIndex,
const rapidjson::Value& rules) {
return parseRules(m_DetectorRules[detectorIndex], rules);
}

if (detectionRules.size() != 1) {
LOG_ERROR(<< "Scheduled events must have exactly 1 rule");
bool CAnomalyJobConfig::CAnalysisConfig::parseRules(CDetectionRulesJsonParser::TDetectionRuleVec& detectionRules,
const rapidjson::Value& rules) {
CDetectionRulesJsonParser rulesParser{m_RuleFilters};
std::string errorString;
if (rulesParser.parseRules(rules, detectionRules, errorString) == false) {
LOG_ERROR(<< "Error parsing detector rules: " << errorString);
return false;
}

m_ScheduledEvents.emplace_back(description, detectionRules[0]);

return true;
}

// TODO: Process updates as JSON
bool CAnomalyJobConfig::CAnalysisConfig::parseRules(int detectorIndex,
const std::string& rules) {
return parseRules(m_DetectorRules[detectorIndex], rules);
}

// TODO: Process updates as JSON
bool CAnomalyJobConfig::CAnalysisConfig::parseRules(CDetectionRulesJsonParser::TDetectionRuleVec& detectionRules,
const std::string& rules) {
if (rules.empty()) {
Expand Down Expand Up @@ -921,7 +869,6 @@ bool CAnomalyJobConfig::CAnalysisConfig::CDetectorConfig::determineFunction(bool
// Some functions must take a field, some mustn't and for the rest it's
// optional. Validate this based on the contents of these flags after
// determining the function. Similarly for by fields.
// TODO: Check how much validation is required here (if any) if parsing JSON job config.
bool fieldRequired{false};
bool fieldInvalid{false};
bool byFieldRequired{false};
Expand Down
Loading

0 comments on commit 0e7681a

Please sign in to comment.