diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp index 16984eeba5..2d81595ab8 100644 --- a/src/server/hotkey_collector.cpp +++ b/src/server/hotkey_collector.cpp @@ -19,6 +19,8 @@ #include #include +#include +#include #include "base/pegasus_key_schema.h" #include #include @@ -26,18 +28,88 @@ namespace pegasus { namespace server { +DSN_DEFINE_int32("pegasus.server", + coarse_data_variance_threshold, + 3, + "the threshold of variance calculate to find the outliers"); + +DSN_DEFINE_validator(coarse_data_variance_threshold, + [](int32_t threshold) -> bool { return (threshold >= 0); }); + +// TODO: (Tangyanzhao) add a limit to avoid changing when detecting +DSN_DEFINE_int32("pegasus.server", + data_capture_hash_bucket_num, + 37, + "the number of data capture hash buckets"); + +DSN_DEFINE_validator(data_capture_hash_bucket_num, [](int32_t bucket_num) -> bool { + if (bucket_num < 3) { + return false; + } + // data_capture_hash_bucket_num should be a prime number + for (int i = 2; i <= bucket_num / i; i++) { + if (bucket_num % i == 0) { + return false; + } + } + return true; +}); + DSN_DEFINE_int32( "pegasus.server", max_seconds_to_detect_hotkey, 150, "the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found"); +// 68–95–99.7 rule, same algorithm as hotspot_partition_calculator::stat_histories_analyse +static bool +find_outlier_index(const std::vector &captured_keys, int threshold, int &hot_index) +{ + dcheck_gt(captured_keys.size(), 2); + int data_size = captured_keys.size(); + // empirical rule to calculate hot point of each partition + // same algorithm as hotspot_partition_calculator::stat_histories_analyse + double table_captured_key_sum = 0; + int hot_value = 0; + for (int i = 0; i < data_size; i++) { + table_captured_key_sum += captured_keys[i]; + if (captured_keys[i] > hot_value) { + hot_index = i; + hot_value = captured_keys[i]; + } + } + // TODO: (Tangyanzhao) increase a judgment of table_captured_key_sum + double captured_keys_avg_count = + (table_captured_key_sum - captured_keys[hot_index]) / (data_size - 1); + double standard_deviation = 0; + for (int i = 0; i < data_size; i++) { + if (i != hot_index) { + standard_deviation += pow((captured_keys[i] - captured_keys_avg_count), 2); + } + } + standard_deviation = sqrt(standard_deviation / (data_size - 2)); + double hot_point = (hot_value - captured_keys_avg_count) / standard_deviation; + if (hot_point >= threshold) { + return true; + } else { + hot_index = -1; + return false; + } +} + +// TODO: (Tangyanzhao) replace it to xxhash +static int get_bucket_id(dsn::string_view data) +{ + size_t hash_value = boost::hash_range(data.begin(), data.end()); + return static_cast(hash_value % FLAGS_data_capture_hash_bucket_num); +} + hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type, dsn::replication::replica_base *r_base) : replica_base(r_base), _state(hotkey_collector_state::STOPPED), _hotkey_type(hotkey_type), - _internal_collector(std::make_shared()), + _internal_collector(std::make_shared(this)), _collector_start_time_second(0) { } @@ -78,7 +150,11 @@ void hotkey_collector::analyse_data() switch (_state.load()) { case hotkey_collector_state::COARSE_DETECTING: if (!terminate_if_timeout()) { - _internal_collector->analyse_data(); + _internal_collector->analyse_data(_result); + if (_result.coarse_bucket_index != -1) { + // TODO: (Tangyanzhao) reset _internal_collector to hotkey_fine_data_collector + _state.store(hotkey_collector_state::FINE_DETECTING); + } } return; default: @@ -108,7 +184,7 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response return; case hotkey_collector_state::STOPPED: _collector_start_time_second = dsn_now_s(); - // TODO: (Tangyanzhao) start coarse detecting + _internal_collector.reset(new hotkey_coarse_data_collector(this)); _state.store(hotkey_collector_state::COARSE_DETECTING); resp.err = dsn::ERR_OK; hint = fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type)); @@ -149,5 +225,31 @@ bool hotkey_collector::terminate_if_timeout() return false; } +hotkey_coarse_data_collector::hotkey_coarse_data_collector(replica_base *base) + : internal_collector_base(base), _hash_buckets(FLAGS_data_capture_hash_bucket_num) +{ + for (auto &bucket : _hash_buckets) { + bucket.store(0); + } +} + +void hotkey_coarse_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight) +{ + _hash_buckets[get_bucket_id(hash_key)].fetch_add(weight); +} + +void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result) +{ + std::vector buckets(FLAGS_data_capture_hash_bucket_num); + for (int i = 0; i < buckets.size(); i++) { + buckets[i] = _hash_buckets[i].load(); + _hash_buckets[i].store(0); + } + if (!find_outlier_index( + buckets, FLAGS_coarse_data_variance_threshold, result.coarse_bucket_index)) { + result.coarse_bucket_index = -1; + } +} + } // namespace server } // namespace pegasus diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h index 6cf34cc8bb..d4b43a4f69 100644 --- a/src/server/hotkey_collector.h +++ b/src/server/hotkey_collector.h @@ -26,6 +26,11 @@ namespace server { class internal_collector_base; +struct detect_hotkey_result +{ + int coarse_bucket_index = -1; +}; + // hotkey_collector is responsible to find the hot keys after the partition // was detected to be hot. The two types of hotkey, READ & WRITE, are detected // separately. @@ -82,25 +87,40 @@ class hotkey_collector : public dsn::replication::replica_base void terminate(); bool terminate_if_timeout(); + detect_hotkey_result _result; std::atomic _state; const dsn::replication::hotkey_type::type _hotkey_type; std::shared_ptr _internal_collector; uint64_t _collector_start_time_second; }; -class internal_collector_base +class internal_collector_base : public dsn::replication::replica_base { public: + explicit internal_collector_base(replica_base *base) : replica_base(base){}; virtual void capture_data(const dsn::blob &hash_key, uint64_t weight) = 0; - virtual void analyse_data() = 0; + virtual void analyse_data(detect_hotkey_result &result) = 0; }; // used in hotkey_collector_state::STOPPED and hotkey_collector_state::FINISHED, avoid null pointers class hotkey_empty_data_collector : public internal_collector_base { public: - void capture_data(const dsn::blob &hash_key, uint64_t size) {} - void analyse_data() {} + explicit hotkey_empty_data_collector(replica_base *base) : internal_collector_base(base) {} + void capture_data(const dsn::blob &hash_key, uint64_t weight) override {} + void analyse_data(detect_hotkey_result &result) override {} +}; + +// TODO: (Tangyanzhao) add a unit test of hotkey_coarse_data_collector +class hotkey_coarse_data_collector : public internal_collector_base +{ +public: + explicit hotkey_coarse_data_collector(replica_base *base); + void capture_data(const dsn::blob &hash_key, uint64_t weight) override; + void analyse_data(detect_hotkey_result &result) override; + +private: + std::vector> _hash_buckets; }; } // namespace server