diff --git a/rdsn b/rdsn index 18120226e6..ec47b0cfd8 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 18120226e6123fbf2ed744cc580e914be6171f85 +Subproject commit ec47b0cfd8cbdc30b69b1d252e59588afb3f0b51 diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp index 2d81595ab8..053d72cbfd 100644 --- a/src/server/hotkey_collector.cpp +++ b/src/server/hotkey_collector.cpp @@ -21,32 +21,36 @@ #include #include #include -#include "base/pegasus_key_schema.h" #include #include +#include "base/pegasus_key_schema.h" namespace pegasus { namespace server { -DSN_DEFINE_int32("pegasus.server", - coarse_data_variance_threshold, - 3, - "the threshold of variance calculate to find the outliers"); +DSN_DEFINE_uint32( + "pegasus.server", + hot_bucket_variance_threshold, + 3, + "the variance threshold to detect hot bucket during coarse analysis of hotkey detection"); -DSN_DEFINE_validator(coarse_data_variance_threshold, - [](int32_t threshold) -> bool { return (threshold >= 0); }); +DSN_DEFINE_uint32( + "pegasus.server", + hot_key_variance_threshold, + 3, + "the variance threshold to detect hot key during fine analysis of hotkey detection"); // TODO: (Tangyanzhao) add a limit to avoid changing when detecting -DSN_DEFINE_int32("pegasus.server", - data_capture_hash_bucket_num, - 37, - "the number of data capture hash buckets"); +DSN_DEFINE_uint32("pegasus.server", + hotkey_buckets_num, + 37, + "the number of data capture hash buckets"); -DSN_DEFINE_validator(data_capture_hash_bucket_num, [](int32_t bucket_num) -> bool { +DSN_DEFINE_validator(hotkey_buckets_num, [](int32_t bucket_num) -> bool { if (bucket_num < 3) { return false; } - // data_capture_hash_bucket_num should be a prime number + // hotkey_buckets_num should be a prime number for (int i = 2; i <= bucket_num / i; i++) { if (bucket_num % i == 0) { return false; @@ -101,7 +105,7 @@ find_outlier_index(const std::vector &captured_keys, int threshold, in static int get_bucket_id(dsn::string_view 
data) { size_t hash_value = boost::hash_range(data.begin(), data.end()); - return static_cast(hash_value % FLAGS_data_capture_hash_bucket_num); + return static_cast(hash_value % FLAGS_hotkey_buckets_num); } hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type, @@ -226,7 +230,7 @@ bool hotkey_collector::terminate_if_timeout() } hotkey_coarse_data_collector::hotkey_coarse_data_collector(replica_base *base) - : internal_collector_base(base), _hash_buckets(FLAGS_data_capture_hash_bucket_num) + : internal_collector_base(base), _hash_buckets(FLAGS_hotkey_buckets_num) { for (auto &bucket : _hash_buckets) { bucket.store(0); @@ -240,16 +244,93 @@ void hotkey_coarse_data_collector::capture_data(const dsn::blob &hash_key, uint6 void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result) { - std::vector buckets(FLAGS_data_capture_hash_bucket_num); + std::vector buckets(FLAGS_hotkey_buckets_num); for (int i = 0; i < buckets.size(); i++) { buckets[i] = _hash_buckets[i].load(); _hash_buckets[i].store(0); } if (!find_outlier_index( - buckets, FLAGS_coarse_data_variance_threshold, result.coarse_bucket_index)) { + buckets, FLAGS_hot_bucket_variance_threshold, result.coarse_bucket_index)) { result.coarse_bucket_index = -1; } } +hotkey_fine_data_collector::hotkey_fine_data_collector(replica_base *base, + int target_bucket_index, + int max_queue_size) + : internal_collector_base(base), + _max_queue_size(max_queue_size), + _target_bucket_index(target_bucket_index), + _capture_key_queue(max_queue_size) +{ +} + +void hotkey_fine_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight) +{ + if (get_bucket_id(hash_key) != _target_bucket_index) { + return; + } + // abandon the key if enqueue failed (possibly because not enough room to enqueue) + _capture_key_queue.try_enqueue(std::make_pair(hash_key, weight)); +} + +struct blob_hash +{ + std::size_t operator()(const dsn::blob &str) const + { + dsn::string_view cp(str); + return 
boost::hash_range(cp.begin(), cp.end()); + } +}; + +struct blob_equal +{ + std::size_t operator()(const dsn::blob &lhs, const dsn::blob &rhs) const + { + return dsn::string_view(lhs) == dsn::string_view(rhs); + } +}; + +void hotkey_fine_data_collector::analyse_data(detect_hotkey_result &result) +{ + // hashkey -> weight + std::unordered_map hash_keys_weight; + std::pair key_weight_pair; + // prevent endless loop, limit the number of elements analyzed not to exceed the queue size + uint32_t dequeue_cnt = 0; + while (++dequeue_cnt <= _max_queue_size && _capture_key_queue.try_dequeue(key_weight_pair)) { + hash_keys_weight[key_weight_pair.first] += key_weight_pair.second; + } + + if (hash_keys_weight.empty()) { + return; + } + + // the weight of all the collected hash keys + std::vector weights; + weights.reserve(hash_keys_weight.size()); + dsn::string_view weight_max_key; // the hashkey with the max weight + uint64_t weight_max = 0; // the max weight so far + for (const auto &iter : hash_keys_weight) { + weights.push_back(iter.second); + if (iter.second > weight_max) { + weight_max = iter.second; + weight_max_key = iter.first; + } + } + + // weights stores the accumulated weight of each distinct hash key dequeued above + // The size of weights influences our hotkey determination strategy + // weights.size() <= 2: the hotkey must exist (the most weighted key), because + // the two-level filtering significantly reduces the + // possibility that the hottest key is not the actual hotkey. 
+ // weights.size() >= 3: use find_outlier_index to determine whether a hotkey exists + int hot_index; + if (weights.size() < 3 || + find_outlier_index(weights, FLAGS_hot_key_variance_threshold, hot_index)) { + result.hot_hash_key = std::string(weight_max_key); + } +} + } // namespace server } // namespace pegasus diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h index d4b43a4f69..937d07da25 100644 --- a/src/server/hotkey_collector.h +++ b/src/server/hotkey_collector.h @@ -18,8 +18,9 @@ #pragma once #include -#include "hotkey_collector_state.h" +#include #include +#include "hotkey_collector_state.h" namespace pegasus { namespace server { @@ -29,6 +30,7 @@ class internal_collector_base; struct detect_hotkey_result { int coarse_bucket_index = -1; + std::string hot_hash_key; }; // hotkey_collector is responsible to find the hot keys after the partition @@ -120,8 +122,26 @@ class hotkey_coarse_data_collector : public internal_collector_base void analyse_data(detect_hotkey_result &result) override; private: + hotkey_coarse_data_collector() = delete; + std::vector> _hash_buckets; }; +class hotkey_fine_data_collector : public internal_collector_base +{ +public: + hotkey_fine_data_collector(replica_base *base, int target_bucket_index, int max_queue_size); + void capture_data(const dsn::blob &hash_key, uint64_t weight) override; + void analyse_data(detect_hotkey_result &result) override; + +private: + hotkey_fine_data_collector() = delete; + + const uint32_t _max_queue_size; + const uint32_t _target_bucket_index; + // ConcurrentQueue is a lock-free queue to capture keys + moodycamel::ConcurrentQueue> _capture_key_queue; +}; + } // namespace server } // namespace pegasus