diff --git a/rdsn b/rdsn index 0941e768a0..0db7b36840 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 0941e768a07b9e18f5eeea606eeb57a24071a87c +Subproject commit 0db7b368409f96cfd468047ae75962741eabde61 diff --git a/src/server/hotspot_partition_data.h b/src/server/hotspot_partition_data.h new file mode 100644 index 0000000000..2f97c9ba0a --- /dev/null +++ b/src/server/hotspot_partition_data.h @@ -0,0 +1,25 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include "shell/commands.h" + +namespace pegasus { +namespace server { + +struct hotspot_partition_data +{ + hotspot_partition_data(const row_data &row) + : total_qps(row.get_total_qps()), + total_cu(row.get_total_cu()), + partition_name(row.row_name){}; + hotspot_partition_data() {} + double total_qps; + double total_cu; + std::string partition_name; +}; + +} // namespace server +} // namespace pegasus diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp index f13360f4ff..aa91b7988d 100644 --- a/src/server/info_collector.cpp +++ b/src/server/info_collector.cpp @@ -91,6 +91,9 @@ info_collector::~info_collector() for (auto kv : _app_stat_counters) { delete kv.second; } + for (auto store : _hotspot_calculator_store) { + delete store.second; + } } void info_collector::start() @@ -125,100 +128,46 @@ void info_collector::stop() { _tracker.cancel_outstanding_tasks(); } void info_collector::on_app_stat() { ddebug("start to stat apps"); - std::vector rows; - if (!get_app_stat(&_shell_context, "", rows)) { + std::map> all_rows; + if (!get_app_partition_stat(&_shell_context, all_rows)) { derror("call get_app_stat() failed"); return; } - std::vector read_qps; - std::vector write_qps; - rows.resize(rows.size() + 1); - read_qps.resize(rows.size()); - write_qps.resize(rows.size()); - row_data &all = rows.back(); - all.row_name = "_all_"; - for (int i = 0; i < rows.size() - 1; ++i) { - row_data &row = rows[i]; - all.get_qps += row.get_qps; - all.multi_get_qps += row.multi_get_qps; - all.put_qps += row.put_qps; - all.multi_put_qps += row.multi_put_qps; - all.remove_qps += row.remove_qps; - all.multi_remove_qps += row.multi_remove_qps; - all.incr_qps += row.incr_qps; - all.check_and_set_qps += row.check_and_set_qps; - all.check_and_mutate_qps += row.check_and_mutate_qps; - all.scan_qps += row.scan_qps; - all.recent_read_cu += row.recent_read_cu; - all.recent_write_cu += row.recent_write_cu; - all.recent_expire_count += row.recent_expire_count; - all.recent_filter_count += row.recent_filter_count; - all.recent_abnormal_count += row.recent_abnormal_count; - all.recent_write_throttling_delay_count += row.recent_write_throttling_delay_count; - all.recent_write_throttling_reject_count += row.recent_write_throttling_reject_count; - all.storage_mb += row.storage_mb; - all.storage_count += row.storage_count; - all.rdb_block_cache_hit_count += row.rdb_block_cache_hit_count; - all.rdb_block_cache_total_count += row.rdb_block_cache_total_count; - all.rdb_index_and_filter_blocks_mem_usage += row.rdb_index_and_filter_blocks_mem_usage; - all.rdb_memtable_mem_usage += row.rdb_memtable_mem_usage; - all.rdb_estimate_num_keys += row.rdb_estimate_num_keys; - read_qps[i] = row.get_qps + row.multi_get_qps + row.scan_qps; - write_qps[i] = row.put_qps + row.multi_put_qps + row.remove_qps + row.multi_remove_qps + - row.incr_qps + row.check_and_set_qps + row.check_and_mutate_qps; - } - read_qps[read_qps.size() - 1] = all.get_qps + all.multi_get_qps + all.scan_qps; - write_qps[read_qps.size() - 1] = all.put_qps + all.multi_put_qps + all.remove_qps + - all.multi_remove_qps + all.incr_qps + all.check_and_set_qps + - all.check_and_mutate_qps; - for (int i = 0; i < rows.size(); ++i) { - row_data &row = rows[i]; - AppStatCounters *counters = get_app_counters(row.row_name); - counters->get_qps->set(row.get_qps); - counters->multi_get_qps->set(row.multi_get_qps); - counters->put_qps->set(row.put_qps); - counters->multi_put_qps->set(row.multi_put_qps); - counters->remove_qps->set(row.remove_qps); - counters->multi_remove_qps->set(row.multi_remove_qps); - counters->incr_qps->set(row.incr_qps); - counters->check_and_set_qps->set(row.check_and_set_qps); - counters->check_and_mutate_qps->set(row.check_and_mutate_qps); - counters->scan_qps->set(row.scan_qps); - counters->recent_read_cu->set(row.recent_read_cu); - counters->recent_write_cu->set(row.recent_write_cu); - counters->recent_expire_count->set(row.recent_expire_count); - counters->recent_filter_count->set(row.recent_filter_count); - counters->recent_abnormal_count->set(row.recent_abnormal_count); - counters->recent_write_throttling_delay_count->set(row.recent_write_throttling_delay_count); - counters->recent_write_throttling_reject_count->set( - row.recent_write_throttling_reject_count); - counters->storage_mb->set(row.storage_mb); - counters->storage_count->set(row.storage_count); - counters->rdb_block_cache_hit_rate->set( - std::abs(row.rdb_block_cache_total_count) < 1e-6 - ? 0 - : row.rdb_block_cache_hit_count / row.rdb_block_cache_total_count * 1000000); - counters->rdb_index_and_filter_blocks_mem_usage->set( - row.rdb_index_and_filter_blocks_mem_usage); - counters->rdb_memtable_mem_usage->set(row.rdb_memtable_mem_usage); - counters->rdb_estimate_num_keys->set(row.rdb_estimate_num_keys); - counters->read_qps->set(read_qps[i]); - counters->write_qps->set(write_qps[i]); + + table_stats all_stats("_all_"); + for (const auto &app_rows : all_rows) { + // get statistics data for app + table_stats app_stats(app_rows.first); + for (auto partition_row : app_rows.second) { + app_stats.aggregate(partition_row); + } + get_app_counters(app_stats.app_name)->set(app_stats); + // get row data statistics for all of the apps + all_stats.merge(app_stats); + + // hotspot_calculator is to detect hotspots + hotspot_calculator *hotspot_calculator = + get_hotspot_calculator(app_rows.first, app_rows.second.size()); + hotspot_calculator->aggregate(app_rows.second); + // new policy can be designed by strategy pattern in hotspot_partition_data.h + hotspot_calculator->start_alg(); } + get_app_counters(all_stats.app_name)->set(all_stats); + ddebug("stat apps succeed, app_count = %d, total_read_qps = %.2f, total_write_qps = %.2f", - (int)(rows.size() - 1), - read_qps[read_qps.size() - 1], - write_qps[read_qps.size() - 1]); + (int)(all_rows.size() - 1), + all_stats.get_total_read_qps(), + all_stats.get_total_write_qps()); } -info_collector::AppStatCounters *info_collector::get_app_counters(const std::string &app_name) +info_collector::app_stat_counters *info_collector::get_app_counters(const std::string &app_name) { ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_app_stat_counter_lock); auto find = _app_stat_counters.find(app_name); if (find != _app_stat_counters.end()) { return find->second; } - AppStatCounters *counters = new AppStatCounters(); + app_stat_counters *counters = new app_stat_counters(); char counter_name[1024]; char counter_desc[1024]; @@ -329,5 +278,17 @@ void info_collector::on_storage_size_stat(int remaining_retry_count) _result_writer->set_result(st_stat.timestamp, "ss", st_stat.dump_to_json()); } +hotspot_calculator *info_collector::get_hotspot_calculator(const std::string &app_name, + const int partition_num) +{ + auto iter = _hotspot_calculator_store.find(app_name); + if (iter != _hotspot_calculator_store.end()) { + return iter->second; + } + hotspot_calculator *calculator_address = new hotspot_calculator(app_name, partition_num); + _hotspot_calculator_store[app_name] = calculator_address; + return calculator_address; +} + } // namespace server } // namespace pegasus diff --git a/src/server/info_collector.h b/src/server/info_collector.h index da4144c3a8..e06d121dd0 100644 --- a/src/server/info_collector.h +++ b/src/server/info_collector.h @@ -16,9 +16,10 @@ #include #include #include -#include #include "../shell/commands.h" +#include "table_stats.h" +#include "table_hotspot_policy.h" namespace pegasus { namespace server { @@ -28,8 +29,44 @@ class result_writer; class info_collector { public: - struct AppStatCounters + struct app_stat_counters { + void set(const table_stats &row_stats) + { + get_qps->set(row_stats.total_get_qps); + multi_get_qps->set(row_stats.total_multi_get_qps); + put_qps->set(row_stats.total_put_qps); + multi_put_qps->set(row_stats.total_multi_put_qps); + remove_qps->set(row_stats.total_remove_qps); + multi_remove_qps->set(row_stats.total_multi_remove_qps); + incr_qps->set(row_stats.total_incr_qps); + check_and_set_qps->set(row_stats.total_check_and_set_qps); + check_and_mutate_qps->set(row_stats.total_check_and_mutate_qps); + scan_qps->set(row_stats.total_scan_qps); + recent_read_cu->set(row_stats.total_recent_read_cu); + recent_write_cu->set(row_stats.total_recent_write_cu); + recent_expire_count->set(row_stats.total_recent_expire_count); + recent_filter_count->set(row_stats.total_recent_filter_count); + recent_abnormal_count->set(row_stats.total_recent_abnormal_count); + recent_write_throttling_delay_count->set( + row_stats.total_recent_write_throttling_delay_count); + recent_write_throttling_reject_count->set( + row_stats.total_recent_write_throttling_reject_count); + storage_mb->set(row_stats.total_storage_mb); + storage_count->set(row_stats.total_storage_count); + rdb_block_cache_hit_rate->set( + std::abs(row_stats.total_rdb_block_cache_total_count) < 1e-6 + ? 0 + : row_stats.total_rdb_block_cache_hit_count / + row_stats.total_rdb_block_cache_total_count * 1000000); + rdb_index_and_filter_blocks_mem_usage->set( + row_stats.total_rdb_index_and_filter_blocks_mem_usage); + rdb_memtable_mem_usage->set(row_stats.total_rdb_memtable_mem_usage); + rdb_estimate_num_keys->set(row_stats.total_rdb_estimate_num_keys); + read_qps->set(row_stats.get_total_read_qps()); + write_qps->set(row_stats.get_total_write_qps()); + } + ::dsn::perf_counter_wrapper get_qps; ::dsn::perf_counter_wrapper multi_get_qps; ::dsn::perf_counter_wrapper put_qps; @@ -65,7 +102,7 @@ class info_collector void stop(); void on_app_stat(); - AppStatCounters *get_app_counters(const std::string &app_name); + app_stat_counters *get_app_counters(const std::string &app_name); void on_capacity_unit_stat(int remaining_retry_count); bool has_capacity_unit_updated(const std::string &node_address, const std::string ×tamp); @@ -80,7 +117,7 @@ class info_collector uint32_t _app_stat_interval_seconds; ::dsn::task_ptr _app_stat_timer_task; ::dsn::utils::ex_lock_nr _app_stat_counter_lock; - std::map _app_stat_counters; + std::map _app_stat_counters; // app for recording usage statistics, including read/write capacity unit and storage size. std::string _usage_stat_app; @@ -99,6 +136,11 @@ class info_collector ::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock; // mapping 'node address' --> 'last updated timestamp' std::map _capacity_unit_update_info; + std::map _hotspot_calculator_store; + + hotspot_calculator *get_hotspot_calculator(const std::string &app_name, + const int partition_num); }; + } // namespace server } // namespace pegasus diff --git a/src/server/table_hotspot_policy.cpp b/src/server/table_hotspot_policy.cpp new file mode 100644 index 0000000000..ce9cd8d931 --- /dev/null +++ b/src/server/table_hotspot_policy.cpp @@ -0,0 +1,40 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "table_hotspot_policy.h" + +#include + +namespace pegasus { +namespace server { + +void hotspot_calculator::aggregate(const std::vector &partitions) +{ + while (_app_data.size() > kMaxQueueSize - 1) { + _app_data.pop(); + } + std::vector temp(partitions.size()); + for (int i = 0; i < partitions.size(); i++) { + temp[i] = std::move(hotspot_partition_data(partitions[i])); + } + _app_data.emplace(temp); +} + +void hotspot_calculator::init_perf_counter(const int perf_counter_count) +{ + std::string counter_name; + std::string counter_desc; + for (int i = 0; i < perf_counter_count; i++) { + string paritition_desc = _app_name + '.' + std::to_string(i); + counter_name = fmt::format("app.stat.hotspots@{}", paritition_desc); + counter_desc = fmt::format("statistic the hotspots of app {}", paritition_desc); + _points[i].init_app_counter( + "app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str()); + } +} + +void hotspot_calculator::start_alg() { _policy->analysis(_app_data, _points); } + +} // namespace server +} // namespace pegasus diff --git a/src/server/table_hotspot_policy.h b/src/server/table_hotspot_policy.h new file mode 100644 index 0000000000..87881ddca0 --- /dev/null +++ b/src/server/table_hotspot_policy.h @@ -0,0 +1,68 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include "hotspot_partition_data.h" + +#include +#include +#include + +namespace pegasus { +namespace server { +class hotspot_policy +{ +public: + // hotspot_app_data store the historical data which related to hotspot + // it uses rolling queue to save one app's data + // vector is used to save the partitions' data of this app + // hotspot_partition_data is used to save data of one partition + virtual void analysis(const std::queue> &hotspot_app_data, + std::vector<::dsn::perf_counter_wrapper> &hot_points) = 0; +}; + +class hotspot_algo_qps_skew : public hotspot_policy +{ +public: + void analysis(const std::queue> &hotspot_app_data, + std::vector<::dsn::perf_counter_wrapper> &hot_points) + { + const auto &anly_data = hotspot_app_data.back(); + double min_total_qps = INT_MAX; + for (auto partition_anly_data : anly_data) { + min_total_qps = std::min(min_total_qps, partition_anly_data.total_qps); + } + min_total_qps = std::max(1.0, min_total_qps); + dassert(anly_data.size() == hot_points.size(), "partition counts error, please check"); + for (int i = 0; i < hot_points.size(); i++) { + hot_points[i]->set(anly_data[i].total_qps / min_total_qps); + } + } +}; + +// hotspot_calculator is used to find the hotspot in Pegasus +class hotspot_calculator +{ +public: + hotspot_calculator(const std::string &app_name, const int partition_num) + : _app_name(app_name), _points(partition_num), _policy(new hotspot_algo_qps_skew()) + { + init_perf_counter(partition_num); + } + void aggregate(const std::vector &partitions); + void start_alg(); + void init_perf_counter(const int perf_counter_count); + +private: + const std::string _app_name; + std::vector<::dsn::perf_counter_wrapper> _points; + std::queue> _app_data; + std::unique_ptr _policy; + static const int kMaxQueueSize = 100; + + FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_skew); +}; +} // namespace server +} // namespace pegasus diff --git a/src/server/table_stats.h b/src/server/table_stats.h new file mode 100644 index 0000000000..6aeeedff7d --- /dev/null +++ b/src/server/table_stats.h @@ -0,0 +1,112 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +struct table_stats +{ + table_stats(const std::string &app_name) : app_name(app_name) {} + + double get_total_read_qps() const + { + return total_get_qps + total_multi_get_qps + total_scan_qps; + } + + double get_total_write_qps() const + { + return total_put_qps + total_multi_put_qps + total_remove_qps + total_multi_remove_qps + + total_incr_qps + total_check_and_set_qps + total_check_and_mutate_qps; + } + + void aggregate(const row_data &row) + { + total_get_qps += row.get_qps; + total_multi_get_qps += row.multi_get_qps; + total_put_qps += row.put_qps; + total_multi_put_qps += row.multi_put_qps; + total_remove_qps += row.remove_qps; + total_multi_remove_qps += row.multi_remove_qps; + total_incr_qps += row.incr_qps; + total_check_and_set_qps += row.check_and_set_qps; + total_check_and_mutate_qps += row.check_and_mutate_qps; + total_scan_qps += row.scan_qps; + total_recent_read_cu += row.recent_read_cu; + total_recent_write_cu += row.recent_write_cu; + total_recent_expire_count += row.recent_expire_count; + total_recent_filter_count += row.recent_filter_count; + total_recent_abnormal_count += row.recent_abnormal_count; + total_recent_write_throttling_delay_count += row.recent_write_throttling_delay_count; + total_recent_write_throttling_reject_count += row.recent_write_throttling_reject_count; + total_storage_mb += row.storage_mb; + total_storage_count += row.storage_count; + total_rdb_block_cache_hit_count += row.rdb_block_cache_hit_count; + total_rdb_block_cache_total_count += row.rdb_block_cache_total_count; + total_rdb_index_and_filter_blocks_mem_usage += row.rdb_index_and_filter_blocks_mem_usage; + total_rdb_memtable_mem_usage += row.rdb_memtable_mem_usage; + total_rdb_estimate_num_keys += row.rdb_estimate_num_keys; + } + + void merge(const table_stats &row_stats) + { + total_get_qps += row_stats.total_get_qps; + total_multi_get_qps += row_stats.total_multi_get_qps; + total_put_qps += row_stats.total_put_qps; + total_multi_put_qps += row_stats.total_multi_put_qps; + total_remove_qps += row_stats.total_remove_qps; + total_multi_remove_qps += row_stats.total_multi_remove_qps; + total_incr_qps += row_stats.total_incr_qps; + total_check_and_set_qps += row_stats.total_check_and_set_qps; + total_check_and_mutate_qps += row_stats.total_check_and_mutate_qps; + total_scan_qps += row_stats.total_scan_qps; + total_recent_read_cu += row_stats.total_recent_read_cu; + total_recent_write_cu += row_stats.total_recent_write_cu; + total_recent_expire_count += row_stats.total_recent_expire_count; + total_recent_filter_count += row_stats.total_recent_filter_count; + total_recent_abnormal_count += row_stats.total_recent_abnormal_count; + total_recent_write_throttling_delay_count += + row_stats.total_recent_write_throttling_delay_count; + total_recent_write_throttling_reject_count += + row_stats.total_recent_write_throttling_reject_count; + total_storage_mb += row_stats.total_storage_mb; + total_storage_count += row_stats.total_storage_count; + total_rdb_block_cache_hit_count += row_stats.total_rdb_block_cache_hit_count; + total_rdb_block_cache_total_count += row_stats.total_rdb_block_cache_total_count; + total_rdb_index_and_filter_blocks_mem_usage += + row_stats.total_rdb_index_and_filter_blocks_mem_usage; + total_rdb_memtable_mem_usage += row_stats.total_rdb_memtable_mem_usage; + total_rdb_estimate_num_keys += row_stats.total_rdb_estimate_num_keys; + } + + std::string app_name; + double total_get_qps = 0; + double total_multi_get_qps = 0; + double total_put_qps = 0; + double total_multi_put_qps = 0; + double total_remove_qps = 0; + double total_multi_remove_qps = 0; + double total_incr_qps = 0; + double total_check_and_set_qps = 0; + double total_check_and_mutate_qps = 0; + double total_scan_qps = 0; + double total_recent_read_cu = 0; + double total_recent_write_cu = 0; + double total_recent_expire_count = 0; + double total_recent_filter_count = 0; + double total_recent_abnormal_count = 0; + double total_recent_write_throttling_delay_count = 0; + double total_recent_write_throttling_reject_count = 0; + double total_storage_mb = 0; + double total_storage_count = 0; + double total_rdb_block_cache_hit_count = 0; + double total_rdb_block_cache_total_count = 0; + double total_rdb_index_and_filter_blocks_mem_usage = 0; + double total_rdb_memtable_mem_usage = 0; + double total_rdb_estimate_num_keys = 0; + double max_total_qps = 0; + double min_total_qps = INT_MAX; + double max_total_cu = 0; + double min_total_cu = INT_MAX; + std::string max_qps_partition_id; + std::string max_cu_partition_id; +}; diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt index df5f5a73f9..be11f104ad 100644 --- a/src/server/test/CMakeLists.txt +++ b/src/server/test/CMakeLists.txt @@ -7,6 +7,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp" "../pegasus_server_write.cpp" "../capacity_unit_calculator.cpp" "../pegasus_mutation_duplicator.cpp" + "../table_hotspot_policy.cpp" ) set(MY_SRC_SEARCH_MODE "GLOB") diff --git a/src/server/test/pegasus_tablehotspot_test.cpp b/src/server/test/pegasus_tablehotspot_test.cpp new file mode 100644 index 0000000000..5cbe6dd015 --- /dev/null +++ b/src/server/test/pegasus_tablehotspot_test.cpp @@ -0,0 +1,29 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "server/table_hotspot_policy.h" + +#include + +namespace pegasus { +namespace server { + +TEST(table_hotspot_policy, hotspot_algo_qps_skew) +{ + std::vector test_rows(2); + test_rows[0].get_qps = 1234.0; + test_rows[1].get_qps = 4321.0; + hotspot_calculator test_hotspot_calculator("TEST", 2); + test_hotspot_calculator.aggregate(test_rows); + test_hotspot_calculator.start_alg(); + std::vector result(2); + for (int i = 0; i < test_hotspot_calculator._points.size(); i++) { + result[i] = test_hotspot_calculator._points[i]->get_value(); + } + std::vector expect_vector{1, 3}; + ASSERT_EQ(expect_vector, result); +} + +} // namespace server +} // namespace pegasus diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index c5669204bd..07a8d15d76 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -512,6 +512,14 @@ inline bool parse_app_pegasus_perf_counter_name(const std::string &name, struct row_data { + double get_total_qps() const + { + return get_qps + multi_get_qps + scan_qps + put_qps + multi_put_qps + remove_qps + + multi_remove_qps + incr_qps + check_and_set_qps + check_and_mutate_qps; + } + + double get_total_cu() const { return recent_read_cu + recent_write_cu; } + std::string row_name; int32_t app_id = 0; int32_t partition_count = 0;