Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.12][ML] Fix bug persisting and restoring time series model decay rate controller #1672

Merged
merged 1 commit into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@

* Fix potential cause for log errors from CXMeansOnline1d. (See {ml-pull}1586[#1586].)
* Fix scaling of some hyperparameter for Bayesian optimization. (See {ml-pull}1612[#1612].)
* Fix missing state in persist and restore for anomaly detection. This caused suboptimal
modelling after a job was closed and reopened or failed over to a different node.
(See {ml-pull}1668[#1668].)

== {es} version 7.10.1

Expand Down
30 changes: 22 additions & 8 deletions include/maths/CDecayRateController.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
#include <maths/CPRNG.h>
#include <maths/ImportExport.h>

#include <stdint.h>
#include <array>
#include <cstdint>

namespace ml {
namespace core {
Expand Down Expand Up @@ -60,6 +61,12 @@ class MATHS_EXPORT CDecayRateController {
CDecayRateController();
CDecayRateController(int checks, std::size_t dimension);

//! Get the checks which this controller is performing.
int checks() const;

//! Set the checks which this controller is performing.
void checks(int checks);

//! Reset the errors.
void reset();

Expand Down Expand Up @@ -90,21 +97,28 @@ class MATHS_EXPORT CDecayRateController {
std::size_t memoryUsage() const;

//! Get a checksum of this object.
uint64_t checksum(uint64_t seed = 0) const;
std::uint64_t checksum(std::uint64_t seed = 0) const;

private:
//! Get the count of residuals added so far.
double count() const;
using TDouble3Ary = std::array<double, 3>;

//! Get the change to apply to the decay rate multiplier.
double change(const double (&stats)[3], core_t::TTime bucketLength) const;
private:
double count() const;
double change(const TDouble3Ary& stats, core_t::TTime bucketLength) const;
bool notControlling() const;
bool increaseDecayRateErrorIncreasing(const TDouble3Ary& stats) const;
bool increaseDecayRateErrorDecreasing(const TDouble3Ary& stats) const;
bool increaseDecayRateBiased(const TDouble3Ary& stats) const;
bool decreaseDecayRateErrorNotIncreasing(const TDouble3Ary& stats) const;
bool decreaseDecayRateErrorNotDecreasing(const TDouble3Ary& stats) const;
bool decreaseDecayRateNotBiased(const TDouble3Ary& stats) const;

private:
//! The checks we perform to detect error conditions.
int m_Checks;
int m_Checks = 0;

//! The current target multiplier.
double m_Target;
double m_Target = 1.0;

//! The cumulative multiplier applied to the decay rate.
TMeanAccumulator m_Multiplier;
Expand Down
9 changes: 9 additions & 0 deletions include/maths/CPRNG.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ class MATHS_EXPORT CPRNG : private core::CNonInstantiatable {
//! Restore from a string.
bool fromString(const std::string& state);

//! Get a checksum for this object.
std::uint64_t checksum(std::uint64_t seed) const;

private:
static const result_type A;
static const result_type B;
Expand Down Expand Up @@ -184,6 +187,9 @@ class MATHS_EXPORT CPRNG : private core::CNonInstantiatable {
//! Restore from a string.
bool fromString(const std::string& state);

//! Get a checksum for this object.
std::uint64_t checksum(std::uint64_t seed) const;

private:
static const result_type JUMP[2];

Expand Down Expand Up @@ -276,6 +282,9 @@ class MATHS_EXPORT CPRNG : private core::CNonInstantiatable {
//! Restore from a string.
bool fromString(std::string state);

//! Get a checksum for this object.
std::uint64_t checksum(std::uint64_t seed) const;

private:
static const result_type A;
static const result_type JUMP[16];
Expand Down
84 changes: 67 additions & 17 deletions lib/maths/CDecayRateController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const std::string RECENT_ABS_ERROR_TAG{"d"};
const std::string HISTORICAL_ABS_ERROR_TAG{"e"};
const std::string RNG_TAG{"f"};
const std::string MULTIPLIER_TAG{"g"};
const std::string CHECKS_TAG{"h"};

//! The factor by which we'll increase the decay rate per bucket.
const double INCREASE_RATE{1.2};
Expand Down Expand Up @@ -92,16 +93,24 @@ double adjustedMaximumMultiplier(core_t::TTime bucketLength_) {
}
}

CDecayRateController::CDecayRateController() : m_Checks(0), m_Target(1.0) {
CDecayRateController::CDecayRateController() {
m_Multiplier.add(m_Target);
}

CDecayRateController::CDecayRateController(int checks, std::size_t dimension)
: m_Checks(checks), m_Target(1.0), m_PredictionMean(dimension), m_Bias(dimension),
: m_Checks{checks}, m_PredictionMean(dimension), m_Bias(dimension),
m_RecentAbsError(dimension), m_HistoricalAbsError(dimension) {
m_Multiplier.add(m_Target);
}

int CDecayRateController::checks() const {
return m_Checks;
}

void CDecayRateController::checks(int checks) {
m_Checks = checks;
}

void CDecayRateController::reset() {
m_Target = 1.0;
m_Multiplier = TMeanAccumulator();
Expand All @@ -116,6 +125,7 @@ bool CDecayRateController::acceptRestoreTraverser(core::CStateRestoreTraverser&
m_Multiplier = TMeanAccumulator();
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(CHECKS_TAG, m_Checks)
RESTORE_BUILT_IN(TARGET_TAG, m_Target)
RESTORE(MULTIPLIER_TAG, m_Multiplier.fromDelimited(traverser.value()))
RESTORE(RNG_TAG, m_Rng.fromString(traverser.value()))
Expand All @@ -135,7 +145,8 @@ bool CDecayRateController::acceptRestoreTraverser(core::CStateRestoreTraverser&
}

void CDecayRateController::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(TARGET_TAG, m_Target);
inserter.insertValue(CHECKS_TAG, m_Checks);
inserter.insertValue(TARGET_TAG, m_Target, core::CIEEE754::E_DoublePrecision);
inserter.insertValue(MULTIPLIER_TAG, m_Multiplier.toDelimited());
inserter.insertValue(RNG_TAG, m_Rng.toString());
core::CPersistUtils::persist(PREDICTION_MEAN_TAG, m_PredictionMean, inserter);
Expand Down Expand Up @@ -201,9 +212,9 @@ double CDecayRateController::multiplier(const TDouble1Vec& prediction,
}

if (count > 0.0) {
double factors[]{std::exp(-FAST_DECAY_RATE * decayRate),
std::exp(-FAST_DECAY_RATE * decayRate),
std::exp(-SLOW_DECAY_RATE * decayRate)};
TDouble3Ary factors{std::exp(-FAST_DECAY_RATE * decayRate),
std::exp(-FAST_DECAY_RATE * decayRate),
std::exp(-SLOW_DECAY_RATE * decayRate)};
for (auto& component : m_PredictionMean) {
component.age(factors[2]);
}
Expand All @@ -222,7 +233,7 @@ double CDecayRateController::multiplier(const TDouble1Vec& prediction,
// Compute the change to apply to the target decay rate.
TMaxAccumulator change;
for (std::size_t d = 0u; d < dimension; ++d) {
double stats[3];
TDouble3Ary stats;
for (std::size_t i = 0u; i < 3; ++i) {
stats[i] = std::fabs(CBasicStatistics::mean((*stats_[i])[d]));
}
Expand Down Expand Up @@ -272,7 +283,11 @@ std::size_t CDecayRateController::memoryUsage() const {
return mem;
}

uint64_t CDecayRateController::checksum(uint64_t seed) const {
std::uint64_t CDecayRateController::checksum(std::uint64_t seed) const {
seed = CChecksum::calculate(seed, m_Checks);
seed = CChecksum::calculate(seed, m_Target);
seed = CChecksum::calculate(seed, m_Multiplier);
seed = CChecksum::calculate(seed, m_Rng);
seed = CChecksum::calculate(seed, m_PredictionMean);
seed = CChecksum::calculate(seed, m_Bias);
seed = CChecksum::calculate(seed, m_RecentAbsError);
Expand All @@ -283,20 +298,55 @@ double CDecayRateController::count() const {
return CBasicStatistics::count(m_HistoricalAbsError[0]);
}

double CDecayRateController::change(const double (&stats)[3], core_t::TTime bucketLength) const {
if (((m_Checks & E_PredictionErrorIncrease) && stats[1] > ERROR_INCREASING * stats[2]) ||
((m_Checks & E_PredictionErrorDecrease) && stats[2] > ERROR_DECREASING * stats[1]) ||
((m_Checks & E_PredictionBias) && stats[0] > BIASED * stats[1])) {
double CDecayRateController::change(const TDouble3Ary& stats, core_t::TTime bucketLength) const {
if (this->notControlling()) {
return 1.0;
}
if (this->increaseDecayRateErrorIncreasing(stats) ||
this->increaseDecayRateErrorDecreasing(stats) ||
this->increaseDecayRateBiased(stats)) {
return adjustMultiplier(INCREASE_RATE, bucketLength);
}
if ((!(m_Checks & E_PredictionErrorIncrease) ||
stats[1] < ERROR_NOT_INCREASING * stats[2]) &&
(!(m_Checks & E_PredictionErrorDecrease) ||
stats[2] < ERROR_NOT_DECREASING * stats[1]) &&
(!(m_Checks & E_PredictionBias) || stats[0] < NOT_BIASED * stats[1])) {
if (this->decreaseDecayRateErrorNotIncreasing(stats) &&
this->decreaseDecayRateErrorNotDecreasing(stats) &&
this->decreaseDecayRateNotBiased(stats)) {
return adjustMultiplier(DECREASE_RATE, bucketLength);
}
return 1.0;
}

bool CDecayRateController::notControlling() const {
return (m_Checks & E_PredictionErrorIncrease) == false &&
(m_Checks & E_PredictionErrorDecrease) == false &&
(m_Checks & E_PredictionBias) == false;
}

bool CDecayRateController::increaseDecayRateErrorIncreasing(const TDouble3Ary& stats) const {
return (m_Checks & E_PredictionErrorIncrease) &&
stats[1] > ERROR_INCREASING * stats[2];
}

bool CDecayRateController::increaseDecayRateErrorDecreasing(const TDouble3Ary& stats) const {
return (m_Checks & E_PredictionErrorDecrease) &&
stats[2] > ERROR_DECREASING * stats[1];
}

bool CDecayRateController::increaseDecayRateBiased(const TDouble3Ary& stats) const {
return (m_Checks & E_PredictionBias) && stats[0] > BIASED * stats[1];
}

bool CDecayRateController::decreaseDecayRateErrorNotIncreasing(const TDouble3Ary& stats) const {
return (m_Checks & E_PredictionErrorIncrease) == false ||
stats[1] < ERROR_NOT_INCREASING * stats[2];
}

bool CDecayRateController::decreaseDecayRateErrorNotDecreasing(const TDouble3Ary& stats) const {
return (m_Checks & E_PredictionErrorDecrease) == false ||
stats[2] < ERROR_NOT_DECREASING * stats[1];
}

bool CDecayRateController::decreaseDecayRateNotBiased(const TDouble3Ary& stats) const {
return (m_Checks & E_PredictionBias) == false || stats[0] < NOT_BIASED * stats[1];
}
}
}
14 changes: 14 additions & 0 deletions lib/maths/CPRNG.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <core/CPersistUtils.h>
#include <core/CStringUtils.h>

#include <maths/CChecksum.h>

#include <algorithm>

namespace ml {
Expand Down Expand Up @@ -71,6 +73,10 @@ bool CPRNG::CSplitMix64::fromString(const std::string& state) {
return core::CStringUtils::stringToType(state, m_X);
}

std::uint64_t CPRNG::CSplitMix64::checksum(std::uint64_t seed) const {
return CChecksum::calculate(seed, m_X);
}

const CPRNG::CSplitMix64::result_type CPRNG::CSplitMix64::A(0x9E3779B97F4A7C15);
const CPRNG::CSplitMix64::result_type CPRNG::CSplitMix64::B(0xBF58476D1CE4E5B9);
const CPRNG::CSplitMix64::result_type CPRNG::CSplitMix64::C(0x94D049BB133111EB);
Expand Down Expand Up @@ -136,6 +142,10 @@ bool CPRNG::CXorOShiro128Plus::fromString(const std::string& state) {
return core::CPersistUtils::fromString(state, &m_X[0], &m_X[2]);
}

std::uint64_t CPRNG::CXorOShiro128Plus::checksum(std::uint64_t seed) const {
return CChecksum::calculate(seed, m_X);
}

const CPRNG::CXorOShiro128Plus::result_type CPRNG::CXorOShiro128Plus::JUMP[] = {
0xbeac0467eba5facb, 0xd86b048b86aa9922};

Expand Down Expand Up @@ -213,6 +223,10 @@ bool CPRNG::CXorShift1024Mult::fromString(std::string state) {
return core::CPersistUtils::fromString(state, &m_X[0], &m_X[16]);
}

std::uint64_t CPRNG::CXorShift1024Mult::checksum(std::uint64_t seed) const {
return CChecksum::calculate(seed, m_X);
}

const CPRNG::CXorShift1024Mult::result_type CPRNG::CXorShift1024Mult::A(1181783497276652981);
const CPRNG::CXorShift1024Mult::result_type CPRNG::CXorShift1024Mult::JUMP[16] = {
0x84242f96eca9c41d, 0xa3c65b8776f96855, 0x5b34a39f070b5837,
Expand Down
37 changes: 33 additions & 4 deletions lib/maths/CTimeSeriesModel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ double aggregateFeatureProbabilities(const TDouble4Vec& probabilities, double co
const std::string VERSION_6_3_TAG("6.3");
const std::string VERSION_6_5_TAG("6.5");
const std::string VERSION_7_3_TAG("7.3");
const std::string VERSION_7_11_TAG("7.11");

// Models
// Version >= 6.3
Expand Down Expand Up @@ -1291,7 +1292,9 @@ std::size_t CUnivariateTimeSeriesModel::memoryUsage() const {

bool CUnivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
if (traverser.name() == VERSION_6_3_TAG) {
bool stateMissingControllerChecks{false};
if (traverser.name() == VERSION_6_3_TAG || traverser.name() == VERSION_7_11_TAG) {
stateMissingControllerChecks = (traverser.name() == VERSION_6_3_TAG);
while (traverser.next()) {
const std::string& name{traverser.name()};
RESTORE_BUILT_IN(ID_6_3_TAG, m_Id)
Expand Down Expand Up @@ -1329,6 +1332,7 @@ bool CUnivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParam
}
} else {
// There is no version string this is historic state.
stateMissingControllerChecks = true;
do {
const std::string& name{traverser.name()};
RESTORE_BUILT_IN(ID_OLD_TAG, m_Id)
Expand Down Expand Up @@ -1356,6 +1360,17 @@ bool CUnivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParam
/**/)
} while (traverser.next());
}

if (m_Controllers != nullptr && stateMissingControllerChecks) {
(*m_Controllers)[E_TrendControl].checks(CDecayRateController::E_PredictionBias |
CDecayRateController::E_PredictionErrorIncrease);
}
if (m_Controllers != nullptr && stateMissingControllerChecks) {
(*m_Controllers)[E_ResidualControl].checks(
CDecayRateController::E_PredictionBias | CDecayRateController::E_PredictionErrorIncrease |
maths::CDecayRateController::E_PredictionErrorDecrease);
}

return true;
}

Expand All @@ -1378,7 +1393,7 @@ void CUnivariateTimeSeriesModel::acceptPersistInserter(core::CStatePersistInsert

// Note that we don't persist this->params() or the correlations
// because that state is reinitialized.
inserter.insertValue(VERSION_6_3_TAG, "");
inserter.insertValue(VERSION_7_11_TAG, "");
inserter.insertValue(ID_6_3_TAG, m_Id);
inserter.insertValue(IS_NON_NEGATIVE_6_3_TAG, static_cast<int>(m_IsNonNegative));
inserter.insertValue(IS_FORECASTABLE_6_3_TAG, static_cast<int>(m_IsForecastable));
Expand Down Expand Up @@ -2691,7 +2706,9 @@ std::size_t CMultivariateTimeSeriesModel::memoryUsage() const {

bool CMultivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
if (traverser.name() == VERSION_6_3_TAG) {
bool stateMissingControllerChecks{false};
if (traverser.name() == VERSION_6_3_TAG || traverser.name() == VERSION_7_11_TAG) {
stateMissingControllerChecks = (traverser.name() == VERSION_6_3_TAG);
while (traverser.next()) {
const std::string& name{traverser.name()};
RESTORE_BOOL(IS_NON_NEGATIVE_6_3_TAG, m_IsNonNegative)
Expand Down Expand Up @@ -2727,6 +2744,7 @@ bool CMultivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestorePar
/**/)
}
} else {
stateMissingControllerChecks = true;
do {
const std::string& name{traverser.name()};
RESTORE_BOOL(IS_NON_NEGATIVE_OLD_TAG, m_IsNonNegative)
Expand Down Expand Up @@ -2754,13 +2772,24 @@ bool CMultivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestorePar
/**/)
} while (traverser.next());
}

if (m_Controllers != nullptr && stateMissingControllerChecks) {
(*m_Controllers)[E_TrendControl].checks(CDecayRateController::E_PredictionBias |
CDecayRateController::E_PredictionErrorIncrease);
}
if (m_Controllers != nullptr && stateMissingControllerChecks) {
(*m_Controllers)[E_ResidualControl].checks(
CDecayRateController::E_PredictionBias | CDecayRateController::E_PredictionErrorIncrease |
maths::CDecayRateController::E_PredictionErrorDecrease);
}

return true;
}

void CMultivariateTimeSeriesModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
// Note that we don't persist this->params() because that state
// is reinitialized.
inserter.insertValue(VERSION_6_3_TAG, "");
inserter.insertValue(VERSION_7_11_TAG, "");
inserter.insertValue(IS_NON_NEGATIVE_6_3_TAG, static_cast<int>(m_IsNonNegative));
if (m_Controllers) {
core::CPersistUtils::persist(CONTROLLER_6_3_TAG, *m_Controllers, inserter);
Expand Down
Loading