From d6f365a9c76086a6eb380d6a80cd933d89f0ffd0 Mon Sep 17 00:00:00 2001 From: Tom Veasey Date: Fri, 15 Jan 2021 15:23:01 +0000 Subject: [PATCH] [ML] Fix bug persisting and restoring time series model decay rate controller (#1668) We were failing to persist and restore the checks we perform when controlling decay rate for time series models. The upshot is that we would disable decay rate control after a round of persist and restore. This could happen on failover or if the job is closed and restarted or after revert to snapshot. I have however also added a test that the behaviour is preserved after persist and restore. --- docs/CHANGELOG.asciidoc | 3 + include/maths/CDecayRateController.h | 30 +++++-- include/maths/CPRNG.h | 9 ++ lib/maths/CDecayRateController.cc | 84 +++++++++++++++---- lib/maths/CPRNG.cc | 14 ++++ lib/maths/CTimeSeriesModel.cc | 37 +++++++- .../unittest/CDecayRateControllerTest.cc | 60 +++++++++++-- 7 files changed, 199 insertions(+), 38 deletions(-) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index ba96df8ba2..30b032130a 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -62,6 +62,9 @@ * Fix potential cause for log errors from CXMeansOnline1d. (See {ml-pull}1586[#1586].) * Fix scaling of some hyperparameter for Bayesian optimization. (See {ml-pull}1612[#1612].) +* Fix missing state in persist and restore for anomaly detection. This caused suboptimal + modelling after a job was closed and reopened or failed over to a different node. + (See {ml-pull}1668[#1668].) == {es} version 7.10.1 diff --git a/include/maths/CDecayRateController.h b/include/maths/CDecayRateController.h index 623f79adc4..1d44139bcc 100644 --- a/include/maths/CDecayRateController.h +++ b/include/maths/CDecayRateController.h @@ -14,7 +14,8 @@ #include #include -#include +#include +#include namespace ml { namespace core { @@ -60,6 +61,12 @@ class MATHS_EXPORT CDecayRateController { CDecayRateController(); CDecayRateController(int checks, std::size_t dimension); + //! Get the checks which this controller is performing. + int checks() const; + + //! Set the checks which this controller is performing. + void checks(int checks); + //! Reset the errors. void reset(); @@ -90,21 +97,28 @@ class MATHS_EXPORT CDecayRateController { std::size_t memoryUsage() const; //! Get a checksum of this object. - uint64_t checksum(uint64_t seed = 0) const; + std::uint64_t checksum(std::uint64_t seed = 0) const; private: - //! Get the count of residuals added so far. - double count() const; + using TDouble3Ary = std::array; - //! Get the change to apply to the decay rate multiplier. - double change(const double (&stats)[3], core_t::TTime bucketLength) const; +private: + double count() const; + double change(const TDouble3Ary& stats, core_t::TTime bucketLength) const; + bool notControlling() const; + bool increaseDecayRateErrorIncreasing(const TDouble3Ary& stats) const; + bool increaseDecayRateErrorDecreasing(const TDouble3Ary& stats) const; + bool increaseDecayRateBiased(const TDouble3Ary& stats) const; + bool decreaseDecayRateErrorNotIncreasing(const TDouble3Ary& stats) const; + bool decreaseDecayRateErrorNotDecreasing(const TDouble3Ary& stats) const; + bool decreaseDecayRateNotBiased(const TDouble3Ary& stats) const; private: //! The checks we perform to detect error conditions. - int m_Checks; + int m_Checks = 0; //! The current target multiplier. - double m_Target; + double m_Target = 1.0; //! The cumulative multiplier applied to the decay rate. TMeanAccumulator m_Multiplier; diff --git a/include/maths/CPRNG.h b/include/maths/CPRNG.h index a194338ce7..da4fbb4278 100644 --- a/include/maths/CPRNG.h +++ b/include/maths/CPRNG.h @@ -93,6 +93,9 @@ class MATHS_EXPORT CPRNG : private core::CNonInstantiatable { //! Restore from a string. bool fromString(const std::string& state); + //! Get a checksum for this object. + std::uint64_t checksum(std::uint64_t seed) const; + private: static const result_type A; static const result_type B; @@ -184,6 +187,9 @@ class MATHS_EXPORT CPRNG : private core::CNonInstantiatable { //! Restore from a string. bool fromString(const std::string& state); + //! Get a checksum for this object. + std::uint64_t checksum(std::uint64_t seed) const; + private: static const result_type JUMP[2]; @@ -276,6 +282,9 @@ class MATHS_EXPORT CPRNG : private core::CNonInstantiatable { //! Restore from a string. bool fromString(std::string state); + //! Get a checksum for this object. + std::uint64_t checksum(std::uint64_t seed) const; + private: static const result_type A; static const result_type JUMP[16]; diff --git a/lib/maths/CDecayRateController.cc b/lib/maths/CDecayRateController.cc index 689f34cded..5a6c1a8bec 100644 --- a/lib/maths/CDecayRateController.cc +++ b/lib/maths/CDecayRateController.cc @@ -31,6 +31,7 @@ const std::string RECENT_ABS_ERROR_TAG{"d"}; const std::string HISTORICAL_ABS_ERROR_TAG{"e"}; const std::string RNG_TAG{"f"}; const std::string MULTIPLIER_TAG{"g"}; +const std::string CHECKS_TAG{"h"}; //! The factor by which we'll increase the decay rate per bucket. const double INCREASE_RATE{1.2}; @@ -92,16 +93,24 @@ double adjustedMaximumMultiplier(core_t::TTime bucketLength_) { } } -CDecayRateController::CDecayRateController() : m_Checks(0), m_Target(1.0) { +CDecayRateController::CDecayRateController() { m_Multiplier.add(m_Target); } CDecayRateController::CDecayRateController(int checks, std::size_t dimension) - : m_Checks(checks), m_Target(1.0), m_PredictionMean(dimension), m_Bias(dimension), + : m_Checks{checks}, m_PredictionMean(dimension), m_Bias(dimension), m_RecentAbsError(dimension), m_HistoricalAbsError(dimension) { m_Multiplier.add(m_Target); } +int CDecayRateController::checks() const { + return m_Checks; +} + +void CDecayRateController::checks(int checks) { + m_Checks = checks; +} + void CDecayRateController::reset() { m_Target = 1.0; m_Multiplier = TMeanAccumulator(); @@ -116,6 +125,7 @@ bool CDecayRateController::acceptRestoreTraverser(core::CStateRestoreTraverser& m_Multiplier = TMeanAccumulator(); do { const std::string& name = traverser.name(); + RESTORE_BUILT_IN(CHECKS_TAG, m_Checks) RESTORE_BUILT_IN(TARGET_TAG, m_Target) RESTORE(MULTIPLIER_TAG, m_Multiplier.fromDelimited(traverser.value())) RESTORE(RNG_TAG, m_Rng.fromString(traverser.value())) @@ -135,7 +145,8 @@ bool CDecayRateController::acceptRestoreTraverser(core::CStateRestoreTraverser& } void CDecayRateController::acceptPersistInserter(core::CStatePersistInserter& inserter) const { - inserter.insertValue(TARGET_TAG, m_Target); + inserter.insertValue(CHECKS_TAG, m_Checks); + inserter.insertValue(TARGET_TAG, m_Target, core::CIEEE754::E_DoublePrecision); inserter.insertValue(MULTIPLIER_TAG, m_Multiplier.toDelimited()); inserter.insertValue(RNG_TAG, m_Rng.toString()); core::CPersistUtils::persist(PREDICTION_MEAN_TAG, m_PredictionMean, inserter); @@ -201,9 +212,9 @@ double CDecayRateController::multiplier(const TDouble1Vec& prediction, } if (count > 0.0) { - double factors[]{std::exp(-FAST_DECAY_RATE * decayRate), - std::exp(-FAST_DECAY_RATE * decayRate), - std::exp(-SLOW_DECAY_RATE * decayRate)}; + TDouble3Ary factors{std::exp(-FAST_DECAY_RATE * decayRate), + std::exp(-FAST_DECAY_RATE * decayRate), + std::exp(-SLOW_DECAY_RATE * decayRate)}; for (auto& component : m_PredictionMean) { component.age(factors[2]); } @@ -222,7 +233,7 @@ double CDecayRateController::multiplier(const TDouble1Vec& prediction, // Compute the change to apply to the target decay rate. TMaxAccumulator change; for (std::size_t d = 0u; d < dimension; ++d) { - double stats[3]; + TDouble3Ary stats; for (std::size_t i = 0u; i < 3; ++i) { stats[i] = std::fabs(CBasicStatistics::mean((*stats_[i])[d])); } @@ -272,7 +283,11 @@ std::size_t CDecayRateController::memoryUsage() const { return mem; } -uint64_t CDecayRateController::checksum(uint64_t seed) const { +std::uint64_t CDecayRateController::checksum(std::uint64_t seed) const { + seed = CChecksum::calculate(seed, m_Checks); + seed = CChecksum::calculate(seed, m_Target); + seed = CChecksum::calculate(seed, m_Multiplier); + seed = CChecksum::calculate(seed, m_Rng); seed = CChecksum::calculate(seed, m_PredictionMean); seed = CChecksum::calculate(seed, m_Bias); seed = CChecksum::calculate(seed, m_RecentAbsError); @@ -283,20 +298,55 @@ double CDecayRateController::count() const { return CBasicStatistics::count(m_HistoricalAbsError[0]); } -double CDecayRateController::change(const double (&stats)[3], core_t::TTime bucketLength) const { - if (((m_Checks & E_PredictionErrorIncrease) && stats[1] > ERROR_INCREASING * stats[2]) || - ((m_Checks & E_PredictionErrorDecrease) && stats[2] > ERROR_DECREASING * stats[1]) || - ((m_Checks & E_PredictionBias) && stats[0] > BIASED * stats[1])) { +double CDecayRateController::change(const TDouble3Ary& stats, core_t::TTime bucketLength) const { + if (this->notControlling()) { + return 1.0; + } + if (this->increaseDecayRateErrorIncreasing(stats) || + this->increaseDecayRateErrorDecreasing(stats) || + this->increaseDecayRateBiased(stats)) { return adjustMultiplier(INCREASE_RATE, bucketLength); } - if ((!(m_Checks & E_PredictionErrorIncrease) || - stats[1] < ERROR_NOT_INCREASING * stats[2]) && - (!(m_Checks & E_PredictionErrorDecrease) || - stats[2] < ERROR_NOT_DECREASING * stats[1]) && - (!(m_Checks & E_PredictionBias) || stats[0] < NOT_BIASED * stats[1])) { + if (this->decreaseDecayRateErrorNotIncreasing(stats) && + this->decreaseDecayRateErrorNotDecreasing(stats) && + this->decreaseDecayRateNotBiased(stats)) { return adjustMultiplier(DECREASE_RATE, bucketLength); } return 1.0; } + +bool CDecayRateController::notControlling() const { + return (m_Checks & E_PredictionErrorIncrease) == false && + (m_Checks & E_PredictionErrorDecrease) == false && + (m_Checks & E_PredictionBias) == false; +} + +bool CDecayRateController::increaseDecayRateErrorIncreasing(const TDouble3Ary& stats) const { + return (m_Checks & E_PredictionErrorIncrease) && + stats[1] > ERROR_INCREASING * stats[2]; +} + +bool CDecayRateController::increaseDecayRateErrorDecreasing(const TDouble3Ary& stats) const { + return (m_Checks & E_PredictionErrorDecrease) && + stats[2] > ERROR_DECREASING * stats[1]; +} + +bool CDecayRateController::increaseDecayRateBiased(const TDouble3Ary& stats) const { + return (m_Checks & E_PredictionBias) && stats[0] > BIASED * stats[1]; +} + +bool CDecayRateController::decreaseDecayRateErrorNotIncreasing(const TDouble3Ary& stats) const { + return (m_Checks & E_PredictionErrorIncrease) == false || + stats[1] < ERROR_NOT_INCREASING * stats[2]; +} + +bool CDecayRateController::decreaseDecayRateErrorNotDecreasing(const TDouble3Ary& stats) const { + return (m_Checks & E_PredictionErrorDecrease) == false || + stats[2] < ERROR_NOT_DECREASING * stats[1]; +} + +bool CDecayRateController::decreaseDecayRateNotBiased(const TDouble3Ary& stats) const { + return (m_Checks & E_PredictionBias) == false || stats[0] < NOT_BIASED * stats[1]; +} } } diff --git a/lib/maths/CPRNG.cc b/lib/maths/CPRNG.cc index 3232e35cbc..26710a7476 100644 --- a/lib/maths/CPRNG.cc +++ b/lib/maths/CPRNG.cc @@ -9,6 +9,8 @@ #include #include +#include + #include namespace ml { @@ -71,6 +73,10 @@ bool CPRNG::CSplitMix64::fromString(const std::string& state) { return core::CStringUtils::stringToType(state, m_X); } +std::uint64_t CPRNG::CSplitMix64::checksum(std::uint64_t seed) const { + return CChecksum::calculate(seed, m_X); +} + const CPRNG::CSplitMix64::result_type CPRNG::CSplitMix64::A(0x9E3779B97F4A7C15); const CPRNG::CSplitMix64::result_type CPRNG::CSplitMix64::B(0xBF58476D1CE4E5B9); const CPRNG::CSplitMix64::result_type CPRNG::CSplitMix64::C(0x94D049BB133111EB); @@ -136,6 +142,10 @@ bool CPRNG::CXorOShiro128Plus::fromString(const std::string& state) { return core::CPersistUtils::fromString(state, &m_X[0], &m_X[2]); } +std::uint64_t CPRNG::CXorOShiro128Plus::checksum(std::uint64_t seed) const { + return CChecksum::calculate(seed, m_X); +} + const CPRNG::CXorOShiro128Plus::result_type CPRNG::CXorOShiro128Plus::JUMP[] = { 0xbeac0467eba5facb, 0xd86b048b86aa9922}; @@ -213,6 +223,10 @@ bool CPRNG::CXorShift1024Mult::fromString(std::string state) { return core::CPersistUtils::fromString(state, &m_X[0], &m_X[16]); } +std::uint64_t CPRNG::CXorShift1024Mult::checksum(std::uint64_t seed) const { + return CChecksum::calculate(seed, m_X); +} + const CPRNG::CXorShift1024Mult::result_type CPRNG::CXorShift1024Mult::A(1181783497276652981); const CPRNG::CXorShift1024Mult::result_type CPRNG::CXorShift1024Mult::JUMP[16] = { 0x84242f96eca9c41d, 0xa3c65b8776f96855, 0x5b34a39f070b5837, diff --git a/lib/maths/CTimeSeriesModel.cc b/lib/maths/CTimeSeriesModel.cc index c34b3723a9..117c1a4a83 100644 --- a/lib/maths/CTimeSeriesModel.cc +++ b/lib/maths/CTimeSeriesModel.cc @@ -99,6 +99,7 @@ double aggregateFeatureProbabilities(const TDouble4Vec& probabilities, double co const std::string VERSION_6_3_TAG("6.3"); const std::string VERSION_6_5_TAG("6.5"); const std::string VERSION_7_3_TAG("7.3"); +const std::string VERSION_7_11_TAG("7.11"); // Models // Version >= 6.3 @@ -1291,7 +1292,9 @@ std::size_t CUnivariateTimeSeriesModel::memoryUsage() const { bool CUnivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParams& params, core::CStateRestoreTraverser& traverser) { - if (traverser.name() == VERSION_6_3_TAG) { + bool stateMissingControllerChecks{false}; + if (traverser.name() == VERSION_6_3_TAG || traverser.name() == VERSION_7_11_TAG) { + stateMissingControllerChecks = (traverser.name() == VERSION_6_3_TAG); while (traverser.next()) { const std::string& name{traverser.name()}; RESTORE_BUILT_IN(ID_6_3_TAG, m_Id) @@ -1329,6 +1332,7 @@ bool CUnivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParam } } else { // There is no version string this is historic state. + stateMissingControllerChecks = true; do { const std::string& name{traverser.name()}; RESTORE_BUILT_IN(ID_OLD_TAG, m_Id) @@ -1356,6 +1360,17 @@ bool CUnivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParam /**/) } while (traverser.next()); } + + if (m_Controllers != nullptr && stateMissingControllerChecks) { + (*m_Controllers)[E_TrendControl].checks(CDecayRateController::E_PredictionBias | + CDecayRateController::E_PredictionErrorIncrease); + } + if (m_Controllers != nullptr && stateMissingControllerChecks) { + (*m_Controllers)[E_ResidualControl].checks( + CDecayRateController::E_PredictionBias | CDecayRateController::E_PredictionErrorIncrease | + maths::CDecayRateController::E_PredictionErrorDecrease); + } + return true; } @@ -1378,7 +1393,7 @@ void CUnivariateTimeSeriesModel::acceptPersistInserter(core::CStatePersistInsert // Note that we don't persist this->params() or the correlations // because that state is reinitialized. - inserter.insertValue(VERSION_6_3_TAG, ""); + inserter.insertValue(VERSION_7_11_TAG, ""); inserter.insertValue(ID_6_3_TAG, m_Id); inserter.insertValue(IS_NON_NEGATIVE_6_3_TAG, static_cast(m_IsNonNegative)); inserter.insertValue(IS_FORECASTABLE_6_3_TAG, static_cast(m_IsForecastable)); @@ -2691,7 +2706,9 @@ std::size_t CMultivariateTimeSeriesModel::memoryUsage() const { bool CMultivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestoreParams& params, core::CStateRestoreTraverser& traverser) { - if (traverser.name() == VERSION_6_3_TAG) { + bool stateMissingControllerChecks{false}; + if (traverser.name() == VERSION_6_3_TAG || traverser.name() == VERSION_7_11_TAG) { + stateMissingControllerChecks = (traverser.name() == VERSION_6_3_TAG); while (traverser.next()) { const std::string& name{traverser.name()}; RESTORE_BOOL(IS_NON_NEGATIVE_6_3_TAG, m_IsNonNegative) @@ -2727,6 +2744,7 @@ bool CMultivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestorePar /**/) } } else { + stateMissingControllerChecks = true; do { const std::string& name{traverser.name()}; RESTORE_BOOL(IS_NON_NEGATIVE_OLD_TAG, m_IsNonNegative) @@ -2754,13 +2772,24 @@ bool CMultivariateTimeSeriesModel::acceptRestoreTraverser(const SModelRestorePar /**/) } while (traverser.next()); } + + if (m_Controllers != nullptr && stateMissingControllerChecks) { + (*m_Controllers)[E_TrendControl].checks(CDecayRateController::E_PredictionBias | + CDecayRateController::E_PredictionErrorIncrease); + } + if (m_Controllers != nullptr && stateMissingControllerChecks) { + (*m_Controllers)[E_ResidualControl].checks( + CDecayRateController::E_PredictionBias | CDecayRateController::E_PredictionErrorIncrease | + maths::CDecayRateController::E_PredictionErrorDecrease); + } + return true; } void CMultivariateTimeSeriesModel::acceptPersistInserter(core::CStatePersistInserter& inserter) const { // Note that we don't persist this->params() because that state // is reinitialized. - inserter.insertValue(VERSION_6_3_TAG, ""); + inserter.insertValue(VERSION_7_11_TAG, ""); inserter.insertValue(IS_NON_NEGATIVE_6_3_TAG, static_cast(m_IsNonNegative)); if (m_Controllers) { core::CPersistUtils::persist(CONTROLLER_6_3_TAG, *m_Controllers, inserter); diff --git a/lib/maths/unittest/CDecayRateControllerTest.cc b/lib/maths/unittest/CDecayRateControllerTest.cc index c652cacfd6..38959e9bf4 100644 --- a/lib/maths/unittest/CDecayRateControllerTest.cc +++ b/lib/maths/unittest/CDecayRateControllerTest.cc @@ -16,16 +16,18 @@ #include +#include + BOOST_AUTO_TEST_SUITE(CDecayRateControllerTest) using namespace ml; using namespace handy_typedefs; +using TDoubleVec = std::vector; BOOST_AUTO_TEST_CASE(testLowCov) { - // Supply small but biased errors so we increase the decay - // rate to its maximum then gradually reduce the error to - // less than the coefficient of variation cutoff to control - // and make sure the decay rate reverts to typical. + // Supply small but biased errors so we increase the decay rate to its maximum + // then gradually reduce the error to less than the coefficient of variation + // cutoff to control and make sure the decay rate reverts to typical. maths::CDecayRateController controller(maths::CDecayRateController::E_PredictionBias, 1); @@ -46,9 +48,8 @@ BOOST_AUTO_TEST_CASE(testLowCov) { } BOOST_AUTO_TEST_CASE(testOrderedErrors) { - // Test that if we add a number of ordered samples, such - // that overall they don't have bias, the decay rate is - // not increased. + // Test that if we add a number of ordered samples, such that overall they don't + // have bias, the decay rate is not increased. using TDouble1VecVec = std::vector; @@ -68,8 +69,6 @@ BOOST_AUTO_TEST_CASE(testOrderedErrors) { } BOOST_AUTO_TEST_CASE(testPersist) { - using TDoubleVec = std::vector; - test::CRandomNumbers rng; TDoubleVec values; @@ -109,4 +108,47 @@ BOOST_AUTO_TEST_CASE(testPersist) { } } +BOOST_AUTO_TEST_CASE(testBehaviourAfterPersistAndRestore) { + // Test that we get the same decisions after persisting and restoring. + + test::CRandomNumbers rng; + + TDoubleVec values; + rng.generateUniformSamples(1000.0, 1010.0, 1000, values); + + TDoubleVec errors; + rng.generateUniformSamples(-2.0, 6.0, 1000, errors); + + maths::CDecayRateController origController{ + maths::CDecayRateController::E_PredictionBias, 1}; + maths::CDecayRateController restoredController; + for (std::size_t i = 0; i < 500; ++i) { + origController.multiplier({values[i]}, {{errors[i]}}, 3600, 1.0, 0.0005); + } + + std::string origXml; + { + core::CRapidXmlStatePersistInserter inserter("root"); + origController.acceptPersistInserter(inserter); + inserter.toXml(origXml); + } + + // Restore the XML into a new controller. + { + core::CRapidXmlParser parser; + BOOST_TEST_REQUIRE(parser.parseStringIgnoreCdata(origXml)); + core::CRapidXmlStateRestoreTraverser traverser(parser); + BOOST_REQUIRE_EQUAL(true, traverser.traverseSubLevel(std::bind( + &maths::CDecayRateController::acceptRestoreTraverser, + &restoredController, std::placeholders::_1))); + } + + for (std::size_t i = 500; i < values.size(); ++i) { + BOOST_REQUIRE_CLOSE( + origController.multiplier({values[i]}, {{errors[i]}}, 3600, 1.0, 0.0005), + restoredController.multiplier({values[i]}, {{errors[i]}}, 3600, 1.0, 0.0005), + 0.001); + } +} + BOOST_AUTO_TEST_SUITE_END()