From e17d5c7a69570d76bbefd0777b429a6e878ec93e Mon Sep 17 00:00:00 2001 From: Smilencer <527646889@qq.com> Date: Thu, 22 Oct 2020 14:02:48 +0800 Subject: [PATCH] feat(hotkey): collector can be terminated by timeout (#625) --- src/server/hotkey_collector.cpp | 44 +++++++++++++++++-- src/server/hotkey_collector.h | 4 ++ .../test/capacity_unit_calculator_test.cpp | 4 +- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp index dfde0c5062..16984eeba5 100644 --- a/src/server/hotkey_collector.cpp +++ b/src/server/hotkey_collector.cpp @@ -21,16 +21,24 @@ #include #include "base/pegasus_key_schema.h" #include +#include namespace pegasus { namespace server { +DSN_DEFINE_int32( + "pegasus.server", + max_seconds_to_detect_hotkey, + 150, + "the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found"); + hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type, dsn::replication::replica_base *r_base) : replica_base(r_base), _state(hotkey_collector_state::STOPPED), _hotkey_type(hotkey_type), - _internal_collector(std::make_shared()) + _internal_collector(std::make_shared()), + _collector_start_time_second(0) { } @@ -65,7 +73,18 @@ void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weigh _internal_collector->capture_data(hash_key, weight); } -void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); } +void hotkey_collector::analyse_data() +{ + switch (_state.load()) { + case hotkey_collector_state::COARSE_DETECTING: + if (!terminate_if_timeout()) { + _internal_collector->analyse_data(); + } + return; + default: + return; + } +} void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response &resp) { @@ -88,6 +107,7 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response dwarn_replica(hint); return; case hotkey_collector_state::STOPPED: + _collector_start_time_second = dsn_now_s(); // TODO: (Tangyanzhao) start coarse detecting _state.store(hotkey_collector_state::COARSE_DETECTING); resp.err = dsn::ERR_OK; @@ -105,13 +125,29 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp) { - _state.store(hotkey_collector_state::STOPPED); - _internal_collector.reset(); + terminate(); resp.err = dsn::ERR_OK; std::string hint = fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type)); ddebug_replica(hint); } +void hotkey_collector::terminate() +{ + _state.store(hotkey_collector_state::STOPPED); + _internal_collector.reset(); + _collector_start_time_second = 0; +} + +bool hotkey_collector::terminate_if_timeout() +{ + if (dsn_now_s() >= _collector_start_time_second + FLAGS_max_seconds_to_detect_hotkey) { + ddebug_replica("hotkey collector work time is exhausted but no hotkey has been found"); + terminate(); + return true; + } + return false; +} + } // namespace server } // namespace pegasus diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h index c649fac541..6cf34cc8bb 100644 --- a/src/server/hotkey_collector.h +++ b/src/server/hotkey_collector.h @@ -79,9 +79,13 @@ class hotkey_collector : public dsn::replication::replica_base private: void on_start_detect(dsn::replication::detect_hotkey_response &resp); void on_stop_detect(dsn::replication::detect_hotkey_response &resp); + void terminate(); + bool terminate_if_timeout(); + std::atomic _state; const dsn::replication::hotkey_type::type _hotkey_type; std::shared_ptr _internal_collector; + uint64_t _collector_start_time_second; }; class internal_collector_base diff --git a/src/server/test/capacity_unit_calculator_test.cpp b/src/server/test/capacity_unit_calculator_test.cpp index 8964af5849..5dbf2126cb 100644 --- a/src/server/test/capacity_unit_calculator_test.cpp +++ b/src/server/test/capacity_unit_calculator_test.cpp @@ -30,8 +30,8 @@ class mock_capacity_unit_calculator : public capacity_unit_calculator explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r) : capacity_unit_calculator( r, - std::make_shared(dsn::replication::hotkey_type::READ, this), - std::make_shared(dsn::replication::hotkey_type::WRITE, this)) + std::make_shared(dsn::replication::hotkey_type::READ, r), + std::make_shared(dsn::replication::hotkey_type::WRITE, r)) { }