From d2485ff18108d14ad85e1d7f08011e629aa09077 Mon Sep 17 00:00:00 2001 From: von gosling Date: Thu, 10 Sep 2020 16:08:09 +0800 Subject: [PATCH 1/2] refactor: update license statement (#600) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 074bea0ec2..9ceb3de145 100644 --- a/README.md +++ b/README.md @@ -79,5 +79,5 @@ Data import/export tools: ## License -Copyright 2015-now Xiaomi, Inc. Licensed under the Apache License, Version 2.0: +Copyright 2020 The Apache Software Foundation. Licensed under the Apache License, Version 2.0: From b05784823e5887babd3489fc4ef9c7d7d5d8257c Mon Sep 17 00:00:00 2001 From: Smilencer <527646889@qq.com> Date: Thu, 10 Sep 2020 17:59:48 +0800 Subject: [PATCH 2/2] refactor(collector): sort out the structure of partition hotspot detection (#597) --- src/server/hotspot_partition_calculator.cpp | 102 ++++++++++++++++++ src/server/hotspot_partition_calculator.h | 54 ++++++++++ src/server/hotspot_partition_data.h | 9 +- src/server/info_collector.cpp | 39 ++----- src/server/info_collector.h | 13 +-- src/server/table_hotspot_policy.cpp | 40 ------- src/server/table_hotspot_policy.h | 100 ----------------- src/server/test/CMakeLists.txt | 2 +- src/server/test/hotspot_partition_test.cpp | 49 +++++++++ src/server/test/pegasus_tablehotspot_test.cpp | 36 ------- 10 files changed, 225 insertions(+), 219 deletions(-) create mode 100644 src/server/hotspot_partition_calculator.cpp create mode 100644 src/server/hotspot_partition_calculator.h delete mode 100644 src/server/table_hotspot_policy.cpp delete mode 100644 src/server/table_hotspot_policy.h create mode 100644 src/server/test/hotspot_partition_test.cpp delete mode 100644 src/server/test/pegasus_tablehotspot_test.cpp diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp new file mode 100644 index 0000000000..865814e750 --- /dev/null +++ b/src/server/hotspot_partition_calculator.cpp @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "hotspot_partition_calculator.h" + +#include +#include +#include +#include + +namespace pegasus { +namespace server { + +DSN_DEFINE_int64("pegasus.collector", + max_hotspot_store_size, + 100, + "the max count of historical data " + "stored in calculator, The FIFO " + "queue design is used to " + "eliminate outdated historical " + "data"); + +void hotspot_partition_calculator::data_aggregate(const std::vector &partitions) +{ + while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 1) { + _partition_stat_histories.pop(); + } + std::vector temp(partitions.size()); + // TODO refactor the data structure + for (int i = 0; i < partitions.size(); i++) { + temp[i] = std::move(hotspot_partition_data(partitions[i])); + } + _partition_stat_histories.emplace(temp); +} + +void hotspot_partition_calculator::init_perf_counter(int partition_count) +{ + std::string counter_name; + std::string counter_desc; + for (int i = 0; i < partition_count; i++) { + string partition_desc = _app_name + '.' + std::to_string(i); + counter_name = fmt::format("app.stat.hotspots@{}", partition_desc); + counter_desc = fmt::format("statistic the hotspots of app {}", partition_desc); + _hot_points[i].init_app_counter( + "app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str()); + } +} + +void hotspot_partition_calculator::data_analyse() +{ + dassert(_partition_stat_histories.back().size() == _hot_points.size(), + "partition counts error, please check"); + std::vector data_samples; + data_samples.reserve(_partition_stat_histories.size() * _hot_points.size()); + auto temp_data = _partition_stat_histories; + double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0; + int sample_count = 0; + while (!temp_data.empty()) { + for (const auto &partition_data : temp_data.front()) { + if (partition_data.total_qps - 1.00 > 0) { + data_samples.push_back(partition_data.total_qps); + table_qps_sum += partition_data.total_qps; + sample_count++; + } + } + temp_data.pop(); + } + if (sample_count == 0) { + ddebug("_partition_stat_histories size == 0"); + return; + } + table_qps_avg = table_qps_sum / sample_count; + for (const auto &data_sample : data_samples) { + standard_deviation += pow((data_sample - table_qps_avg), 2); + } + standard_deviation = sqrt(standard_deviation / sample_count); + const auto &anly_data = _partition_stat_histories.back(); + for (int i = 0; i < _hot_points.size(); i++) { + double hot_point = (anly_data[i].total_qps - table_qps_avg) / standard_deviation; + // perf_counter->set can only be unsigned __int64 + // use ceil to guarantee conversion results + hot_point = ceil(std::max(hot_point, double(0))); + _hot_points[i]->set(hot_point); + } +} + +} // namespace server +} // namespace pegasus diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h new file mode 100644 index 0000000000..c950ebe82f --- /dev/null +++ b/src/server/hotspot_partition_calculator.h @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "hotspot_partition_data.h" +#include +#include + +namespace pegasus { +namespace server { + +// hotspot_partition_calculator is used to find the hot partition in a table. +class hotspot_partition_calculator +{ +public: + hotspot_partition_calculator(const std::string &app_name, int partition_count) + : _app_name(app_name), _hot_points(partition_count) + { + init_perf_counter(partition_count); + } + // aggregate related data of hotspot detection + void data_aggregate(const std::vector &partitions); + // analyse the saved data to find hotspot partition + void data_analyse(); + +private: + const std::string _app_name; + + void init_perf_counter(int perf_counter_count); + // usually a partition with "hot-point value" >= 3 can be considered as a hotspot partition. + std::vector _hot_points; + // saving historical data can improve accuracy + std::queue> _partition_stat_histories; + + FRIEND_TEST(hotspot_partition_calculator, hotspot_partition_policy); +}; + +} // namespace server +} // namespace pegasus diff --git a/src/server/hotspot_partition_data.h b/src/server/hotspot_partition_data.h index 2f97c9ba0a..b41b1b1acb 100644 --- a/src/server/hotspot_partition_data.h +++ b/src/server/hotspot_partition_data.h @@ -4,21 +4,16 @@ #pragma once -#include "shell/commands.h" +#include "shell/command_helper.h" namespace pegasus { namespace server { struct hotspot_partition_data { - hotspot_partition_data(const row_data &row) - : total_qps(row.get_total_qps()), - total_cu(row.get_total_cu()), - partition_name(row.row_name){}; + hotspot_partition_data(const row_data &row) : total_qps(row.get_total_qps()){}; hotspot_partition_data() {} double total_qps; - double total_cu; - std::string partition_name; }; } // namespace server diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp index 6caea4981a..0ce2539f47 100644 --- a/src/server/info_collector.cpp +++ b/src/server/info_collector.cpp @@ -14,6 +14,7 @@ #include "base/pegasus_const.h" #include "result_writer.h" +#include "hotspot_partition_calculator.h" using namespace ::dsn; using namespace ::dsn::replication; @@ -78,10 +79,6 @@ info_collector::info_collector() "storage_size_fetch_interval_seconds", 3600, // default value 1h "storage size fetch interval seconds"); - _hotspot_detect_algorithm = dsn_config_get_value_string("pegasus.collector", - "hotspot_detect_algorithm", - "hotspot_algo_qps_variance", - "hotspot_detect_algorithm"); // _storage_size_retry_wait_seconds is in range of [1, 60] _storage_size_retry_wait_seconds = std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10)); @@ -96,9 +93,6 @@ info_collector::~info_collector() for (auto kv : _app_stat_counters) { delete kv.second; } - for (auto store : _hotspot_calculator_store) { - delete store.second; - } } void info_collector::start() @@ -150,15 +144,11 @@ void info_collector::on_app_stat() // get row data statistics for all of the apps all_stats.merge(app_stats); - // hotspot_calculator is to detect hotspots - hotspot_calculator *hotspot_calculator = + // hotspot_partition_calculator is used for detecting hotspots + auto hotspot_partition_calculator = get_hotspot_calculator(app_rows.first, app_rows.second.size()); - if (!hotspot_calculator) { - continue; - } - hotspot_calculator->aggregate(app_rows.second); - // new policy can be designed by strategy pattern in hotspot_partition_data.h - hotspot_calculator->start_alg(); + hotspot_partition_calculator->data_aggregate(app_rows.second); + hotspot_partition_calculator->data_analyse(); } get_app_counters(all_stats.app_name)->set(all_stats); @@ -302,25 +292,16 @@ void info_collector::on_storage_size_stat(int remaining_retry_count) _result_writer->set_result(st_stat.timestamp, "ss", st_stat.dump_to_json()); } -hotspot_calculator *info_collector::get_hotspot_calculator(const std::string &app_name, - const int partition_num) +std::shared_ptr +info_collector::get_hotspot_calculator(const std::string &app_name, const int partition_count) { - // use appname+partition_num as a key can prevent the impact of dynamic partition changes - std::string app_name_pcount = fmt::format("{}.{}", app_name, partition_num); + // use app_name+partition_count as a key can prevent the impact of dynamic partition changes + std::string app_name_pcount = fmt::format("{}.{}", app_name, partition_count); auto iter = _hotspot_calculator_store.find(app_name_pcount); if (iter != _hotspot_calculator_store.end()) { return iter->second; } - std::unique_ptr policy; - if (_hotspot_detect_algorithm == "hotspot_algo_qps_variance") { - policy.reset(new hotspot_algo_qps_variance()); - } else { - dwarn("hotspot detection is disabled"); - _hotspot_calculator_store[app_name_pcount] = nullptr; - return nullptr; - } - hotspot_calculator *calculator = - new hotspot_calculator(app_name, partition_num, std::move(policy)); + auto calculator = std::make_shared(app_name, partition_count); _hotspot_calculator_store[app_name_pcount] = calculator; return calculator; } diff --git a/src/server/info_collector.h b/src/server/info_collector.h index 05d9929115..adc3cd96be 100644 --- a/src/server/info_collector.h +++ b/src/server/info_collector.h @@ -19,12 +19,12 @@ #include "../shell/commands.h" #include "table_stats.h" -#include "table_hotspot_policy.h" namespace pegasus { namespace server { class result_writer; +class hotspot_partition_calculator; class info_collector { @@ -177,15 +177,16 @@ class info_collector uint32_t _storage_size_fetch_interval_seconds; uint32_t _storage_size_retry_wait_seconds; uint32_t _storage_size_retry_max_count; - std::string _hotspot_detect_algorithm; ::dsn::task_ptr _storage_size_stat_timer_task; ::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock; // mapping 'node address' --> 'last updated timestamp' std::map _capacity_unit_update_info; - std::map _hotspot_calculator_store; - - hotspot_calculator *get_hotspot_calculator(const std::string &app_name, - const int partition_num); + // _hotspot_calculator_store is to save hotspot_partition_calculator for each table, a + // hotspot_partition_calculator saves historical hotspot data and alert perf_counters of + // corresponding table + std::map> _hotspot_calculator_store; + std::shared_ptr + get_hotspot_calculator(const std::string &app_name, const int partition_count); }; } // namespace server diff --git a/src/server/table_hotspot_policy.cpp b/src/server/table_hotspot_policy.cpp deleted file mode 100644 index ce9cd8d931..0000000000 --- a/src/server/table_hotspot_policy.cpp +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. -// This source code is licensed under the Apache License Version 2.0, which -// can be found in the LICENSE file in the root directory of this source tree. - -#include "table_hotspot_policy.h" - -#include - -namespace pegasus { -namespace server { - -void hotspot_calculator::aggregate(const std::vector &partitions) -{ - while (_app_data.size() > kMaxQueueSize - 1) { - _app_data.pop(); - } - std::vector temp(partitions.size()); - for (int i = 0; i < partitions.size(); i++) { - temp[i] = std::move(hotspot_partition_data(partitions[i])); - } - _app_data.emplace(temp); -} - -void hotspot_calculator::init_perf_counter(const int perf_counter_count) -{ - std::string counter_name; - std::string counter_desc; - for (int i = 0; i < perf_counter_count; i++) { - string paritition_desc = _app_name + '.' + std::to_string(i); - counter_name = fmt::format("app.stat.hotspots@{}", paritition_desc); - counter_desc = fmt::format("statistic the hotspots of app {}", paritition_desc); - _points[i].init_app_counter( - "app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str()); - } -} - -void hotspot_calculator::start_alg() { _policy->analysis(_app_data, _points); } - -} // namespace server -} // namespace pegasus diff --git a/src/server/table_hotspot_policy.h b/src/server/table_hotspot_policy.h deleted file mode 100644 index 62b7eb4a3e..0000000000 --- a/src/server/table_hotspot_policy.h +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. -// This source code is licensed under the Apache License Version 2.0, which -// can be found in the LICENSE file in the root directory of this source tree. - -#pragma once - -#include "hotspot_partition_data.h" - -#include -#include -#include - -#include - -namespace pegasus { -namespace server { -class hotspot_policy -{ -public: - // hotspot_app_data store the historical data which related to hotspot - // it uses rolling queue to save one app's data - // vector is used to save the partitions' data of this app - // hotspot_partition_data is used to save data of one partition - virtual void analysis(const std::queue> &hotspot_app_data, - std::vector<::dsn::perf_counter_wrapper> &perf_counters) = 0; -}; - -// PauTa Criterion -class hotspot_algo_qps_variance : public hotspot_policy -{ -public: - void analysis(const std::queue> &hotspot_app_data, - std::vector<::dsn::perf_counter_wrapper> &perf_counters) - { - dassert(hotspot_app_data.back().size() == perf_counters.size(), - "partition counts error, please check"); - std::vector data_samples; - data_samples.reserve(hotspot_app_data.size() * perf_counters.size()); - auto temp_data = hotspot_app_data; - double total = 0, sd = 0, avg = 0; - int sample_count = 0; - // avg: Average number - // sd: Standard deviation - // sample_count: Number of samples - while (!temp_data.empty()) { - for (auto partition_data : temp_data.front()) { - if (partition_data.total_qps - 1.00 > 0) { - data_samples.push_back(partition_data.total_qps); - total += partition_data.total_qps; - sample_count++; - } - } - temp_data.pop(); - } - if (sample_count == 0) { - ddebug("hotspot_app_data size == 0"); - return; - } - avg = total / sample_count; - for (auto data_sample : data_samples) { - sd += pow((data_sample - avg), 2); - } - sd = sqrt(sd / sample_count); - const auto &anly_data = hotspot_app_data.back(); - for (int i = 0; i < perf_counters.size(); i++) { - double hot_point = (anly_data[i].total_qps - avg) / sd; - // perf_counter->set can only be unsigned __int64 - // use ceil to guarantee conversion results - hot_point = ceil(std::max(hot_point, double(0))); - perf_counters[i]->set(hot_point); - } - } -}; - -// hotspot_calculator is used to find the hotspot in Pegasus -class hotspot_calculator -{ -public: - hotspot_calculator(const std::string &app_name, - const int partition_num, - std::unique_ptr policy) - : _app_name(app_name), _points(partition_num), _policy(std::move(policy)) - { - init_perf_counter(partition_num); - } - void aggregate(const std::vector &partitions); - void start_alg(); - void init_perf_counter(const int perf_counter_count); - -private: - const std::string _app_name; - std::vector<::dsn::perf_counter_wrapper> _points; - std::queue> _app_data; - std::unique_ptr _policy; - static const int kMaxQueueSize = 100; - - FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_variance); -}; -} // namespace server -} // namespace pegasus diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt index 7e70add427..0184dbf115 100644 --- a/src/server/test/CMakeLists.txt +++ b/src/server/test/CMakeLists.txt @@ -8,7 +8,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp" "../pegasus_server_write.cpp" "../capacity_unit_calculator.cpp" "../pegasus_mutation_duplicator.cpp" - "../table_hotspot_policy.cpp" + "../hotspot_partition_calculator.cpp" "../meta_store.cpp" ) diff --git a/src/server/test/hotspot_partition_test.cpp b/src/server/test/hotspot_partition_test.cpp new file mode 100644 index 0000000000..732a129b63 --- /dev/null +++ b/src/server/test/hotspot_partition_test.cpp @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "server/hotspot_partition_calculator.h" + +#include + +namespace pegasus { +namespace server { + +TEST(hotspot_partition_calculator, hotspot_partition_policy) +{ + // TODO: refactor the unit test + std::vector test_rows(8); + test_rows[0].get_qps = 1000.0; + test_rows[1].get_qps = 1000.0; + test_rows[2].get_qps = 1000.0; + test_rows[3].get_qps = 1000.0; + test_rows[4].get_qps = 1000.0; + test_rows[5].get_qps = 1000.0; + test_rows[6].get_qps = 1000.0; + test_rows[7].get_qps = 5000.0; + hotspot_partition_calculator test_hotspot_calculator("TEST", 8); + test_hotspot_calculator.data_aggregate(test_rows); + test_hotspot_calculator.data_analyse(); + std::vector result(8); + for (int i = 0; i < test_hotspot_calculator._hot_points.size(); i++) { + result[i] = test_hotspot_calculator._hot_points[i]->get_value(); + } + std::vector expect_vector{0, 0, 0, 0, 0, 0, 0, 3}; + ASSERT_EQ(expect_vector, result); +} + +} // namespace server +} // namespace pegasus diff --git a/src/server/test/pegasus_tablehotspot_test.cpp b/src/server/test/pegasus_tablehotspot_test.cpp deleted file mode 100644 index 63116b2237..0000000000 --- a/src/server/test/pegasus_tablehotspot_test.cpp +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. -// This source code is licensed under the Apache License Version 2.0, which -// can be found in the LICENSE file in the root directory of this source tree. - -#include "server/table_hotspot_policy.h" - -#include - -namespace pegasus { -namespace server { - -TEST(table_hotspot_policy, hotspot_algo_qps_variance) -{ - std::vector test_rows(8); - test_rows[0].get_qps = 1000.0; - test_rows[1].get_qps = 1000.0; - test_rows[2].get_qps = 1000.0; - test_rows[3].get_qps = 1000.0; - test_rows[4].get_qps = 1000.0; - test_rows[5].get_qps = 1000.0; - test_rows[6].get_qps = 1000.0; - test_rows[7].get_qps = 5000.0; - std::unique_ptr policy(new hotspot_algo_qps_variance()); - hotspot_calculator test_hotspot_calculator("TEST", 8, std::move(policy)); - test_hotspot_calculator.aggregate(test_rows); - test_hotspot_calculator.start_alg(); - std::vector result(8); - for (int i = 0; i < test_hotspot_calculator._points.size(); i++) { - result[i] = test_hotspot_calculator._points[i]->get_value(); - } - std::vector expect_vector{0, 0, 0, 0, 0, 0, 0, 3}; - ASSERT_EQ(expect_vector, result); -} - -} // namespace server -} // namespace pegasus