Skip to content

Commit

Permalink
[ML] Fix bug persisting and restoring time series model decay rate co…
Browse files Browse the repository at this point in the history
…ntroller (#1672)

We were failing to persist and restore the set of checks we perform when controlling the decay rate of time series models. The
upshot was that decay rate control was disabled after a round of persist and restore. This could happen on failover,
when a job was closed and restarted, or after reverting to a snapshot.

I have also added a test that the behaviour is preserved after persist and restore.
  • Loading branch information
tveasey authored Jan 18, 2021
1 parent a58ad4c commit 4051fe5
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 38 deletions.
3 changes: 3 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@

* Fix potential cause for log errors from CXMeansOnline1d. (See {ml-pull}1586[#1586].)
* Fix scaling of some hyperparameter for Bayesian optimization. (See {ml-pull}1612[#1612].)
* Fix missing state in persist and restore for anomaly detection. This caused suboptimal
modelling after a job was closed and reopened or failed over to a different node.
(See {ml-pull}1668[#1668].)

== {es} version 7.10.1

Expand Down
30 changes: 22 additions & 8 deletions include/maths/CDecayRateController.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
#include <maths/CPRNG.h>
#include <maths/ImportExport.h>

#include <stdint.h>
#include <array>
#include <cstdint>

namespace ml {
namespace core {
Expand Down Expand Up @@ -60,6 +61,12 @@ class MATHS_EXPORT CDecayRateController {
CDecayRateController();
CDecayRateController(int checks, std::size_t dimension);

//! Get the checks which this controller is performing.
int checks() const;

//! Set the checks which this controller is performing.
void checks(int checks);

//! Reset the errors.
void reset();

Expand Down Expand Up @@ -90,21 +97,28 @@ class MATHS_EXPORT CDecayRateController {
std::size_t memoryUsage() const;

//! Get a checksum of this object.
uint64_t checksum(uint64_t seed = 0) const;
std::uint64_t checksum(std::uint64_t seed = 0) const;

private:
//! Get the count of residuals added so far.
double count() const;
using TDouble3Ary = std::array<double, 3>;

//! Get the change to apply to the decay rate multiplier.
double change(const double (&stats)[3], core_t::TTime bucketLength) const;
private:
double count() const;
double change(const TDouble3Ary& stats, core_t::TTime bucketLength) const;
bool notControlling() const;
bool increaseDecayRateErrorIncreasing(const TDouble3Ary& stats) const;
bool increaseDecayRateErrorDecreasing(const TDouble3Ary& stats) const;
bool increaseDecayRateBiased(const TDouble3Ary& stats) const;
bool decreaseDecayRateErrorNotIncreasing(const TDouble3Ary& stats) const;
bool decreaseDecayRateErrorNotDecreasing(const TDouble3Ary& stats) const;
bool decreaseDecayRateNotBiased(const TDouble3Ary& stats) const;

private:
//! The checks we perform to detect error conditions.
int m_Checks;
int m_Checks = 0;

//! The current target multiplier.
double m_Target;
double m_Target = 1.0;

//! The cumulative multiplier applied to the decay rate.
TMeanAccumulator m_Multiplier;
Expand Down
9 changes: 9 additions & 0 deletions include/maths/CPRNG.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ class MATHS_EXPORT CPRNG : private core::CNonInstantiatable {
//! Restore from a string.
bool fromString(const std::string& state);

//! Get a checksum for this object.
std::uint64_t checksum(std::uint64_t seed) const;

private:
static const result_type A;
static const result_type B;
Expand Down Expand Up @@ -184,6 +187,9 @@ class MATHS_EXPORT CPRNG : private core::CNonInstantiatable {
//! Restore from a string.
bool fromString(const std::string& state);

//! Get a checksum for this object.
std::uint64_t checksum(std::uint64_t seed) const;

private:
static const result_type JUMP[2];

Expand Down Expand Up @@ -276,6 +282,9 @@ class MATHS_EXPORT CPRNG : private core::CNonInstantiatable {
//! Restore from a string.
bool fromString(std::string state);

//! Get a checksum for this object.
std::uint64_t checksum(std::uint64_t seed) const;

private:
static const result_type A;
static const result_type JUMP[16];
Expand Down
84 changes: 67 additions & 17 deletions lib/maths/CDecayRateController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const std::string RECENT_ABS_ERROR_TAG{"d"};
const std::string HISTORICAL_ABS_ERROR_TAG{"e"};
const std::string RNG_TAG{"f"};
const std::string MULTIPLIER_TAG{"g"};
const std::string CHECKS_TAG{"h"};

//! The factor by which we'll increase the decay rate per bucket.
const double INCREASE_RATE{1.2};
Expand Down Expand Up @@ -92,16 +93,24 @@ double adjustedMaximumMultiplier(core_t::TTime bucketLength_) {
}
}

CDecayRateController::CDecayRateController() : m_Checks(0), m_Target(1.0) {
CDecayRateController::CDecayRateController() {
m_Multiplier.add(m_Target);
}

CDecayRateController::CDecayRateController(int checks, std::size_t dimension)
: m_Checks(checks), m_Target(1.0), m_PredictionMean(dimension), m_Bias(dimension),
: m_Checks{checks}, m_PredictionMean(dimension), m_Bias(dimension),
m_RecentAbsError(dimension), m_HistoricalAbsError(dimension) {
m_Multiplier.add(m_Target);
}

int CDecayRateController::checks() const {
return m_Checks;
}

void CDecayRateController::checks(int checks) {
m_Checks = checks;
}

void CDecayRateController::reset() {
m_Target = 1.0;
m_Multiplier = TMeanAccumulator();
Expand All @@ -116,6 +125,7 @@ bool CDecayRateController::acceptRestoreTraverser(core::CStateRestoreTraverser&
m_Multiplier = TMeanAccumulator();
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(CHECKS_TAG, m_Checks)
RESTORE_BUILT_IN(TARGET_TAG, m_Target)
RESTORE(MULTIPLIER_TAG, m_Multiplier.fromDelimited(traverser.value()))
RESTORE(RNG_TAG, m_Rng.fromString(traverser.value()))
Expand All @@ -135,7 +145,8 @@ bool CDecayRateController::acceptRestoreTraverser(core::CStateRestoreTraverser&
}

void CDecayRateController::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(TARGET_TAG, m_Target);
inserter.insertValue(CHECKS_TAG, m_Checks);
inserter.insertValue(TARGET_TAG, m_Target, core::CIEEE754::E_DoublePrecision);
inserter.insertValue(MULTIPLIER_TAG, m_Multiplier.toDelimited());
inserter.insertValue(RNG_TAG, m_Rng.toString());
core::CPersistUtils::persist(PREDICTION_MEAN_TAG, m_PredictionMean, inserter);
Expand Down Expand Up @@ -201,9 +212,9 @@ double CDecayRateController::multiplier(const TDouble1Vec& prediction,
}

if (count > 0.0) {
double factors[]{std::exp(-FAST_DECAY_RATE * decayRate),
std::exp(-FAST_DECAY_RATE * decayRate),
std::exp(-SLOW_DECAY_RATE * decayRate)};
TDouble3Ary factors{std::exp(-FAST_DECAY_RATE * decayRate),
std::exp(-FAST_DECAY_RATE * decayRate),
std::exp(-SLOW_DECAY_RATE * decayRate)};
for (auto& component : m_PredictionMean) {
component.age(factors[2]);
}
Expand All @@ -222,7 +233,7 @@ double CDecayRateController::multiplier(const TDouble1Vec& prediction,
// Compute the change to apply to the target decay rate.
TMaxAccumulator change;
for (std::size_t d = 0u; d < dimension; ++d) {
double stats[3];
TDouble3Ary stats;
for (std::size_t i = 0u; i < 3; ++i) {
stats[i] = std::fabs(CBasicStatistics::mean((*stats_[i])[d]));
}
Expand Down Expand Up @@ -272,7 +283,11 @@ std::size_t CDecayRateController::memoryUsage() const {
return mem;
}

uint64_t CDecayRateController::checksum(uint64_t seed) const {
std::uint64_t CDecayRateController::checksum(std::uint64_t seed) const {
seed = CChecksum::calculate(seed, m_Checks);
seed = CChecksum::calculate(seed, m_Target);
seed = CChecksum::calculate(seed, m_Multiplier);
seed = CChecksum::calculate(seed, m_Rng);
seed = CChecksum::calculate(seed, m_PredictionMean);
seed = CChecksum::calculate(seed, m_Bias);
seed = CChecksum::calculate(seed, m_RecentAbsError);
Expand All @@ -283,20 +298,55 @@ double CDecayRateController::count() const {
return CBasicStatistics::count(m_HistoricalAbsError[0]);
}

double CDecayRateController::change(const double (&stats)[3], core_t::TTime bucketLength) const {
if (((m_Checks & E_PredictionErrorIncrease) && stats[1] > ERROR_INCREASING * stats[2]) ||
((m_Checks & E_PredictionErrorDecrease) && stats[2] > ERROR_DECREASING * stats[1]) ||
((m_Checks & E_PredictionBias) && stats[0] > BIASED * stats[1])) {
double CDecayRateController::change(const TDouble3Ary& stats, core_t::TTime bucketLength) const {
if (this->notControlling()) {
return 1.0;
}
if (this->increaseDecayRateErrorIncreasing(stats) ||
this->increaseDecayRateErrorDecreasing(stats) ||
this->increaseDecayRateBiased(stats)) {
return adjustMultiplier(INCREASE_RATE, bucketLength);
}
if ((!(m_Checks & E_PredictionErrorIncrease) ||
stats[1] < ERROR_NOT_INCREASING * stats[2]) &&
(!(m_Checks & E_PredictionErrorDecrease) ||
stats[2] < ERROR_NOT_DECREASING * stats[1]) &&
(!(m_Checks & E_PredictionBias) || stats[0] < NOT_BIASED * stats[1])) {
if (this->decreaseDecayRateErrorNotIncreasing(stats) &&
this->decreaseDecayRateErrorNotDecreasing(stats) &&
this->decreaseDecayRateNotBiased(stats)) {
return adjustMultiplier(DECREASE_RATE, bucketLength);
}
return 1.0;
}

bool CDecayRateController::notControlling() const {
return (m_Checks & E_PredictionErrorIncrease) == false &&
(m_Checks & E_PredictionErrorDecrease) == false &&
(m_Checks & E_PredictionBias) == false;
}

bool CDecayRateController::increaseDecayRateErrorIncreasing(const TDouble3Ary& stats) const {
    // This condition can only fire if the error increase check is enabled.
    if ((m_Checks & E_PredictionErrorIncrease) == 0) {
        return false;
    }
    // NOTE(review): stats[1] and stats[2] are presumably the recent and
    // historical absolute errors - confirm against multiplier().
    return stats[1] > ERROR_INCREASING * stats[2];
}

bool CDecayRateController::increaseDecayRateErrorDecreasing(const TDouble3Ary& stats) const {
    // This condition can only fire if the error decrease check is enabled.
    if ((m_Checks & E_PredictionErrorDecrease) == 0) {
        return false;
    }
    // NOTE(review): stats[1] and stats[2] are presumably the recent and
    // historical absolute errors - confirm against multiplier().
    return stats[2] > ERROR_DECREASING * stats[1];
}

bool CDecayRateController::increaseDecayRateBiased(const TDouble3Ary& stats) const {
    // This condition can only fire if the bias check is enabled.
    if ((m_Checks & E_PredictionBias) == 0) {
        return false;
    }
    // NOTE(review): stats[0] is presumably the prediction bias and stats[1]
    // the recent absolute error - confirm against multiplier().
    return stats[0] > BIASED * stats[1];
}

bool CDecayRateController::decreaseDecayRateErrorNotIncreasing(const TDouble3Ary& stats) const {
    // A disabled check never blocks a decrease of the decay rate.
    if ((m_Checks & E_PredictionErrorIncrease) == 0) {
        return true;
    }
    return stats[1] < ERROR_NOT_INCREASING * stats[2];
}

bool CDecayRateController::decreaseDecayRateErrorNotDecreasing(const TDouble3Ary& stats) const {
    // A disabled check never blocks a decrease of the decay rate.
    if ((m_Checks & E_PredictionErrorDecrease) == 0) {
        return true;
    }
    return stats[2] < ERROR_NOT_DECREASING * stats[1];
}

bool CDecayRateController::decreaseDecayRateNotBiased(const TDouble3Ary& stats) const {
    // A disabled check never blocks a decrease of the decay rate.
    if ((m_Checks & E_PredictionBias) == 0) {
        return true;
    }
    return stats[0] < NOT_BIASED * stats[1];
}
}
}
14 changes: 14 additions & 0 deletions lib/maths/CPRNG.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <core/CPersistUtils.h>
#include <core/CStringUtils.h>

#include <maths/CChecksum.h>

#include <algorithm>

namespace ml {
Expand Down Expand Up @@ -71,6 +73,10 @@ bool CPRNG::CSplitMix64::fromString(const std::string& state) {
return core::CStringUtils::stringToType(state, m_X);
}

std::uint64_t CPRNG::CSplitMix64::checksum(std::uint64_t seed) const {
    // Fold the generator state into the running checksum.
    seed = CChecksum::calculate(seed, m_X);
    return seed;
}

const CPRNG::CSplitMix64::result_type CPRNG::CSplitMix64::A(0x9E3779B97F4A7C15);
const CPRNG::CSplitMix64::result_type CPRNG::CSplitMix64::B(0xBF58476D1CE4E5B9);
const CPRNG::CSplitMix64::result_type CPRNG::CSplitMix64::C(0x94D049BB133111EB);
Expand Down Expand Up @@ -136,6 +142,10 @@ bool CPRNG::CXorOShiro128Plus::fromString(const std::string& state) {
return core::CPersistUtils::fromString(state, &m_X[0], &m_X[2]);
}

std::uint64_t CPRNG::CXorOShiro128Plus::checksum(std::uint64_t seed) const {
    // Fold the generator state into the running checksum.
    seed = CChecksum::calculate(seed, m_X);
    return seed;
}

const CPRNG::CXorOShiro128Plus::result_type CPRNG::CXorOShiro128Plus::JUMP[] = {
0xbeac0467eba5facb, 0xd86b048b86aa9922};

Expand Down Expand Up @@ -213,6 +223,10 @@ bool CPRNG::CXorShift1024Mult::fromString(std::string state) {
return core::CPersistUtils::fromString(state, &m_X[0], &m_X[16]);
}

std::uint64_t CPRNG::CXorShift1024Mult::checksum(std::uint64_t seed) const {
    // Fold the generator state into the running checksum.
    seed = CChecksum::calculate(seed, m_X);
    return seed;
}

const CPRNG::CXorShift1024Mult::result_type CPRNG::CXorShift1024Mult::A(1181783497276652981);
const CPRNG::CXorShift1024Mult::result_type CPRNG::CXorShift1024Mult::JUMP[16] = {
0x84242f96eca9c41d, 0xa3c65b8776f96855, 0x5b34a39f070b5837,
Expand Down
37 changes: 33 additions & 4 deletions lib/maths/CTimeSeriesModel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ double aggregateFeatureProbabilities(const TDouble4Vec& probabilities, double co
const std::string VERSION_6_3_TAG("6.3");
const std::string VERSION_6_5_TAG("6.5");
const std::string VERSION_7_3_TAG("7.3");
const std::string VERSION_7_11_TAG("7.11");

// Models
// Version >= 6.3
Expand Down Expand Up @@ -1291,7 +1292,9 @@ std::size_t CUnivariateTimeSeriesModel::memoryUsage() const {

bool CUnivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
if (traverser.name() == VERSION_6_3_TAG) {
bool stateMissingControllerChecks{false};
if (traverser.name() == VERSION_6_3_TAG || traverser.name() == VERSION_7_11_TAG) {
stateMissingControllerChecks = (traverser.name() == VERSION_6_3_TAG);
while (traverser.next()) {
const std::string& name{traverser.name()};
RESTORE_BUILT_IN(ID_6_3_TAG, m_Id)
Expand Down Expand Up @@ -1329,6 +1332,7 @@ bool CUnivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParam
}
} else {
// There is no version string this is historic state.
stateMissingControllerChecks = true;
do {
const std::string& name{traverser.name()};
RESTORE_BUILT_IN(ID_OLD_TAG, m_Id)
Expand Down Expand Up @@ -1356,6 +1360,17 @@ bool CUnivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParam
/**/)
} while (traverser.next());
}

if (m_Controllers != nullptr && stateMissingControllerChecks) {
(*m_Controllers)[E_TrendControl].checks(CDecayRateController::E_PredictionBias |
CDecayRateController::E_PredictionErrorIncrease);
}
if (m_Controllers != nullptr && stateMissingControllerChecks) {
(*m_Controllers)[E_ResidualControl].checks(
CDecayRateController::E_PredictionBias | CDecayRateController::E_PredictionErrorIncrease |
maths::CDecayRateController::E_PredictionErrorDecrease);
}

return true;
}

Expand All @@ -1378,7 +1393,7 @@ void CUnivariateTimeSeriesModel::acceptPersistInserter(core::CStatePersistInsert

// Note that we don't persist this->params() or the correlations
// because that state is reinitialized.
inserter.insertValue(VERSION_6_3_TAG, "");
inserter.insertValue(VERSION_7_11_TAG, "");
inserter.insertValue(ID_6_3_TAG, m_Id);
inserter.insertValue(IS_NON_NEGATIVE_6_3_TAG, static_cast<int>(m_IsNonNegative));
inserter.insertValue(IS_FORECASTABLE_6_3_TAG, static_cast<int>(m_IsForecastable));
Expand Down Expand Up @@ -2691,7 +2706,9 @@ std::size_t CMultivariateTimeSeriesModel::memoryUsage() const {

bool CMultivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParams& params,
core::CStateRestoreTraverser& traverser) {
if (traverser.name() == VERSION_6_3_TAG) {
bool stateMissingControllerChecks{false};
if (traverser.name() == VERSION_6_3_TAG || traverser.name() == VERSION_7_11_TAG) {
stateMissingControllerChecks = (traverser.name() == VERSION_6_3_TAG);
while (traverser.next()) {
const std::string& name{traverser.name()};
RESTORE_BOOL(IS_NON_NEGATIVE_6_3_TAG, m_IsNonNegative)
Expand Down Expand Up @@ -2727,6 +2744,7 @@ bool CMultivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestorePar
/**/)
}
} else {
stateMissingControllerChecks = true;
do {
const std::string& name{traverser.name()};
RESTORE_BOOL(IS_NON_NEGATIVE_OLD_TAG, m_IsNonNegative)
Expand Down Expand Up @@ -2754,13 +2772,24 @@ bool CMultivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestorePar
/**/)
} while (traverser.next());
}

if (m_Controllers != nullptr && stateMissingControllerChecks) {
(*m_Controllers)[E_TrendControl].checks(CDecayRateController::E_PredictionBias |
CDecayRateController::E_PredictionErrorIncrease);
}
if (m_Controllers != nullptr && stateMissingControllerChecks) {
(*m_Controllers)[E_ResidualControl].checks(
CDecayRateController::E_PredictionBias | CDecayRateController::E_PredictionErrorIncrease |
maths::CDecayRateController::E_PredictionErrorDecrease);
}

return true;
}

void CMultivariateTimeSeriesModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
// Note that we don't persist this->params() because that state
// is reinitialized.
inserter.insertValue(VERSION_6_3_TAG, "");
inserter.insertValue(VERSION_7_11_TAG, "");
inserter.insertValue(IS_NON_NEGATIVE_6_3_TAG, static_cast<int>(m_IsNonNegative));
if (m_Controllers) {
core::CPersistUtils::persist(CONTROLLER_6_3_TAG, *m_Controllers, inserter);
Expand Down
Loading

0 comments on commit 4051fe5

Please sign in to comment.