Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hotkey): capture data part2 - declare coarse collector #624

Merged
merged 26 commits into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 52 additions & 31 deletions src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,26 @@ DSN_DEFINE_int32("pegasus.server",
3,
"the threshold of variance calculate to find the outliers");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the blank line

// The variance threshold must not be negative; zero and every positive value are accepted.
DSN_DEFINE_validator(coarse_data_variance_threshold,
                     [](int32_t threshold) -> bool { return !(threshold < 0); });

// Number of hash buckets used by the coarse hotkey collector (it sizes the
// collector's _hash_buckets vector). Defaults to 37; the validator that follows
// this definition only accepts prime values that are at least 3.
DSN_DEFINE_int32("pegasus.server",
data_capture_hash_bucket_num,
37,
"the number of data capture hash buckets");
Smityz marked this conversation as resolved.
Show resolved Hide resolved

// data_capture_hash_bucket_num should be a prime number that is at least 3.
DSN_DEFINE_validator(data_capture_hash_bucket_num, [](int32_t bucket_num) -> bool {
    if (bucket_num < 3) {
        return false;
    }
    // Trial division: a composite number has a divisor no larger than its square
    // root. The bound is written as a division so it cannot overflow.
    int divisor = 2;
    while (divisor <= bucket_num / divisor) {
        if (bucket_num % divisor == 0) {
            return false;
        }
        ++divisor;
    }
    return true;
});

hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base)
: replica_base(r_base),
Expand Down Expand Up @@ -147,7 +162,7 @@ void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &
hotkey_coarse_data_collector::hotkey_coarse_data_collector(replica_base *base)
: internal_collector_base(base), _hash_buckets(FLAGS_data_capture_hash_bucket_num)
{
for (std::atomic<uint64_t> &bucket : _hash_buckets) {
for (auto &bucket : _hash_buckets) {
bucket.store(0);
}
}
Expand All @@ -164,43 +179,49 @@ void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
buckets[i] = _hash_buckets[i].load();
_hash_buckets[i].store(0);
}
result = internal_analysis_method(buckets, FLAGS_coarse_data_variance_threshold);
outlier_detection detection(buckets, FLAGS_coarse_data_variance_threshold);
int hotindex = -1;
Smityz marked this conversation as resolved.
Show resolved Hide resolved
if (detection.find_hotindex(hotindex)) {
result.coarse_bucket_index = hotindex;
}
}

detect_hotkey_result
hotkey_coarse_data_collector::internal_analysis_method(const std::vector<uint64_t> &captured_keys,
int threshold)
outlier_detection::outlier_detection(const std::vector<uint64_t> &captured_keys, int threshold)
: _data_size(captured_keys.size()), _threshold(threshold)

{
int data_size = captured_keys.size();
dcheck_gt(captured_keys.size(), 2);
// empirical rule to calculate hot point of each partition
// same algorithm as hotspot_partition_calculator::stat_histories_analyse
double table_captured_key_sum = 0;
int hot_index = 0;
int hot_value = 0;
for (int i = 0; i < data_size; i++) {
table_captured_key_sum += captured_keys[i];
if (captured_keys[i] > hot_value) {
hot_index = i;
hot_value = captured_keys[i];
calculate_data_count(captured_keys);
calculate_standard_deviation(captured_keys);
}

Smityz marked this conversation as resolved.
Show resolved Hide resolved
void outlier_detection::calculate_data_count(const std::vector<uint64_t> &captured_keys)
{
for (int i = 0; i < _data_size; i++) {
_data_count += captured_keys[i];
if (captured_keys[i] > _hot_value) {
_hot_index = i;
_hot_value = captured_keys[i];
}
}
Smityz marked this conversation as resolved.
Show resolved Hide resolved
// TODO: (Tangyanzhao) increase a judgment of table_captured_key_sum
double captured_keys_avg_count =
(table_captured_key_sum - captured_keys[hot_index]) / (data_size - 1);
double standard_deviation = 0;
for (int i = 0; i < data_size; i++) {
if (i != hot_index) {
standard_deviation += pow((captured_keys[i] - captured_keys_avg_count), 2);
}

void outlier_detection::calculate_standard_deviation(const std::vector<uint64_t> &captured_keys)
{
dcheck_gt(captured_keys.size(), 2);
_avg_count = (_data_count - captured_keys[_hot_index]) / (_data_size - 1);
for (int i = 0; i < _data_size; i++) {
if (i != _hot_index) {
_standard_deviation += pow((captured_keys[i] - _avg_count), 2);
}
}
standard_deviation = sqrt(standard_deviation / (data_size - 2));
double hot_point = (hot_value - captured_keys_avg_count) / standard_deviation;
detect_hotkey_result result;
if (hot_point > threshold) {
result.coarse_bucket_index = hot_index;
}
return result;
_standard_deviation = sqrt(_standard_deviation / (_data_size - 2));
}

// Reports the index of the bucket with the largest count through `hot_index`
// and returns whether that bucket is an outlier.
//
// The score is the distance of the largest count from the average of the other
// buckets, measured in standard deviations of those other buckets. The bucket
// is an outlier when the score reaches the configured threshold.
// `hot_index` is written regardless of the return value.
bool outlier_detection::find_hotindex(int &hot_index)
{
    hot_index = _hot_index;
    const double distance_from_avg = _hot_value - _avg_count;
    const double score = distance_from_avg / _standard_deviation;
    return score >= _threshold;
}

} // namespace server
Expand Down
24 changes: 21 additions & 3 deletions src/server/hotkey_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,29 @@ class hotkey_coarse_data_collector : public internal_collector_base
void analyse_data(detect_hotkey_result &result) override;

private:
detect_hotkey_result internal_analysis_method(const std::vector<uint64_t> &captured_keys,
int threshold);

std::vector<std::atomic<uint64_t>> _hash_buckets;
};

// 68–95–99.7 rule, same algorithm as hotspot_partition_calculator::stat_histories_analyse
// Decides whether the largest value of a sample is an outlier compared with
// the remaining values.
//
// The statistics are computed once, in the constructor; find_hotindex() then
// reports the result. Every member has a default initializer because the
// calculate_* helpers accumulate into them (+=) and compare against them
// before assigning, so they must start from a defined value.
class outlier_detection
{
public:
    outlier_detection() = delete;
    // captured_keys: one counter per bucket; threshold: minimum score for the
    // largest bucket to be reported as hot.
    outlier_detection(const std::vector<uint64_t> &captured_keys, int threshold);
    // Stores the index of the largest bucket in `hot_index` and returns true
    // when its score reaches the threshold.
    bool find_hotindex(int &hot_index);

private:
    // Sums all counters and records the largest one and its index.
    void calculate_data_count(const std::vector<uint64_t> &captured_keys);
    // Computes the average and the standard deviation of all counters except
    // the largest one.
    void calculate_standard_deviation(const std::vector<uint64_t> &captured_keys);

    // Index of the largest counter.
    int _hot_index = 0;
    // Sum of all counters; uint64_t so that adding uint64_t counters does not narrow.
    uint64_t _data_count = 0;
    // Number of counters.
    int _data_size = 0;
    // Standard deviation of the counters, the largest one excluded.
    double _standard_deviation = 0;
    int _threshold = 0;
    // Value of the largest counter; uint64_t to match the counter type.
    uint64_t _hot_value = 0;
    // Average of the counters, the largest one excluded.
    double _avg_count = 0;
};

} // namespace server
} // namespace pegasus