Skip to content

Commit

Permalink
[6.5][ML] Correct query times for model plot and forecast (elastic#339)
Browse files Browse the repository at this point in the history
Backport elastic#327.
  • Loading branch information
tveasey authored Dec 13, 2018
1 parent dbe5949 commit 8b723aa
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 28 deletions.
8 changes: 8 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
//=== Regressions
== {es} version 6.5.4
=== Bug Fixes
Correct query times for model plot and forecast in the bucket to match the times we assign
the samples we add to the model for each bucket. For long bucket lengths, this could result
in apparently shifted model plot with respect to the data and increased errors in forecasts.
== {es} version 6.5.0
//=== Breaking Changes
Expand Down
2 changes: 1 addition & 1 deletion include/api/CForecastRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>;
using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>;

using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper;
using TForecastModelWrapper = model::CForecastDataSink::CForecastModelWrapper;
using TForecastResultSeries = model::CForecastDataSink::SForecastResultSeries;
using TForecastResultSeriesVec = std::vector<TForecastResultSeries>;
using TMathsModelPtr = std::unique_ptr<maths::CModel>;
Expand Down
28 changes: 19 additions & 9 deletions include/model/CForecastDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,31 @@ class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable {
public:
using TMathsModelPtr = std::shared_ptr<maths::CModel>;
using TStrUMap = boost::unordered_set<std::string>;
struct SForecastResultSeries;

//! Wrapper for 1 timeseries model, its feature and by Field
struct MODEL_EXPORT SForecastModelWrapper {
SForecastModelWrapper(model_t::EFeature feature,
class MODEL_EXPORT CForecastModelWrapper {
public:
CForecastModelWrapper(model_t::EFeature feature,
TMathsModelPtr&& forecastModel,
const std::string& byFieldValue);

SForecastModelWrapper(SForecastModelWrapper&& other);
CForecastModelWrapper(CForecastModelWrapper&& other);

SForecastModelWrapper(const SForecastModelWrapper& that) = delete;
SForecastModelWrapper& operator=(const SForecastModelWrapper&) = delete;
CForecastModelWrapper(const CForecastModelWrapper& that) = delete;
CForecastModelWrapper& operator=(const CForecastModelWrapper&) = delete;

model_t::EFeature s_Feature;
TMathsModelPtr s_ForecastModel;
std::string s_ByFieldValue;
bool forecast(const SForecastResultSeries& series,
core_t::TTime startTime,
core_t::TTime endTime,
double boundsPercentile,
CForecastDataSink& sink,
std::string& message) const;

private:
model_t::EFeature m_Feature;
TMathsModelPtr m_ForecastModel;
std::string m_ByFieldValue;
};

//! Everything that defines 1 series of forecasts
Expand All @@ -68,7 +78,7 @@ class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable {

SModelParams s_ModelParams;
int s_DetectorIndex;
std::vector<SForecastModelWrapper> s_ToForecast;
std::vector<CForecastModelWrapper> s_ToForecast;
std::string s_ToForecastPersisted;
std::string s_PartitionFieldName;
std::string s_PartitionFieldValue;
Expand Down
15 changes: 4 additions & 11 deletions lib/api/CForecastRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,10 @@ void CForecastRunner::forecastWorker() {
}
}

const TForecastModelWrapper& model = series.s_ToForecast.back();
model_t::TDouble1VecDouble1VecPr support =
model_t::support(model.s_Feature);
bool success = model.s_ForecastModel->forecast(
forecastJob.s_StartTime, forecastJob.forecastEnd(),
forecastJob.s_BoundsPercentile, support.first, support.second,
boost::bind(&model::CForecastDataSink::push, &sink, _1,
model_t::print(model.s_Feature), series.s_PartitionFieldName,
series.s_PartitionFieldValue, series.s_ByFieldName,
model.s_ByFieldValue, series.s_DetectorIndex),
message);
const TForecastModelWrapper& model{series.s_ToForecast.back()};
bool success{model.forecast(
series, forecastJob.s_StartTime, forecastJob.forecastEnd(),
forecastJob.s_BoundsPercentile, sink, message)};
series.s_ToForecast.pop_back();

if (success == false) {
Expand Down
32 changes: 26 additions & 6 deletions lib/model/CForecastDataSink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <core/CLogger.h>
#include <core/CScopedRapidJsonPoolAllocator.h>

#include <boost/bind.hpp>

#include <vector>

namespace ml {
Expand Down Expand Up @@ -54,16 +56,34 @@ const std::string CForecastDataSink::STATUS("forecast_status");

using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<core::CRapidJsonConcurrentLineWriter>;

CForecastDataSink::SForecastModelWrapper::SForecastModelWrapper(model_t::EFeature feature,
CForecastDataSink::CForecastModelWrapper::CForecastModelWrapper(model_t::EFeature feature,
TMathsModelPtr&& forecastModel,
const std::string& byFieldValue)
: s_Feature(feature), s_ForecastModel(std::move(forecastModel)),
s_ByFieldValue(byFieldValue) {
: m_Feature(feature), m_ForecastModel(std::move(forecastModel)),
m_ByFieldValue(byFieldValue) {
}

CForecastDataSink::CForecastModelWrapper::CForecastModelWrapper(CForecastModelWrapper&& other)
: m_Feature(other.m_Feature), m_ForecastModel(std::move(other.m_ForecastModel)),
m_ByFieldValue(std::move(other.m_ByFieldValue)) {
}

CForecastDataSink::SForecastModelWrapper::SForecastModelWrapper(SForecastModelWrapper&& other)
: s_Feature(other.s_Feature), s_ForecastModel(std::move(other.s_ForecastModel)),
s_ByFieldValue(std::move(other.s_ByFieldValue)) {
bool CForecastDataSink::CForecastModelWrapper::forecast(const SForecastResultSeries& series,
core_t::TTime startTime,
core_t::TTime endTime,
double boundsPercentile,
CForecastDataSink& sink,
std::string& message) const {
core_t::TTime bucketLength{m_ForecastModel->params().bucketLength()};
startTime = model_t::sampleTime(m_Feature, startTime, bucketLength);
endTime = model_t::sampleTime(m_Feature, endTime, bucketLength);
model_t::TDouble1VecDouble1VecPr support{model_t::support(m_Feature)};
return m_ForecastModel->forecast(
startTime, endTime, boundsPercentile, support.first, support.second,
boost::bind(&model::CForecastDataSink::push, &sink, _1, model_t::print(m_Feature),
series.s_PartitionFieldName, series.s_PartitionFieldValue,
series.s_ByFieldName, m_ByFieldValue, series.s_DetectorIndex),
message);
}

CForecastDataSink::SForecastResultSeries::SForecastResultSeries(const SModelParams& modelParams)
Expand Down
3 changes: 2 additions & 1 deletion lib/model/CModelDetailsView.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ void CModelDetailsView::modelPlotForByFieldId(core_t::TTime time,

if (this->isByFieldIdActive(byFieldId)) {
const maths::CModel* model = this->model(feature, byFieldId);
if (!model) {
if (model == nullptr) {
return;
}

std::size_t dimension = model_t::dimension(feature);
time = model_t::sampleTime(feature, time, model->params().bucketLength());

maths_t::TDouble2VecWeightsAry weights(
maths_t::CUnitWeights::unit<TDouble2Vec>(dimension));
Expand Down

0 comments on commit 8b723aa

Please sign in to comment.