Skip to content

Commit

Permalink
[7.x][ML] Parse anomaly job JSON config file (#1552) (#1560)
Browse files Browse the repository at this point in the history
Add a parser for the recently added anomaly job configuration file, which
is in JSON format.

As an initial step towards the configuration file replacing a number of
command-line arguments, modify autodetect to use the job ID extracted by
the parser.

Relates to #1253
Backports #1552
  • Loading branch information
edsavage authored Nov 9, 2020
1 parent 7e87671 commit 282f212
Show file tree
Hide file tree
Showing 20 changed files with 1,918 additions and 43 deletions.
4 changes: 0 additions & 4 deletions bin/autodetect/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ bool CCmdLineParser::parse(int argc,
std::string& modelConfigFile,
std::string& fieldConfigFile,
std::string& modelPlotConfigFile,
std::string& jobId,
std::string& logProperties,
std::string& logPipe,
core_t::TTime& bucketSpan,
Expand Down Expand Up @@ -172,9 +171,6 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("modelplotconfig") > 0) {
modelPlotConfigFile = vm["modelplotconfig"].as<std::string>();
}
if (vm.count("jobid") > 0) {
jobId = vm["jobid"].as<std::string>();
}
if (vm.count("logProperties") > 0) {
logProperties = vm["logProperties"].as<std::string>();
}
Expand Down
1 change: 0 additions & 1 deletion bin/autodetect/CCmdLineParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class CCmdLineParser {
std::string& modelConfigFile,
std::string& fieldConfigFile,
std::string& modelPlotConfigFile,
std::string& jobId,
std::string& logProperties,
std::string& logPipe,
core_t::TTime& bucketSpan,
Expand Down
21 changes: 16 additions & 5 deletions bin/autodetect/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <core/CLogger.h>
#include <core/CProcessPriority.h>
#include <core/CProgramCounters.h>
#include <core/CStringUtils.h>
#include <core/CoreTypes.h>

#include <ver/CBuildInfo.h>
Expand All @@ -29,6 +30,7 @@
#include <model/ModelTypes.h>

#include <api/CAnomalyJob.h>
#include <api/CAnomalyJobConfig.h>
#include <api/CCmdSkeleton.h>
#include <api/CCsvInputParser.h>
#include <api/CFieldConfig.h>
Expand All @@ -51,7 +53,6 @@
#include <cstdlib>
#include <functional>
#include <memory>
#include <string>

int main(int argc, char** argv) {

Expand Down Expand Up @@ -87,7 +88,6 @@ int main(int argc, char** argv) {
std::string modelConfigFile;
std::string fieldConfigFile;
std::string modelPlotConfigFile;
std::string jobId;
std::string logProperties;
std::string logPipe;
ml::core_t::TTime bucketSpan{0};
Expand Down Expand Up @@ -120,7 +120,7 @@ int main(int argc, char** argv) {
TStrVec clauseTokens;
if (ml::autodetect::CCmdLineParser::parse(
argc, argv, configFile, limitConfigFile, modelConfigFile, fieldConfigFile,
modelPlotConfigFile, jobId, logProperties, logPipe, bucketSpan, latency,
modelPlotConfigFile, logProperties, logPipe, bucketSpan, latency,
summaryCountFieldName, delimiter, lengthEncodedInput, timeField,
timeFormat, quantilesStateFile, deleteStateFiles, persistInterval,
bucketPersistInterval, maxQuantileInterval, namedPipeConnectTimeout,
Expand Down Expand Up @@ -175,8 +175,18 @@ int main(int argc, char** argv) {
// hence is done before reducing CPU priority.
ml::core::CProcessPriority::reduceCpuPriority();

if (jobId.empty()) {
LOG_FATAL(<< "No job ID specified");
std::string anomalyJobConfigJson;
bool couldReadConfigFile;
std::tie(anomalyJobConfigJson, couldReadConfigFile) =
ml::core::CStringUtils::readFileToString(configFile);
if (couldReadConfigFile == false) {
LOG_FATAL(<< "Failed to read config file '" << configFile << "'");
return EXIT_FAILURE;
}

ml::api::CAnomalyJobConfig jobConfig;
if (jobConfig.parse(anomalyJobConfigJson) == false) {
LOG_FATAL(<< "Failed to parse anomaly job config: '" << anomalyJobConfigJson << "'");
return EXIT_FAILURE;
}

Expand Down Expand Up @@ -276,6 +286,7 @@ int main(int argc, char** argv) {
mutableFields, ioMgr.inputStream(), delimiter);
}()};

const std::string jobId{jobConfig.jobId()};
ml::core::CJsonOutputStreamWrapper wrappedOutputStream{ioMgr.outputStream()};
ml::api::CModelSnapshotJsonWriter modelSnapshotWriter{jobId, wrappedOutputStream};

Expand Down
15 changes: 3 additions & 12 deletions bin/data_frame_analyzer/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <core/CNonInstantiatable.h>
#include <core/CProcessPriority.h>
#include <core/CProgramCounters.h>
#include <core/CStringUtils.h>
#include <core/Concurrency.h>

#include <ver/CBuildInfo.h>
Expand All @@ -46,17 +47,6 @@
#include <string>

namespace {
//! Read the entire contents of the file \p fileName into a string.
//!
//! \param[in] fileName Path of the file to read.
//! \return A pair holding the file contents and a success flag. If the
//! file cannot be opened a fatal error is logged and an empty string is
//! returned together with false.
std::pair<std::string, bool> readFileToString(const std::string& fileName) {
    using TCharItr = std::istreambuf_iterator<char>;

    std::ifstream input{fileName};
    if (!input.is_open()) {
        LOG_FATAL(<< "Environment error: failed to open file '" << fileName << "'.");
        return std::make_pair(std::string{}, false);
    }

    // Consume the stream from its current position through to end-of-file.
    return std::make_pair(std::string(TCharItr{input}, TCharItr{}), true);
}

class CCleanUpOnExit : private ml::core::CNonInstantiatable {
public:
using TTemporaryDirectoryPtr = std::shared_ptr<ml::core::CTemporaryDirectory>;
Expand Down Expand Up @@ -169,7 +159,8 @@ int main(int argc, char** argv) {

std::string analysisSpecificationJson;
bool couldReadConfigFile;
std::tie(analysisSpecificationJson, couldReadConfigFile) = readFileToString(configFile);
std::tie(analysisSpecificationJson, couldReadConfigFile) =
ml::core::CStringUtils::readFileToString(configFile);
if (couldReadConfigFile == false) {
LOG_FATAL(<< "Failed to read config file '" << configFile << "'");
return EXIT_FAILURE;
Expand Down
243 changes: 243 additions & 0 deletions include/api/CAnomalyJobConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
#ifndef INCLUDED_ml_api_CAnomalyJobConfig_h
#define INCLUDED_ml_api_CAnomalyJobConfig_h

#include <core/CLogger.h>

#include <api/CDetectionRulesJsonParser.h>
#include <api/ImportExport.h>

#include <model/CLimits.h>

#include <rapidjson/document.h>

#include <string>
#include <vector>

namespace ml {
namespace api {

//! \brief A parser to convert JSON configuration of an anomaly job JSON into an object
class API_EXPORT CAnomalyJobConfig {
public:
class API_EXPORT CAnalysisConfig {
public:
class API_EXPORT CDetectorConfig {
public:
static const std::string FUNCTION;
static const std::string FIELD_NAME;
static const std::string BY_FIELD_NAME;
static const std::string OVER_FIELD_NAME;
static const std::string PARTITION_FIELD_NAME;
static const std::string DETECTOR_DESCRIPTION;
static const std::string EXCLUDE_FREQUENT;
static const std::string CUSTOM_RULES;
static const std::string USE_NULL;

public:
CDetectorConfig() {}

void parse(const rapidjson::Value& json);

std::string function() const { return m_Function; }
std::string fieldName() const { return m_FieldName; }
std::string byFieldName() const { return m_ByFieldName; }
std::string overFieldName() const { return m_OverFieldName; }
std::string partitionFieldName() const {
return m_PartitionFieldName;
}
std::string excludeFrequent() const { return m_ExcludeFrequent; }
std::string detectorDescription() const {
return m_DetectorDescription;
}
CDetectionRulesJsonParser::TDetectionRuleVec customRules() const {
return m_CustomRules;
}
bool useNull() const { return m_UseNull; }

private:
std::string m_Function{};
std::string m_FieldName{};
std::string m_ByFieldName{};
std::string m_OverFieldName{};
std::string m_PartitionFieldName{};
std::string m_ExcludeFrequent{};
std::string m_DetectorDescription{};
CDetectionRulesJsonParser::TDetectionRuleVec m_CustomRules{};
bool m_UseNull{false};
};

public:
static const std::string BUCKET_SPAN;
static const std::string SUMMARY_COUNT_FIELD_NAME;
static const std::string CATEGORIZATION_FIELD_NAME;
static const std::string CATEGORIZATION_FILTERS;
static const std::string DETECTORS;
static const std::string INFLUENCERS;
static const std::string LATENCY;
static const std::string PER_PARTITION_CATEGORIZATION;
static const std::string ENABLED;
static const std::string STOP_ON_WARN;

static const core_t::TTime DEFAULT_BUCKET_SPAN;

public:
using TStrVec = std::vector<std::string>;
using TDetectorConfigVec = std::vector<CDetectorConfig>;

public:
CAnalysisConfig() {}

void parse(const rapidjson::Value& json);

core_t::TTime bucketSpan() const { return m_BucketSpan; }

std::string summaryCountFieldName() const {
return m_SummaryCountFieldName;
}
std::string categorizationFieldName() const {
return m_CategorizationFieldName;
}
const TStrVec& categorizationFilters() const {
return m_CategorizationFilters;
}
bool perPartitionCategorizationEnabled() const {
return m_PerPartitionCategorizationEnabled;
}
bool perPartitionCategorizationStopOnWarn() const {
return m_PerPartitionCategorizationStopOnWarn;
}
const TDetectorConfigVec& detectorsConfig() const {
return m_Detectors;
}
const TStrVec& influencers() const { return m_Influencers; }
std::string latency() const { return m_Latency; }

static core_t::TTime bucketSpanSeconds(const std::string& bucketSpanString);

private:
std::size_t m_BucketSpan{300}; // 5m
std::string m_SummaryCountFieldName{};
std::string m_CategorizationFieldName{};
TStrVec m_CategorizationFilters{};
bool m_PerPartitionCategorizationEnabled{false};
bool m_PerPartitionCategorizationStopOnWarn{false};
TDetectorConfigVec m_Detectors{};
TStrVec m_Influencers{};
std::string m_Latency{};
};

class API_EXPORT CDataDescription {
public:
static const std::string TIME_FIELD;

// The time format present in the job config is in Java format and
// should be ignored in the C++ code. In any case, in production, the
// time field is always specified in seconds since epoch.
static const std::string TIME_FORMAT;

public:
//! Default constructor
CDataDescription() {}

void parse(const rapidjson::Value& json);

std::string timeField() const { return m_TimeField; }

private:
std::string m_TimeField; // e.g. timestamp
std::string m_TimeFormat; // e.g. epoch_ms
};

class API_EXPORT CModelPlotConfig {
public:
static const std::string ANNOTATIONS_ENABLED;
static const std::string ENABLED;
static const std::string TERMS;

public:
//! Default constructor
CModelPlotConfig() {}

void parse(const rapidjson::Value& modelPlotConfig);

bool annotationsEnabled() const { return m_AnnotationsEnabled; }
bool enabled() const { return m_Enabled; }

// The terms string is a comma separated list of partition or
// by field values, but with no form of escaping.
// TODO improve this to be a more robust format
std::string terms() const { return m_Terms; }

private:
bool m_AnnotationsEnabled{false};
bool m_Enabled{false};
std::string m_Terms;
};

class API_EXPORT CAnalysisLimits {
public:
static const std::string MODEL_MEMORY_LIMIT;
static const std::string CATEGORIZATION_EXAMPLES_LIMIT;

static const std::size_t DEFAULT_MEMORY_LIMIT_BYTES;

public:
//! Default constructor
CAnalysisLimits() {}

void parse(const rapidjson::Value& analysLimits);

//! Size of the memory limit for the resource monitor
//! as a whole number of MB.
std::size_t modelMemoryLimit() const { return m_ModelMemoryLimit; }

long categorizationExamplesLimit() const {
return m_CategorizationExamplesLimit;
}

static std::size_t modelMemoryLimitMb(const std::string& memoryLimitStr);

private:
std::size_t m_CategorizationExamplesLimit{model::CLimits::DEFAULT_RESULTS_MAX_EXAMPLES};
std::size_t m_ModelMemoryLimit{};
};

public:
static const std::string JOB_ID;
static const std::string JOB_TYPE;
static const std::string ANALYSIS_CONFIG;
static const std::string DATA_DESCRIPTION;
static const std::string MODEL_PLOT_CONFIG;
static const std::string ANALYSIS_LIMITS;

public:
//! Default constructor
CAnomalyJobConfig() {}

bool parse(const std::string& json);

std::string jobId() const { return m_JobId; }
std::string jobType() const { return m_JobType; }
const CAnalysisConfig& analysisConfig() const { return m_AnalysisConfig; }
const CDataDescription& dataDescription() const {
return m_DataDescription;
}
const CModelPlotConfig& modelPlotConfig() const { return m_ModelConfig; }
const CAnalysisLimits& analysisLimits() const { return m_AnalysisLimits; }

private:
std::string m_JobId;
std::string m_JobType;
CAnalysisConfig m_AnalysisConfig;
CDataDescription m_DataDescription;
CModelPlotConfig m_ModelConfig;
CAnalysisLimits m_AnalysisLimits;
};
}
}
#endif // INCLUDED_ml_api_CAnomalyJobConfig_h
Loading

0 comments on commit 282f212

Please sign in to comment.