Skip to content

Commit

Permalink
[ML] Remove out-of-phase buckets feature (elastic#318)
Browse files Browse the repository at this point in the history
This feature was never fully completed and in fact
we no longer need it as the multibucket feature
covers the benefits from supporting out-of-phase buckets.
  • Loading branch information
dimitris-athanasiou committed Nov 22, 2018
1 parent 9014b2a commit 6d0d6c4
Show file tree
Hide file tree
Showing 53 changed files with 186 additions and 2,548 deletions.
6 changes: 0 additions & 6 deletions bin/autodetect/CCmdLineParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ bool CCmdLineParser::parse(int argc,
bool& isPersistFileNamedPipe,
size_t& maxAnomalyRecords,
bool& memoryUsage,
std::size_t& bucketResultsDelay,
bool& multivariateByFields,
TStrVec& clauseTokens) {
try {
Expand Down Expand Up @@ -111,8 +110,6 @@ bool CCmdLineParser::parse(int argc,
"The maximum number of records to be outputted for each bucket. Defaults to 100, a value 0 removes the limit.")
("memoryUsage",
"Log the model memory usage at the end of the job")
("resultFinalizationWindow", boost::program_options::value<std::size_t>(),
"The numer of half buckets to store before choosing which overlapping bucket has the biggest anomaly")
("multivariateByFields",
"Optional flag to enable multi-variate analysis of correlated by fields")
;
Expand Down Expand Up @@ -222,9 +219,6 @@ bool CCmdLineParser::parse(int argc,
if (vm.count("memoryUsage") > 0) {
memoryUsage = true;
}
if (vm.count("resultFinalizationWindow") > 0) {
bucketResultsDelay = vm["resultFinalizationWindow"].as<std::size_t>();
}
if (vm.count("multivariateByFields") > 0) {
multivariateByFields = true;
}
Expand Down
1 change: 0 additions & 1 deletion bin/autodetect/CCmdLineParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class CCmdLineParser {
bool& isPersistFileNamedPipe,
size_t& maxAnomalyRecords,
bool& memoryUsage,
std::size_t& bucketResultsDelay,
bool& multivariateByFields,
TStrVec& clauseTokens);

Expand Down
8 changes: 3 additions & 5 deletions bin/autodetect/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ int main(int argc, char** argv) {
bool isPersistFileNamedPipe(false);
size_t maxAnomalyRecords(100u);
bool memoryUsage(false);
std::size_t bucketResultsDelay(0);
bool multivariateByFields(false);
TStrVec clauseTokens;
if (ml::autodetect::CCmdLineParser::parse(
Expand All @@ -96,8 +95,8 @@ int main(int argc, char** argv) {
timeFormat, quantilesStateFile, deleteStateFiles, persistInterval,
maxQuantileInterval, inputFileName, isInputFileNamedPipe, outputFileName,
isOutputFileNamedPipe, restoreFileName, isRestoreFileNamedPipe,
persistFileName, isPersistFileNamedPipe, maxAnomalyRecords, memoryUsage,
bucketResultsDelay, multivariateByFields, clauseTokens) == false) {
persistFileName, isPersistFileNamedPipe, maxAnomalyRecords,
memoryUsage, multivariateByFields, clauseTokens) == false) {
return EXIT_FAILURE;
}

Expand Down Expand Up @@ -143,8 +142,7 @@ int main(int argc, char** argv) {
summaryCountFieldName.empty() ? ml::model_t::E_None : ml::model_t::E_Manual);
ml::model::CAnomalyDetectorModelConfig modelConfig =
ml::model::CAnomalyDetectorModelConfig::defaultConfig(
bucketSpan, summaryMode, summaryCountFieldName, latency,
bucketResultsDelay, multivariateByFields);
bucketSpan, summaryMode, summaryCountFieldName, latency, multivariateByFields);
modelConfig.detectionRules(ml::model::CAnomalyDetectorModelConfig::TIntDetectionRuleVecUMapCRef(
fieldConfig.detectionRules()));
modelConfig.scheduledEvents(ml::model::CAnomalyDetectorModelConfig::TStrDetectionRulePrVecCRef(
Expand Down
44 changes: 6 additions & 38 deletions include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@

#include <model/CAnomalyDetector.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CBucketQueue.h>
#include <model/CHierarchicalResults.h>
#include <model/CHierarchicalResultsAggregator.h>
#include <model/CHierarchicalResultsNormalizer.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CResourceMonitor.h>
#include <model/CResultsQueue.h>
#include <model/CSearchKey.h>

#include <api/CDataProcessor.h>
Expand Down Expand Up @@ -117,26 +115,20 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
std::pair<model::CSearchKey::TStrCRefKeyCRefPr, TAnomalyDetectorPtr>;
using TKeyCRefAnomalyDetectorPtrPrVec = std::vector<TKeyCRefAnomalyDetectorPtrPr>;
using TModelPlotDataVec = model::CAnomalyDetector::TModelPlotDataVec;
using TModelPlotDataVecCItr = TModelPlotDataVec::const_iterator;
using TModelPlotDataVecQueue = model::CBucketQueue<TModelPlotDataVec>;

struct API_EXPORT SRestoredStateDetail {
ERestoreStateStatus s_RestoredStateStatus;
boost::optional<std::string> s_Extra;
};

struct SBackgroundPersistArgs {
SBackgroundPersistArgs(const model::CResultsQueue& resultsQueue,
const TModelPlotDataVecQueue& modelPlotQueue,
core_t::TTime time,
SBackgroundPersistArgs(core_t::TTime time,
const model::CResourceMonitor::SResults& modelSizeStats,
const model::CInterimBucketCorrector& interimBucketCorrector,
const model::CHierarchicalResultsAggregator& aggregator,
core_t::TTime latestRecordTime,
core_t::TTime lastResultsTime);

model::CResultsQueue s_ResultsQueue;
TModelPlotDataVecQueue s_ModelPlotQueue;
core_t::TTime s_Time;
model::CResourceMonitor::SResults s_ModelSizeStats;
model::CInterimBucketCorrector s_InterimBucketCorrector;
Expand Down Expand Up @@ -224,14 +216,11 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
void outputInterimResults(core_t::TTime bucketStartTime);

//! Helper function for outputResults.
//! \p processingTimer is the processing time can be written to the bucket
//! \p sumPastProcessingTime is the total time previously spent processing
//! but resulted in no bucket being outputted.
//! \p processingTime is the processing time of the bucket
void writeOutResults(bool interim,
model::CHierarchicalResults& results,
core_t::TTime bucketTime,
uint64_t processingTime,
uint64_t sumPastProcessingTime);
uint64_t processingTime);

//! Reset buckets in the range specified by the control message.
void resetBuckets(const std::string& controlMessage);
Expand Down Expand Up @@ -259,8 +248,6 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {

//! Persist the detectors to a stream.
bool persistState(const std::string& descriptionPrefix,
const model::CResultsQueue& resultsQueue,
const TModelPlotDataVecQueue& modelPlotQueue,
core_t::TTime time,
const TKeyCRefAnomalyDetectorPtrPrVec& detectors,
const model::CResourceMonitor::SResults& modelSizeStats,
Expand Down Expand Up @@ -296,16 +283,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! \param[in] endTime The end of the time interval to skip sampling.
void skipSampling(core_t::TTime endTime);

//! Outputs queued results and resets the queue to the given \p startTime
void flushAndResetResultsQueue(core_t::TTime startTime);

//! Roll time forward to \p time
void timeNow(core_t::TTime time);

//! Get the bucketLength, or half the bucketLength if
//! out-of-phase buckets are active
core_t::TTime effectiveBucketLength() const;

//! Update configuration
void updateConfig(const std::string& config);

Expand Down Expand Up @@ -333,15 +313,12 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! specified time range.
void generateModelPlot(core_t::TTime startTime,
core_t::TTime endTime,
const model::CAnomalyDetector& detector);
const model::CAnomalyDetector& detector,
TModelPlotDataVec& modelPlotData);

//! Write the pre-generated model plot to the output stream of the user's
//! choosing: either file or streamed to the API
void writeOutModelPlot(core_t::TTime resultsTime);

//! Write the pre-generated model plot to the output stream of the user's
//! choosing: either file or streamed to the API
void writeOutModelPlot(core_t::TTime, CModelPlotDataJsonWriter& writer);
void writeOutModelPlot(const TModelPlotDataVec& modelPlotData);

//! Persist one detector to a stream.
//! This method is static so that there is no danger of it accessing
Expand Down Expand Up @@ -477,15 +454,6 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! The hierarchical results normalizer.
model::CHierarchicalResultsNormalizer m_Normalizer;

//! Store the last N half-buckets' results in order
//! to choose the best result
model::CResultsQueue m_ResultsQueue;

//! Also store the model plot for the buckets for each
//! result time - these will be output when the corresponding
//! result is output
TModelPlotDataVecQueue m_ModelPlotQueue;

friend class ::CBackgroundPersisterTest;
friend class ::CAnomalyJobTest;
};
Expand Down
11 changes: 0 additions & 11 deletions include/model/CAnomalyDetectorModel.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,17 +352,6 @@ class MODEL_EXPORT CAnomalyDetectorModel {
core_t::TTime endTime,
CResourceMonitor& resourceMonitor) = 0;

//! This samples the bucket statistics, and any state needed
//! by computeProbablity, in the time interval [\p startTime,
//! \p endTime], but does not update the model. This is needed
//! by the results preview.
//!
//! \param[in] startTime The start of the time interval to sample.
//! \param[in] endTime The end of the time interval to sample.
virtual void sampleOutOfPhase(core_t::TTime startTime,
core_t::TTime endTime,
CResourceMonitor& resourceMonitor) = 0;

//! Rolls time to \p endTime while skipping sampling the models for
//! buckets within the gap.
//!
Expand Down
19 changes: 1 addition & 18 deletions include/model/CAnomalyDetectorModelConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,6 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {

//! Bucket length corresponding to the default decay and learn rates.
static const core_t::TTime STANDARD_BUCKET_LENGTH;

//! The default number of half buckets to store before choosing which
//! overlapping bucket has the biggest anomaly
static const std::size_t DEFAULT_BUCKET_RESULTS_DELAY;
//@}

//! \name Modelling
Expand Down Expand Up @@ -239,15 +235,12 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
//! then this is the name of the field holding the summary count.
//! \param[in] latency The amount of time records are buffered for, to
//! allow out-of-order records to be seen by the models in order.
//! \param[in] bucketResultsDelay The number of half-bucket results
//! to sit on before giving a definitive result.
//! \param[in] multivariateByFields Should multivariate analysis of
//! correlated 'by' fields be performed?
static CAnomalyDetectorModelConfig defaultConfig(core_t::TTime bucketLength,
model_t::ESummaryMode summaryMode,
const std::string& summaryCountFieldName,
core_t::TTime latency,
std::size_t bucketResultsDelay,
bool multivariateByFields);

//! Overload using defaults.
Expand All @@ -256,8 +249,7 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
model_t::ESummaryMode summaryMode = model_t::E_None,
const std::string& summaryCountFieldName = "") {
return defaultConfig(bucketLength, summaryMode, summaryCountFieldName,
DEFAULT_LATENCY_BUCKETS * bucketLength,
DEFAULT_BUCKET_RESULTS_DELAY, false);
DEFAULT_LATENCY_BUCKETS * bucketLength, false);
}

//! Get the factor to normalize all bucket lengths to the default
Expand All @@ -273,8 +265,6 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {

//! Set the data bucketing interval.
void bucketLength(core_t::TTime length);
//! Set the number of buckets to delay finalizing out-of-phase buckets.
void bucketResultsDelay(std::size_t delay);
//! Set the single interim bucket correction calculator.
void interimBucketCorrector(const TInterimBucketCorrectorPtr& interimBucketCorrector);
//! Set whether to model multibucket features.
Expand Down Expand Up @@ -358,9 +348,6 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
//! numbers of buckets.
std::size_t latencyBuckets() const;

//! Get the bucket result delay window.
std::size_t bucketResultsDelay() const;

//! Get the single interim bucket correction calculator.
const CInterimBucketCorrector& interimBucketCorrector() const;

Expand Down Expand Up @@ -444,10 +431,6 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
//! Bucket length.
core_t::TTime m_BucketLength;

//! Get the bucket result delay window: The numer of half buckets to
//! store before choosing which overlapping bucket has the biggest anomaly
std::size_t m_BucketResultsDelay;

//! Should multivariate analysis of correlated 'by' fields be performed?
bool m_MultivariateByFields;

Expand Down
6 changes: 5 additions & 1 deletion include/model/CBucketGatherer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class CResourceMonitor;
//! IMPLEMENTATION:\n
//! This functionality has been separated from the CDataGatherer in order
//! to allow the CDataGatherer to support multiple overlapping buckets and
//! buckets with different time spans.
//! buckets with different time spans. However, the overlapping feature
//! has been removed but this class is kept to avoid BWC issues.
class MODEL_EXPORT CBucketGatherer {
public:
using TDoubleVec = std::vector<double>;
Expand Down Expand Up @@ -394,6 +395,9 @@ class MODEL_EXPORT CBucketGatherer {
//! Create samples if possible for the bucket pointed out by \p time.
virtual void sample(core_t::TTime time) = 0;

//! Persist state by passing information \p inserter.
virtual void acceptPersistInserter(core::CStatePersistInserter& inserter) const = 0;

private:
//! Resize the necessary data structures so they can hold values
//! for the person and/or attribute identified by \p pid and \p cid,
Expand Down
11 changes: 0 additions & 11 deletions include/model/CCountingModel.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,6 @@ class MODEL_EXPORT CCountingModel : public CAnomalyDetectorModel {
core_t::TTime endTime,
CResourceMonitor& resourceMonitor);

//! This samples the bucket statistics, and any state needed
//! by computeProbablity, in the time interval [\p startTime,
//! \p endTime], but does not update the model. This is needed
//! by the results preview.
//!
//! \param[in] startTime The start of the time interval to sample.
//! \param[in] endTime The end of the time interval to sample.
virtual void sampleOutOfPhase(core_t::TTime startTime,
core_t::TTime endTime,
CResourceMonitor& resourceMonitor);

//! This samples the bucket statistics, in the time interval
//! [\p startTime, \p endTime].
//!
Expand Down
6 changes: 0 additions & 6 deletions include/model/CCountingModelFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory {

//! Set the features which will be modeled.
virtual void features(const TFeatureVec& features);

//! Set the bucket results delay
virtual void bucketResultsDelay(std::size_t bucketResultsDelay);
//@}

//! Get the minimum seasonal variance scale
Expand Down Expand Up @@ -153,9 +150,6 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory {
//! The count features which will be modeled.
TFeatureVec m_Features;

//! The bucket results delay.
std::size_t m_BucketResultsDelay;

//! A cached search key.
mutable TOptionalSearchKey m_SearchKeyCache;
};
Expand Down
20 changes: 4 additions & 16 deletions include/model/CDataGatherer.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ class MODEL_EXPORT CDataGatherer {
CBucketQueue<TSizeSizePrStoredStringPtrPrUInt64UMapVec>;
using TSearchKeyCRef = boost::reference_wrapper<const CSearchKey>;
using TBucketGathererPtr = std::unique_ptr<CBucketGatherer>;
using TBucketGathererPtrVec = std::vector<TBucketGathererPtr>;
using TFeatureAnyPr = std::pair<model_t::EFeature, boost::any>;
using TFeatureAnyPrVec = std::vector<TFeatureAnyPr>;
using TMetricCategoryVec = std::vector<model_t::EMetricCategory>;
Expand Down Expand Up @@ -338,7 +337,7 @@ class MODEL_EXPORT CDataGatherer {
core_t::TTime bucketLength,
std::vector<std::pair<model_t::EFeature, T>>& result) const {
TFeatureAnyPrVec rawFeatureData;
this->chooseBucketGatherer(time).featureData(time, bucketLength, rawFeatureData);
m_BucketGatherer->featureData(time, bucketLength, rawFeatureData);

bool succeeded = true;

Expand Down Expand Up @@ -558,7 +557,7 @@ class MODEL_EXPORT CDataGatherer {
void timeNow(core_t::TTime time);

//! Print the current bucket.
std::string printCurrentBucket(core_t::TTime time) const;
std::string printCurrentBucket() const;

//! Record a attribute called \p attribute.
std::size_t addAttribute(const std::string& attribute,
Expand Down Expand Up @@ -591,9 +590,6 @@ class MODEL_EXPORT CDataGatherer {

//! Reset bucket and return true if bucket was successfully
//! reset or false otherwise.
//! Note that this should not be used in conjunction with out-of-phase buckets
//! where the concept of resetting a specific bucketed period of time is
//! not valid.
bool resetBucket(core_t::TTime bucketStart);

//! Release memory that is no longer needed
Expand Down Expand Up @@ -693,14 +689,6 @@ class MODEL_EXPORT CDataGatherer {
using TModelParamsCRef = boost::reference_wrapper<const SModelParams>;

private:
//! Select the correct bucket gatherer based on the time: if we have
//! out-of-phase buckets, select either in-phase or out-of-phase.
const CBucketGatherer& chooseBucketGatherer(core_t::TTime time) const;

//! Select the correct bucket gatherer based on the time: if we have
//! out-of-phase buckets, select either in-phase or out-of-phase.
CBucketGatherer& chooseBucketGatherer(core_t::TTime time);

//! Restore state from supplied traverser.
bool acceptRestoreTraverser(const std::string& summaryCountFieldName,
const std::string& personFieldName,
Expand Down Expand Up @@ -738,9 +726,9 @@ class MODEL_EXPORT CDataGatherer {
//! The collection of features on which to gather data.
TFeatureVec m_Features;

//! The collection of bucket gatherers which contain the bucket-specific
//! The bucket gatherer which contains the bucket-specific
//! metrics and counts.
TBucketGathererPtrVec m_Gatherers;
TBucketGathererPtr m_BucketGatherer;

//! Indicates whether the data being gathered are already summarized
//! by an external aggregation process.
Expand Down
2 changes: 1 addition & 1 deletion include/model/CEventRateBucketGatherer.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class MODEL_EXPORT CEventRateBucketGatherer final : public CBucketGatherer {
bool acceptRestoreTraverser(core::CStateRestoreTraverser& traverser);

//! Persist state by passing information to the supplied inserter
void acceptPersistInserter(core::CStatePersistInserter& inserter) const;
virtual void acceptPersistInserter(core::CStatePersistInserter& inserter) const;

//! Create a clone of this data gatherer that will result in the same
//! persisted state. The clone may be incomplete in ways that do not
Expand Down
Loading

0 comments on commit 6d0d6c4

Please sign in to comment.