Skip to content

Commit

Permalink
refactor(collector): sort out the structure of partition hotspot dete…
Browse files Browse the repository at this point in the history
…ction (#597)
  • Loading branch information
Smityz authored Sep 10, 2020
1 parent d2485ff commit b057848
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 219 deletions.
102 changes: 102 additions & 0 deletions src/server/hotspot_partition_calculator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "hotspot_partition_calculator.h"

#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>

namespace pegasus {
namespace server {

DSN_DEFINE_int64("pegasus.collector",
max_hotspot_store_size,
100,
"the max count of historical data "
"stored in calculator, The FIFO "
"queue design is used to "
"eliminate outdated historical "
"data");

void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partitions)
{
while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 1) {
_partition_stat_histories.pop();
}
std::vector<hotspot_partition_data> temp(partitions.size());
// TODO refactor the data structure
for (int i = 0; i < partitions.size(); i++) {
temp[i] = std::move(hotspot_partition_data(partitions[i]));
}
_partition_stat_histories.emplace(temp);
}

void hotspot_partition_calculator::init_perf_counter(int partition_count)
{
std::string counter_name;
std::string counter_desc;
for (int i = 0; i < partition_count; i++) {
string partition_desc = _app_name + '.' + std::to_string(i);
counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
counter_desc = fmt::format("statistic the hotspots of app {}", partition_desc);
_hot_points[i].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
}
}

void hotspot_partition_calculator::data_analyse()
{
dassert(_partition_stat_histories.back().size() == _hot_points.size(),
"partition counts error, please check");
std::vector<double> data_samples;
data_samples.reserve(_partition_stat_histories.size() * _hot_points.size());
auto temp_data = _partition_stat_histories;
double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
int sample_count = 0;
while (!temp_data.empty()) {
for (const auto &partition_data : temp_data.front()) {
if (partition_data.total_qps - 1.00 > 0) {
data_samples.push_back(partition_data.total_qps);
table_qps_sum += partition_data.total_qps;
sample_count++;
}
}
temp_data.pop();
}
if (sample_count == 0) {
ddebug("_partition_stat_histories size == 0");
return;
}
table_qps_avg = table_qps_sum / sample_count;
for (const auto &data_sample : data_samples) {
standard_deviation += pow((data_sample - table_qps_avg), 2);
}
standard_deviation = sqrt(standard_deviation / sample_count);
const auto &anly_data = _partition_stat_histories.back();
for (int i = 0; i < _hot_points.size(); i++) {
double hot_point = (anly_data[i].total_qps - table_qps_avg) / standard_deviation;
// perf_counter->set can only be unsigned __int64
// use ceil to guarantee conversion results
hot_point = ceil(std::max(hot_point, double(0)));
_hot_points[i]->set(hot_point);
}
}

} // namespace server
} // namespace pegasus
54 changes: 54 additions & 0 deletions src/server/hotspot_partition_calculator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "hotspot_partition_data.h"
#include <gtest/gtest_prod.h>
#include <dsn/perf_counter/perf_counter.h>

namespace pegasus {
namespace server {

// hotspot_partition_calculator is used to find the hot partition in a table.
class hotspot_partition_calculator
{
public:
hotspot_partition_calculator(const std::string &app_name, int partition_count)
: _app_name(app_name), _hot_points(partition_count)
{
init_perf_counter(partition_count);
}
// aggregate related data of hotspot detection
void data_aggregate(const std::vector<row_data> &partitions);
// analyse the saved data to find hotspot partition
void data_analyse();

private:
const std::string _app_name;

void init_perf_counter(int perf_counter_count);
// usually a partition with "hot-point value" >= 3 can be considered as a hotspot partition.
std::vector<dsn::perf_counter_wrapper> _hot_points;
// saving historical data can improve accuracy
std::queue<std::vector<hotspot_partition_data>> _partition_stat_histories;

FRIEND_TEST(hotspot_partition_calculator, hotspot_partition_policy);
};

} // namespace server
} // namespace pegasus
9 changes: 2 additions & 7 deletions src/server/hotspot_partition_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,16 @@

#pragma once

#include "shell/commands.h"
#include "shell/command_helper.h"

namespace pegasus {
namespace server {

struct hotspot_partition_data
{
hotspot_partition_data(const row_data &row)
: total_qps(row.get_total_qps()),
total_cu(row.get_total_cu()),
partition_name(row.row_name){};
hotspot_partition_data(const row_data &row) : total_qps(row.get_total_qps()){};
hotspot_partition_data() {}
double total_qps;
double total_cu;
std::string partition_name;
};

} // namespace server
Expand Down
39 changes: 10 additions & 29 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "base/pegasus_const.h"
#include "result_writer.h"
#include "hotspot_partition_calculator.h"

using namespace ::dsn;
using namespace ::dsn::replication;
Expand Down Expand Up @@ -78,10 +79,6 @@ info_collector::info_collector()
"storage_size_fetch_interval_seconds",
3600, // default value 1h
"storage size fetch interval seconds");
_hotspot_detect_algorithm = dsn_config_get_value_string("pegasus.collector",
"hotspot_detect_algorithm",
"hotspot_algo_qps_variance",
"hotspot_detect_algorithm");
// _storage_size_retry_wait_seconds is in range of [1, 60]
_storage_size_retry_wait_seconds =
std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10));
Expand All @@ -96,9 +93,6 @@ info_collector::~info_collector()
for (auto kv : _app_stat_counters) {
delete kv.second;
}
for (auto store : _hotspot_calculator_store) {
delete store.second;
}
}

void info_collector::start()
Expand Down Expand Up @@ -150,15 +144,11 @@ void info_collector::on_app_stat()
// get row data statistics for all of the apps
all_stats.merge(app_stats);

// hotspot_calculator is to detect hotspots
hotspot_calculator *hotspot_calculator =
// hotspot_partition_calculator is used for detecting hotspots
auto hotspot_partition_calculator =
get_hotspot_calculator(app_rows.first, app_rows.second.size());
if (!hotspot_calculator) {
continue;
}
hotspot_calculator->aggregate(app_rows.second);
// new policy can be designed by strategy pattern in hotspot_partition_data.h
hotspot_calculator->start_alg();
hotspot_partition_calculator->data_aggregate(app_rows.second);
hotspot_partition_calculator->data_analyse();
}
get_app_counters(all_stats.app_name)->set(all_stats);

Expand Down Expand Up @@ -302,25 +292,16 @@ void info_collector::on_storage_size_stat(int remaining_retry_count)
_result_writer->set_result(st_stat.timestamp, "ss", st_stat.dump_to_json());
}

hotspot_calculator *info_collector::get_hotspot_calculator(const std::string &app_name,
const int partition_num)
std::shared_ptr<hotspot_partition_calculator>
info_collector::get_hotspot_calculator(const std::string &app_name, const int partition_count)
{
// use appname+partition_num as a key can prevent the impact of dynamic partition changes
std::string app_name_pcount = fmt::format("{}.{}", app_name, partition_num);
// use app_name+partition_count as a key can prevent the impact of dynamic partition changes
std::string app_name_pcount = fmt::format("{}.{}", app_name, partition_count);
auto iter = _hotspot_calculator_store.find(app_name_pcount);
if (iter != _hotspot_calculator_store.end()) {
return iter->second;
}
std::unique_ptr<hotspot_policy> policy;
if (_hotspot_detect_algorithm == "hotspot_algo_qps_variance") {
policy.reset(new hotspot_algo_qps_variance());
} else {
dwarn("hotspot detection is disabled");
_hotspot_calculator_store[app_name_pcount] = nullptr;
return nullptr;
}
hotspot_calculator *calculator =
new hotspot_calculator(app_name, partition_num, std::move(policy));
auto calculator = std::make_shared<hotspot_partition_calculator>(app_name, partition_count);
_hotspot_calculator_store[app_name_pcount] = calculator;
return calculator;
}
Expand Down
13 changes: 7 additions & 6 deletions src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

#include "../shell/commands.h"
#include "table_stats.h"
#include "table_hotspot_policy.h"

namespace pegasus {
namespace server {

class result_writer;
class hotspot_partition_calculator;

class info_collector
{
Expand Down Expand Up @@ -177,15 +177,16 @@ class info_collector
uint32_t _storage_size_fetch_interval_seconds;
uint32_t _storage_size_retry_wait_seconds;
uint32_t _storage_size_retry_max_count;
std::string _hotspot_detect_algorithm;
::dsn::task_ptr _storage_size_stat_timer_task;
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
std::map<std::string, string> _capacity_unit_update_info;
std::map<std::string, hotspot_calculator *> _hotspot_calculator_store;

hotspot_calculator *get_hotspot_calculator(const std::string &app_name,
const int partition_num);
// _hotspot_calculator_store is to save hotspot_partition_calculator for each table, a
// hotspot_partition_calculator saves historical hotspot data and alert perf_counters of
// corresponding table
std::map<std::string, std::shared_ptr<hotspot_partition_calculator>> _hotspot_calculator_store;
std::shared_ptr<hotspot_partition_calculator>
get_hotspot_calculator(const std::string &app_name, const int partition_count);
};

} // namespace server
Expand Down
40 changes: 0 additions & 40 deletions src/server/table_hotspot_policy.cpp

This file was deleted.

Loading

0 comments on commit b057848

Please sign in to comment.