Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: split read/write hotspot of hotspot_partition_calculator #592

Merged
merged 18 commits into from
Sep 17, 2020
Merged
108 changes: 67 additions & 41 deletions src/server/hotspot_partition_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <rrdb/rrdb_types.h>
#include <dsn/utility/flags.h>

namespace pegasus {
Expand All @@ -34,67 +35,92 @@ DSN_DEFINE_int64("pegasus.collector",
"eliminate outdated historical "
"data");

void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partitions)
void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
{
while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 1) {
_partition_stat_histories.pop();
while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) {
_partitions_stat_histories.pop_front();
}
std::vector<hotspot_partition_data> temp(partitions.size());
// TODO refactor the data structure
for (int i = 0; i < partitions.size(); i++) {
temp[i] = std::move(hotspot_partition_data(partitions[i]));
std::vector<hotspot_partition_stat> temp;
for (const auto &partition_stat : partition_stats) {
temp.emplace_back(hotspot_partition_stat(partition_stat));
hycdong marked this conversation as resolved.
Show resolved Hide resolved
}
_partition_stat_histories.emplace(temp);
_partitions_stat_histories.emplace_back(temp);
}

void hotspot_partition_calculator::init_perf_counter(int partition_count)
{
std::string counter_name;
std::string counter_desc;
for (int i = 0; i < partition_count; i++) {
string partition_desc = _app_name + '.' + std::to_string(i);
counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
counter_desc = fmt::format("statistic the hotspots of app {}", partition_desc);
_hot_points[i].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
for (int data_type = 0; data_type <= 1; data_type++) {
for (int i = 0; i < partition_count; i++) {
string partition_desc =
_app_name + '.' +
(data_type == partition_qps_type::WRITE_HOTSPOT_DATA ? "write." : "read.") +
std::to_string(i);
std::string counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
std::string counter_desc =
fmt::format("statistic the hotspots of app {}", partition_desc);
_hot_points[i].emplace_back(std::make_unique<dsn::perf_counter_wrapper>());
_hot_points[i][data_type]->init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
}
}
}

void hotspot_partition_calculator::data_analyse()
void hotspot_partition_calculator::stat_histories_analyse(int data_type,
std::vector<int> &hot_points)
{
dassert(_partition_stat_histories.back().size() == _hot_points.size(),
"partition counts error, please check");
std::vector<double> data_samples;
data_samples.reserve(_partition_stat_histories.size() * _hot_points.size());
auto temp_data = _partition_stat_histories;
double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
int sample_count = 0;
while (!temp_data.empty()) {
for (const auto &partition_data : temp_data.front()) {
if (partition_data.total_qps - 1.00 > 0) {
data_samples.push_back(partition_data.total_qps);
table_qps_sum += partition_data.total_qps;
sample_count++;
}
for (const auto &one_partition_stat_histories : _partitions_stat_histories) {
for (const auto &partition_stat : one_partition_stat_histories) {
table_qps_sum += partition_stat.total_qps[data_type];
sample_count++;
}
temp_data.pop();
}
if (sample_count == 0) {
ddebug("_partition_stat_histories size == 0");
if (sample_count <= 1) {
ddebug("_partitions_stat_histories size <= 1, not enough data for calculation");
return;
}
table_qps_avg = table_qps_sum / sample_count;
for (const auto &data_sample : data_samples) {
standard_deviation += pow((data_sample - table_qps_avg), 2);
for (const auto &one_partition_stat_histories : _partitions_stat_histories) {
for (const auto &partition_stat : one_partition_stat_histories) {
standard_deviation += pow((partition_stat.total_qps[data_type] - table_qps_avg), 2);
}
}
standard_deviation = sqrt(standard_deviation / sample_count);
const auto &anly_data = _partition_stat_histories.back();
for (int i = 0; i < _hot_points.size(); i++) {
double hot_point = (anly_data[i].total_qps - table_qps_avg) / standard_deviation;
// perf_counter->set can only be unsigned __int64
standard_deviation = sqrt(standard_deviation / (sample_count - 1));
const auto &anly_data = _partitions_stat_histories.back();
int hot_point_size = _hot_points.size();
hot_points.resize(hot_point_size);
for (int i = 0; i < hot_point_size; i++) {
double hot_point = 0;
if (standard_deviation != 0) {
hot_point = (anly_data[i].total_qps[data_type] - table_qps_avg) / standard_deviation;
}
Smityz marked this conversation as resolved.
Show resolved Hide resolved
// perf_counter->set can only be unsigned uint64_t
// use ceil to guarantee conversion results
hot_point = ceil(std::max(hot_point, double(0)));
_hot_points[i]->set(hot_point);
hot_points[i] = ceil(std::max(hot_point, double(0)));
}
}

void hotspot_partition_calculator::update_hot_point(int data_type, std::vector<int> &hot_points)
{
dcheck_eq(_hot_points.size(), hot_points.size());
int size = hot_points.size();
for (int i = 0; i < size; i++) {
_hot_points[i][data_type]->get()->set(hot_points[i]);
}
}

void hotspot_partition_calculator::data_analyse()
{
dassert(_partitions_stat_histories.back().size() == _hot_points.size(),
"The number of partitions in this table has changed, and hotspot analysis cannot be "
"performed,in %s",
_app_name.c_str());
for (int data_type = 0; data_type <= 1; data_type++) {
// data_type 0: READ_HOTSPOT_DATA; 1: WRITE_HOTSPOT_DATA
std::vector<int> hot_points;
stat_histories_analyse(data_type, hot_points);
update_hot_point(data_type, hot_points);
}
}

Expand Down
22 changes: 17 additions & 5 deletions src/server/hotspot_partition_calculator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@

#pragma once

#include "hotspot_partition_data.h"
#include "hotspot_partition_stat.h"
#include <gtest/gtest_prod.h>
#include <dsn/perf_counter/perf_counter.h>

namespace pegasus {
namespace server {

// stores the whole histories of all partitions in one table
typedef std::list<std::vector<hotspot_partition_stat>> stat_histories;
// hot_partition_counters c[index_of_partitions][type_of_read(0)/write(1)_stat]
// so if we have n partitions, we will get 2*n hot_partition_counters, to demonstrate both
// read/write hotspot value
typedef std::vector<std::vector<std::unique_ptr<dsn::perf_counter_wrapper>>> hot_partition_counters;
Smityz marked this conversation as resolved.
Show resolved Hide resolved
Smityz marked this conversation as resolved.
Show resolved Hide resolved

// hotspot_partition_calculator is used to find the hot partition in a table.
class hotspot_partition_calculator
{
Expand All @@ -39,15 +46,20 @@ class hotspot_partition_calculator
void data_analyse();

private:
const std::string _app_name;
// empirical rule to calculate hot point of each partition
// ref: https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule
void stat_histories_analyse(int data_type, std::vector<int> &hot_points);
// set hot_point to corresponding perf_counter
void update_hot_point(int data_type, std::vector<int> &hot_points);

const std::string _app_name;
void init_perf_counter(int perf_counter_count);
// usually a partition with "hot-point value" >= 3 can be considered as a hotspot partition.
hycdong marked this conversation as resolved.
Show resolved Hide resolved
std::vector<dsn::perf_counter_wrapper> _hot_points;
hot_partition_counters _hot_points;
// saving historical data can improve accuracy
std::queue<std::vector<hotspot_partition_data>> _partition_stat_histories;
stat_histories _partitions_stat_histories;

FRIEND_TEST(hotspot_partition_calculator, hotspot_partition_policy);
friend class hotspot_partition_test;
};

} // namespace server
Expand Down
20 changes: 0 additions & 20 deletions src/server/hotspot_partition_data.h

This file was deleted.

43 changes: 43 additions & 0 deletions src/server/hotspot_partition_stat.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "shell/command_helper.h"

namespace pegasus {
namespace server {

enum partition_qps_type
{
READ_HOTSPOT_DATA = 0,
WRITE_HOTSPOT_DATA
};

struct hotspot_partition_stat
{
hotspot_partition_stat(const row_data &row)
Smityz marked this conversation as resolved.
Show resolved Hide resolved
{
total_qps[READ_HOTSPOT_DATA] = row.get_total_read_qps();
total_qps[WRITE_HOTSPOT_DATA] = row.get_total_write_qps();
}
hotspot_partition_stat() {}
double total_qps[2];
};

} // namespace server
} // namespace pegasus
87 changes: 68 additions & 19 deletions src/server/test/hotspot_partition_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,81 @@

#include "server/hotspot_partition_calculator.h"

#include "pegasus_server_test_base.h"
#include <gtest/gtest.h>

namespace pegasus {
namespace server {

TEST(hotspot_partition_calculator, hotspot_partition_policy)
class hotspot_partition_test : public pegasus_server_test_base
{
// TODO: refactor the unit test
std::vector<row_data> test_rows(8);
test_rows[0].get_qps = 1000.0;
test_rows[1].get_qps = 1000.0;
test_rows[2].get_qps = 1000.0;
test_rows[3].get_qps = 1000.0;
test_rows[4].get_qps = 1000.0;
test_rows[5].get_qps = 1000.0;
test_rows[6].get_qps = 1000.0;
test_rows[7].get_qps = 5000.0;
hotspot_partition_calculator test_hotspot_calculator("TEST", 8);
test_hotspot_calculator.data_aggregate(test_rows);
test_hotspot_calculator.data_analyse();
std::vector<double> result(8);
for (int i = 0; i < test_hotspot_calculator._hot_points.size(); i++) {
result[i] = test_hotspot_calculator._hot_points[i]->get_value();
public:
hotspot_partition_test() : calculator("TEST", 8){};
hotspot_partition_calculator calculator;
std::vector<row_data> generate_row_data()
{
std::vector<row_data> test_rows;
test_rows.resize(8);
for (int i = 0; i < 8; i++) {
test_rows[i].get_qps = 1000.0;
test_rows[i].put_qps = 1000.0;
}
return test_rows;
}
std::vector<double> expect_vector{0, 0, 0, 0, 0, 0, 0, 3};
ASSERT_EQ(expect_vector, result);
std::vector<std::vector<double>> get_calculator_result(const hot_partition_counters &counters)
{
std::vector<std::vector<double>> result;
result.resize(2);
for (int i = 0; i < counters.size(); i++) {
result[READ_HOTSPOT_DATA].push_back(counters[i][READ_HOTSPOT_DATA]->get()->get_value());
result[WRITE_HOTSPOT_DATA].push_back(
counters[i][WRITE_HOTSPOT_DATA]->get()->get_value());
}
return result;
}
void test_policy_in_scenarios(std::vector<row_data> scenario,
std::vector<std::vector<double>> &expect_result,
hotspot_partition_calculator &calculator)
{
calculator.data_aggregate(std::move(scenario));
calculator.data_analyse();
std::vector<std::vector<double>> result = get_calculator_result(calculator._hot_points);
ASSERT_EQ(result, expect_result);
}
};

TEST_F(hotspot_partition_test, hotspot_partition_policy)
{
// Insert normal scenario data to test
std::vector<row_data> test_rows = generate_row_data();
std::vector<std::vector<double>> expect_vector = {{0, 0, 0, 0, 0, 0, 0, 0},
{0, 0, 0, 0, 0, 0, 0, 0}};
test_policy_in_scenarios(test_rows, expect_vector, calculator);

// Insert hotspot scenario_0 data to test
test_rows = generate_row_data();
const int HOT_SCENARIO_0_READ_HOT_PARTITION = 7;
const int HOT_SCENARIO_0_WRITE_HOT_PARTITION = 0;
test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0;
expect_vector = {{0, 0, 0, 0, 0, 0, 0, 4}, {4, 0, 0, 0, 0, 0, 0, 0}};
test_policy_in_scenarios(test_rows, expect_vector, calculator);

// Insert hotspot scenario_0 data to test again
test_rows = generate_row_data();
test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0;
expect_vector = {{0, 0, 0, 0, 0, 0, 0, 4}, {4, 0, 0, 0, 0, 0, 0, 0}};
test_policy_in_scenarios(test_rows, expect_vector, calculator);

// Insert hotspot scenario_1 data to test again
test_rows = generate_row_data();
const int HOT_SCENARIO_1_READ_HOT_PARTITION = 3;
const int HOT_SCENARIO_1_WRITE_HOT_PARTITION = 2;
test_rows[HOT_SCENARIO_1_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_1_WRITE_HOT_PARTITION].put_qps = 5000.0;
expect_vector = {{0, 0, 0, 4, 0, 0, 0, 0}, {0, 0, 4, 0, 0, 0, 0, 0}};
test_policy_in_scenarios(test_rows, expect_vector, calculator);
}

} // namespace server
Expand Down
8 changes: 8 additions & 0 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,14 @@ struct row_data

double get_total_cu() const { return recent_read_cu + recent_write_cu; }

double get_total_read_qps() const { return get_qps + multi_get_qps + scan_qps; }

double get_total_write_qps() const
hycdong marked this conversation as resolved.
Show resolved Hide resolved
{
return put_qps + remove_qps + multi_put_qps + multi_remove_qps + check_and_set_qps +
check_and_mutate_qps;
}

std::string row_name;
int32_t app_id = 0;
int32_t partition_count = 0;
Expand Down