From 20b3cfc726ad790223830348b0fa724cfba66c38 Mon Sep 17 00:00:00 2001 From: tangyanzhao Date: Mon, 12 Oct 2020 16:11:30 +0800 Subject: [PATCH 1/5] merge --- src/server/capacity_unit_calculator.cpp | 74 ++++++++++++++----- src/server/capacity_unit_calculator.h | 23 +++++- src/server/pegasus_server_impl.cpp | 7 +- src/server/pegasus_write_service.cpp | 2 +- .../test/capacity_unit_calculator_test.cpp | 10 ++- 5 files changed, 84 insertions(+), 32 deletions(-) diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp index 62e7dd3c71..0b2f563e4e 100644 --- a/src/server/capacity_unit_calculator.cpp +++ b/src/server/capacity_unit_calculator.cpp @@ -3,14 +3,25 @@ // can be found in the LICENSE file in the root directory of this source tree. #include "capacity_unit_calculator.h" + #include #include +#include "hotkey_collector.h" namespace pegasus { namespace server { -capacity_unit_calculator::capacity_unit_calculator(replica_base *r) : replica_base(r) +capacity_unit_calculator::capacity_unit_calculator( + replica_base *r, + std::shared_ptr read_hotkey_collector, + std::shared_ptr write_hotkey_collector) + : replica_base(r), + _read_hotkey_collector(read_hotkey_collector), + _write_hotkey_collector(write_hotkey_collector) { + dassert(_read_hotkey_collector != nullptr, "read hotkey collector is a nullptr"); + dassert(_write_hotkey_collector != nullptr, "write hotkey collector is a nullptr"); + _read_capacity_unit_size = dsn_config_get_value_uint64("pegasus.server", "perf_counter_read_capacity_unit_size", @@ -70,6 +81,26 @@ capacity_unit_calculator::capacity_unit_calculator(replica_base *r) : replica_ba "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and mutate bytes"); } +void capacity_unit_calculator::count_read_data(const dsn::blob &key, key_type type, int64_t size) +{ + add_read_cu(size); + if (type == key_type::HASH_KEY) { + _read_hotkey_collector->capture_hash_key(key, size); + } else if (type == key_type::RAW_KEY) { + _read_hotkey_collector->capture_raw_key(key, size); + } +} + +void capacity_unit_calculator::count_write_data(const dsn::blob &key, key_type type, int64_t size) +{ + add_write_cu(size); + if (type == key_type::HASH_KEY) { + _write_hotkey_collector->capture_hash_key(key, size); + } else if (type == key_type::RAW_KEY) { + _write_hotkey_collector->capture_raw_key(key, size); + } +} + int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size) { int64_t read_cu = read_data_size > 0 @@ -98,10 +129,10 @@ void capacity_unit_calculator::add_get_cu(int32_t status, } if (status == rocksdb::Status::kNotFound) { - add_read_cu(1); + count_read_data(key, key_type::RAW_KEY, 1); return; } - add_read_cu(key.size() + value.size()); + count_read_data(key, key_type::RAW_KEY, key.size() + value.size()); } void capacity_unit_calculator::add_multi_get_cu(int32_t status, @@ -122,10 +153,10 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status, } if (status == rocksdb::Status::kNotFound) { - add_read_cu(1); + count_read_data(hash_key, key_type::HASH_KEY, 1); return; } - add_read_cu(data_size); + count_read_data(hash_key, key_type::HASH_KEY, data_size); } void capacity_unit_calculator::add_scan_cu(int32_t status, @@ -144,25 +175,27 @@ void capacity_unit_calculator::add_scan_cu(int32_t status, int64_t data_size = 0; for (const auto &kv : kvs) { data_size += kv.key.size() + kv.value.size(); + // special case of count_read_data + _read_hotkey_collector->capture_raw_key(kv.key, kv.key.size() + kv.value.size()); } add_read_cu(data_size); _pfc_scan_bytes->add(data_size); } -void capacity_unit_calculator::add_sortkey_count_cu(int32_t status) +void capacity_unit_calculator::add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key) { if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) { return; } - add_read_cu(1); + count_read_data(hash_key, key_type::HASH_KEY, 1); } -void capacity_unit_calculator::add_ttl_cu(int32_t status) +void capacity_unit_calculator::add_ttl_cu(int32_t status, const dsn::blob &key) { if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) { return; } - add_read_cu(1); + count_read_data(key, key_type::RAW_KEY, 1); } void capacity_unit_calculator::add_put_cu(int32_t status, @@ -173,7 +206,7 @@ void capacity_unit_calculator::add_put_cu(int32_t status, if (status != rocksdb::Status::kOk) { return; } - add_write_cu(key.size() + value.size()); + count_write_data(key, key_type::RAW_KEY, key.size() + value.size()); } void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &key) @@ -181,7 +214,7 @@ void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &ke if (status != rocksdb::Status::kOk) { return; } - add_write_cu(key.size()); + count_write_data(key, key_type::RAW_KEY, key.size()); } void capacity_unit_calculator::add_multi_put_cu(int32_t status, @@ -199,7 +232,7 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status, if (status != rocksdb::Status::kOk) { return; } - add_write_cu(data_size); + count_write_data(hash_key, key_type::HASH_KEY, data_size); } void capacity_unit_calculator::add_multi_remove_cu(int32_t status, @@ -214,18 +247,18 @@ void capacity_unit_calculator::add_multi_remove_cu(int32_t status, for (const auto &sort_key : sort_keys) { data_size += hash_key.size() + sort_key.size(); } - add_write_cu(data_size); + count_write_data(hash_key, key_type::HASH_KEY, data_size); } -void capacity_unit_calculator::add_incr_cu(int32_t status) +void capacity_unit_calculator::add_incr_cu(int32_t status, const dsn::blob &key) { if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument) { return; } if (status == rocksdb::Status::kOk) { - add_write_cu(1); + count_write_data(key, key_type::RAW_KEY, 1); } - add_read_cu(1); + count_read_data(key, key_type::RAW_KEY, 1); } void capacity_unit_calculator::add_check_and_set_cu(int32_t status, @@ -243,9 +276,10 @@ void capacity_unit_calculator::add_check_and_set_cu(int32_t status, } if (status == rocksdb::Status::kOk) { - add_write_cu(hash_key.size() + set_sort_key.size() + value.size()); + count_write_data( + hash_key, key_type::HASH_KEY, hash_key.size() + set_sort_key.size() + value.size()); } - add_read_cu(hash_key.size() + check_sort_key.size()); + count_read_data(hash_key, key_type::HASH_KEY, hash_key.size() + check_sort_key.size()); } void capacity_unit_calculator::add_check_and_mutate_cu( @@ -269,9 +303,9 @@ void capacity_unit_calculator::add_check_and_mutate_cu( } if (status == rocksdb::Status::kOk) { - add_write_cu(data_size); + count_write_data(hash_key, key_type::HASH_KEY, data_size); } - add_read_cu(hash_key.size() + check_sort_key.size()); + count_read_data(hash_key, key_type::HASH_KEY, hash_key.size() + check_sort_key.size()); } } // namespace server diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h index a731fe4a4d..9a7c17c16c 100644 --- a/src/server/capacity_unit_calculator.h +++ b/src/server/capacity_unit_calculator.h @@ -11,18 +11,27 @@ namespace pegasus { namespace server { +class hotkey_collector; +enum class key_type +{ + RAW_KEY = 0, + HASH_KEY +}; + class capacity_unit_calculator : public dsn::replication::replica_base { public: - explicit capacity_unit_calculator(replica_base *r); + capacity_unit_calculator(replica_base *r, + std::shared_ptr read_hotkey_collector, + std::shared_ptr write_hotkey_collector); void add_get_cu(int32_t status, const dsn::blob &key, const dsn::blob &value); void add_multi_get_cu(int32_t status, const dsn::blob &hash_key, const std::vector<::dsn::apps::key_value> &kvs); void add_scan_cu(int32_t status, const std::vector<::dsn::apps::key_value> &kvs); - void add_sortkey_count_cu(int32_t status); - void add_ttl_cu(int32_t status); + void add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key); + void add_ttl_cu(int32_t status, const dsn::blob &key); void add_put_cu(int32_t status, const dsn::blob &key, const dsn::blob &value); void add_remove_cu(int32_t status, const dsn::blob &key); @@ -32,7 +41,7 @@ class capacity_unit_calculator : public dsn::replication::replica_base void add_multi_remove_cu(int32_t status, const dsn::blob &hash_key, const std::vector<::dsn::blob> &sort_keys); - void add_incr_cu(int32_t status); + void add_incr_cu(int32_t status, const dsn::blob &key); void add_check_and_set_cu(int32_t status, const dsn::blob &hash_key, const dsn::blob &check_sort_key, @@ -55,6 +64,9 @@ class capacity_unit_calculator : public dsn::replication::replica_base #endif private: + void count_read_data(const dsn::blob &key, key_type type, int64_t size); + void count_write_data(const dsn::blob &key, key_type type, int64_t size); + uint64_t _read_capacity_unit_size; uint64_t _write_capacity_unit_size; uint32_t _log_read_cu_size; @@ -70,6 +82,9 @@ class capacity_unit_calculator : public dsn::replication::replica_base ::dsn::perf_counter_wrapper _pfc_multi_put_bytes; ::dsn::perf_counter_wrapper _pfc_check_and_set_bytes; ::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes; + + std::shared_ptr _read_hotkey_collector; + std::shared_ptr _write_hotkey_collector; }; } // namespace server diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 122c638537..cdd5b35070 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -795,7 +795,7 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc) resp.count = -1; } - _cu_calculator->add_sortkey_count_cu(resp.error); + _cu_calculator->add_sortkey_count_cu(resp.error, hash_key); _pfc_scan_latency->set(dsn_now_ns() - start_time); } @@ -857,7 +857,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc) } } - _cu_calculator->add_ttl_cu(resp.error); + _cu_calculator->add_ttl_cu(resp.error, key); } void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) @@ -1486,7 +1486,8 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv) }); // initialize cu calculator and write service after server being initialized. - _cu_calculator = dsn::make_unique(this); + _cu_calculator = dsn::make_unique( + this, _read_hotkey_collector, _write_hotkey_collector); _server_write = dsn::make_unique(this, _verbose_log); return ::dsn::ERR_OK; diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index f448344cf8..e6d090c088 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -173,7 +173,7 @@ int pegasus_write_service::incr(int64_t decree, int err = _impl->incr(decree, update, resp); if (_server->is_primary()) { - _cu_calculator->add_incr_cu(resp.error); + _cu_calculator->add_incr_cu(resp.error, update.key); } _pfc_incr_latency->set(dsn_now_ns() - start_time); diff --git a/src/server/test/capacity_unit_calculator_test.cpp b/src/server/test/capacity_unit_calculator_test.cpp index 10ac99e159..dee34ff833 100644 --- a/src/server/test/capacity_unit_calculator_test.cpp +++ b/src/server/test/capacity_unit_calculator_test.cpp @@ -6,6 +6,7 @@ #include "server/capacity_unit_calculator.h" #include +#include "server/hotkey_collector.h" namespace pegasus { namespace server { @@ -26,7 +27,8 @@ class mock_capacity_unit_calculator : public capacity_unit_calculator } explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r) - : capacity_unit_calculator(r) + : capacity_unit_calculator( + r, std::make_shared(), std::make_shared()) { } @@ -200,7 +202,7 @@ TEST_F(capacity_unit_calculator_test, scan) TEST_F(capacity_unit_calculator_test, sortkey_count) { for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) { - _cal->add_sortkey_count_cu(i); + _cal->add_sortkey_count_cu(i, dsn::blob()); if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) { ASSERT_EQ(_cal->read_cu, 1); } else { @@ -214,7 +216,7 @@ TEST_F(capacity_unit_calculator_test, sortkey_count) TEST_F(capacity_unit_calculator_test, ttl) { for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) { - _cal->add_ttl_cu(i); + _cal->add_ttl_cu(i, dsn::blob()); if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) { ASSERT_EQ(_cal->read_cu, 1); } else { @@ -300,7 +302,7 @@ TEST_F(capacity_unit_calculator_test, multi_remove) TEST_F(capacity_unit_calculator_test, incr) { for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) { - _cal->add_incr_cu(i); + _cal->add_incr_cu(i, dsn::blob()); if (i == rocksdb::Status::kOk) { ASSERT_EQ(_cal->read_cu, 1); ASSERT_EQ(_cal->write_cu, 1); From a17b50cfc5496c7ed6ee13eee419c9802fd7688f Mon Sep 17 00:00:00 2001 From: tangyanzhao Date: Mon, 12 Oct 2020 17:11:07 +0800 Subject: [PATCH 2/5] change count method --- src/server/capacity_unit_calculator.cpp | 76 +++++++++---------- src/server/capacity_unit_calculator.h | 8 -- .../test/capacity_unit_calculator_test.cpp | 6 +- 3 files changed, 40 insertions(+), 50 deletions(-) diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp index 0b2f563e4e..d0b2a8e22e 100644 --- a/src/server/capacity_unit_calculator.cpp +++ b/src/server/capacity_unit_calculator.cpp @@ -81,26 +81,6 @@ capacity_unit_calculator::capacity_unit_calculator( "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and mutate bytes"); } -void capacity_unit_calculator::count_read_data(const dsn::blob &key, key_type type, int64_t size) -{ - add_read_cu(size); - if (type == key_type::HASH_KEY) { - _read_hotkey_collector->capture_hash_key(key, size); - } else if (type == key_type::RAW_KEY) { - _read_hotkey_collector->capture_raw_key(key, size); - } -} - -void capacity_unit_calculator::count_write_data(const dsn::blob &key, key_type type, int64_t size) -{ - add_write_cu(size); - if (type == key_type::HASH_KEY) { - _write_hotkey_collector->capture_hash_key(key, size); - } else if (type == key_type::RAW_KEY) { - _write_hotkey_collector->capture_raw_key(key, size); - } -} - int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size) { int64_t read_cu = read_data_size > 0 @@ -129,10 +109,12 @@ void capacity_unit_calculator::add_get_cu(int32_t status, } if (status == rocksdb::Status::kNotFound) { - count_read_data(key, key_type::RAW_KEY, 1); + add_read_cu(1); + _read_hotkey_collector->capture_raw_key(key, 1); return; } - count_read_data(key, key_type::RAW_KEY, key.size() + value.size()); + add_read_cu(key.size() + value.size()); + _read_hotkey_collector->capture_raw_key(key, 1); } void capacity_unit_calculator::add_multi_get_cu(int32_t status, @@ -141,9 +123,11 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status, { int64_t data_size = 0; int64_t multi_get_bytes = 0; + int64_t key_count = 0; for (const auto &kv : kvs) { multi_get_bytes += kv.key.size() + kv.value.size(); data_size += hash_key.size() + kv.key.size() + kv.value.size(); + key_count++; } _pfc_multi_get_bytes->add(hash_key.size() + multi_get_bytes); @@ -153,10 +137,12 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status, } if (status == rocksdb::Status::kNotFound) { - count_read_data(hash_key, key_type::HASH_KEY, 1); + add_read_cu(1); + _read_hotkey_collector->capture_raw_key(hash_key, key_count); return; } - count_read_data(hash_key, key_type::HASH_KEY, data_size); + add_read_cu(data_size); + _read_hotkey_collector->capture_raw_key(hash_key, key_count); } void capacity_unit_calculator::add_scan_cu(int32_t status, @@ -175,8 +161,7 @@ void capacity_unit_calculator::add_scan_cu(int32_t status, int64_t data_size = 0; for (const auto &kv : kvs) { data_size += kv.key.size() + kv.value.size(); - // special case of count_read_data - _read_hotkey_collector->capture_raw_key(kv.key, kv.key.size() + kv.value.size()); + _read_hotkey_collector->capture_raw_key(kv.key, 1); } add_read_cu(data_size); _pfc_scan_bytes->add(data_size); @@ -187,7 +172,8 @@ void capacity_unit_calculator::add_sortkey_count_cu(int32_t status, const dsn::b if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) { return; } - count_read_data(hash_key, key_type::HASH_KEY, 1); + add_read_cu(1); + _read_hotkey_collector->capture_hash_key(hash_key, 1); } void capacity_unit_calculator::add_ttl_cu(int32_t status, const dsn::blob &key) @@ -195,7 +181,8 @@ void capacity_unit_calculator::add_ttl_cu(int32_t status, const dsn::blob &key) if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) { return; } - count_read_data(key, key_type::RAW_KEY, 1); + add_read_cu(1); + _read_hotkey_collector->capture_raw_key(key, 1); } void capacity_unit_calculator::add_put_cu(int32_t status, @@ -206,7 +193,8 @@ void capacity_unit_calculator::add_put_cu(int32_t status, if (status != rocksdb::Status::kOk) { return; } - count_write_data(key, key_type::RAW_KEY, key.size() + value.size()); + add_write_cu(key.size() + value.size()); + _write_hotkey_collector->capture_raw_key(key, 1); } void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &key) @@ -214,7 +202,8 @@ void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &ke if (status != rocksdb::Status::kOk) { return; } - count_write_data(key, key_type::RAW_KEY, key.size()); + add_write_cu(key.size()); + _write_hotkey_collector->capture_raw_key(key, 1); } void capacity_unit_calculator::add_multi_put_cu(int32_t status, @@ -226,13 +215,14 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status, for (const auto &kv : kvs) { multi_put_bytes += kv.key.size() + kv.value.size(); data_size += hash_key.size() + kv.key.size() + kv.value.size(); + _write_hotkey_collector->capture_raw_key(kv.key, 1); } _pfc_multi_put_bytes->add(hash_key.size() + multi_put_bytes); if (status != rocksdb::Status::kOk) { return; } - count_write_data(hash_key, key_type::HASH_KEY, data_size); + add_write_cu(data_size); } void capacity_unit_calculator::add_multi_remove_cu(int32_t status, @@ -246,8 +236,9 @@ void capacity_unit_calculator::add_multi_remove_cu(int32_t status, int64_t data_size = 0; for (const auto &sort_key : sort_keys) { data_size += hash_key.size() + sort_key.size(); + _write_hotkey_collector->capture_hash_key(hash_key, 1); } - count_write_data(hash_key, key_type::HASH_KEY, data_size); + add_write_cu(data_size); } void capacity_unit_calculator::add_incr_cu(int32_t status, const dsn::blob &key) @@ -256,9 +247,11 @@ void capacity_unit_calculator::add_incr_cu(int32_t status, const dsn::blob &key) return; } if (status == rocksdb::Status::kOk) { - count_write_data(key, key_type::RAW_KEY, 1); + add_write_cu(1); + _write_hotkey_collector->capture_raw_key(key, 1); } - count_read_data(key, key_type::RAW_KEY, 1); + add_read_cu(1); + _read_hotkey_collector->capture_raw_key(key, 1); } void capacity_unit_calculator::add_check_and_set_cu(int32_t status, @@ -276,10 +269,11 @@ void capacity_unit_calculator::add_check_and_set_cu(int32_t status, } if (status == rocksdb::Status::kOk) { - count_write_data( - hash_key, key_type::HASH_KEY, hash_key.size() + set_sort_key.size() + value.size()); + add_write_cu(hash_key.size() + set_sort_key.size() + value.size()); + _write_hotkey_collector->capture_hash_key(hash_key, 1); } - count_read_data(hash_key, key_type::HASH_KEY, hash_key.size() + check_sort_key.size()); + add_read_cu(hash_key.size() + check_sort_key.size()); + _read_hotkey_collector->capture_hash_key(hash_key, 1); } void capacity_unit_calculator::add_check_and_mutate_cu( @@ -290,9 +284,11 @@ void capacity_unit_calculator::add_check_and_mutate_cu( { int64_t data_size = 0; int64_t check_and_mutate_bytes = 0; + int64_t key_count = 0; for (const auto &m : mutate_list) { check_and_mutate_bytes += m.sort_key.size() + m.value.size(); data_size += hash_key.size() + m.sort_key.size() + m.value.size(); + key_count++; } _pfc_check_and_mutate_bytes->add(hash_key.size() + check_sort_key.size() + check_and_mutate_bytes); @@ -303,9 +299,11 @@ void capacity_unit_calculator::add_check_and_mutate_cu( } if (status == rocksdb::Status::kOk) { - count_write_data(hash_key, key_type::HASH_KEY, data_size); + add_write_cu(data_size); + _write_hotkey_collector->capture_hash_key(hash_key, key_count); } - count_read_data(hash_key, key_type::HASH_KEY, hash_key.size() + check_sort_key.size()); + add_read_cu(hash_key.size() + check_sort_key.size()); + _read_hotkey_collector->capture_hash_key(hash_key, 1); } } // namespace server diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h index 9a7c17c16c..a3508aab95 100644 --- a/src/server/capacity_unit_calculator.h +++ b/src/server/capacity_unit_calculator.h @@ -12,11 +12,6 @@ namespace pegasus { namespace server { class hotkey_collector; -enum class key_type -{ - RAW_KEY = 0, - HASH_KEY -}; class capacity_unit_calculator : public dsn::replication::replica_base { @@ -64,9 +59,6 @@ class capacity_unit_calculator : public dsn::replication::replica_base #endif private: - void count_read_data(const dsn::blob &key, key_type type, int64_t size); - void count_write_data(const dsn::blob &key, key_type type, int64_t size); - uint64_t _read_capacity_unit_size; uint64_t _write_capacity_unit_size; uint32_t _log_read_cu_size; diff --git a/src/server/test/capacity_unit_calculator_test.cpp b/src/server/test/capacity_unit_calculator_test.cpp index dee34ff833..0ad3f6e459 100644 --- a/src/server/test/capacity_unit_calculator_test.cpp +++ b/src/server/test/capacity_unit_calculator_test.cpp @@ -202,7 +202,7 @@ TEST_F(capacity_unit_calculator_test, scan) TEST_F(capacity_unit_calculator_test, sortkey_count) { for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) { - _cal->add_sortkey_count_cu(i, dsn::blob()); + _cal->add_sortkey_count_cu(i, key); if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) { ASSERT_EQ(_cal->read_cu, 1); } else { @@ -216,7 +216,7 @@ TEST_F(capacity_unit_calculator_test, sortkey_count) TEST_F(capacity_unit_calculator_test, ttl) { for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) { - _cal->add_ttl_cu(i, dsn::blob()); + _cal->add_ttl_cu(i, key); if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) { ASSERT_EQ(_cal->read_cu, 1); } else { @@ -302,7 +302,7 @@ TEST_F(capacity_unit_calculator_test, multi_remove) TEST_F(capacity_unit_calculator_test, incr) { for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) { - _cal->add_incr_cu(i, dsn::blob()); + _cal->add_incr_cu(i, key); if (i == rocksdb::Status::kOk) { ASSERT_EQ(_cal->read_cu, 1); ASSERT_EQ(_cal->write_cu, 1); From e0716cfa26c8e252b8616f348caf6e2108c4bc24 Mon Sep 17 00:00:00 2001 From: tangyanzhao Date: Mon, 12 Oct 2020 17:13:05 +0800 Subject: [PATCH 3/5] fix --- src/server/capacity_unit_calculator.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp index d0b2a8e22e..211fe801d6 100644 --- a/src/server/capacity_unit_calculator.cpp +++ b/src/server/capacity_unit_calculator.cpp @@ -138,11 +138,11 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status, if (status == rocksdb::Status::kNotFound) { add_read_cu(1); - _read_hotkey_collector->capture_raw_key(hash_key, key_count); + _read_hotkey_collector->capture_hash_key(hash_key, key_count); return; } add_read_cu(data_size); - _read_hotkey_collector->capture_raw_key(hash_key, key_count); + _read_hotkey_collector->capture_hash_key(hash_key, key_count); } void capacity_unit_calculator::add_scan_cu(int32_t status, From ed258f1f4cc44fda70f8048535fe82b5401e31aa Mon Sep 17 00:00:00 2001 From: tangyanzhao Date: Tue, 13 Oct 2020 14:53:17 +0800 Subject: [PATCH 4/5] fix --- src/server/capacity_unit_calculator.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp index 211fe801d6..6a055e6057 100644 --- a/src/server/capacity_unit_calculator.cpp +++ b/src/server/capacity_unit_calculator.cpp @@ -123,11 +123,9 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status, { int64_t data_size = 0; int64_t multi_get_bytes = 0; - int64_t key_count = 0; for (const auto &kv : kvs) { multi_get_bytes += kv.key.size() + kv.value.size(); data_size += hash_key.size() + kv.key.size() + kv.value.size(); - key_count++; } _pfc_multi_get_bytes->add(hash_key.size() + multi_get_bytes); @@ -136,6 +134,7 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status, return; } + uint64_t key_count = kvs.size(); if (status == rocksdb::Status::kNotFound) { add_read_cu(1); _read_hotkey_collector->capture_hash_key(hash_key, key_count); @@ -158,10 +157,10 @@ void capacity_unit_calculator::add_scan_cu(int32_t status, return; } + // TODO: (Tangyanzhao) hotkey detect in scan int64_t data_size = 0; for (const auto &kv : kvs) { data_size += kv.key.size() + kv.value.size(); - _read_hotkey_collector->capture_raw_key(kv.key, 1); } add_read_cu(data_size); _pfc_scan_bytes->add(data_size); @@ -215,9 +214,10 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status, for (const auto &kv : kvs) { multi_put_bytes += kv.key.size() + kv.value.size(); data_size += hash_key.size() + kv.key.size() + kv.value.size(); - _write_hotkey_collector->capture_raw_key(kv.key, 1); } _pfc_multi_put_bytes->add(hash_key.size() + multi_put_bytes); + uint64_t key_count = kvs.size(); + _write_hotkey_collector->capture_raw_key(hash_key, key_count); if (status != rocksdb::Status::kOk) { return; @@ -236,8 +236,9 @@ void capacity_unit_calculator::add_multi_remove_cu(int32_t status, int64_t data_size = 0; for (const auto &sort_key : sort_keys) { data_size += hash_key.size() + sort_key.size(); - _write_hotkey_collector->capture_hash_key(hash_key, 1); } + uint64_t key_count = sort_keys.size(); + _write_hotkey_collector->capture_hash_key(hash_key, key_count); add_write_cu(data_size); } @@ -284,11 +285,9 @@ void capacity_unit_calculator::add_check_and_mutate_cu( { int64_t data_size = 0; int64_t check_and_mutate_bytes = 0; - int64_t key_count = 0; for (const auto &m : mutate_list) { check_and_mutate_bytes += m.sort_key.size() + m.value.size(); data_size += hash_key.size() + m.sort_key.size() + m.value.size(); - key_count++; } _pfc_check_and_mutate_bytes->add(hash_key.size() + check_sort_key.size() + check_and_mutate_bytes); @@ -297,7 +296,7 @@ void capacity_unit_calculator::add_check_and_mutate_cu( status != rocksdb::Status::kTryAgain) { return; } - + uint64_t key_count = mutate_list.size(); if (status == rocksdb::Status::kOk) { add_write_cu(data_size); _write_hotkey_collector->capture_hash_key(hash_key, key_count); From eca456bc7d7fa285f2734f2343e4fe387e550d45 Mon Sep 17 00:00:00 2001 From: tangyanzhao Date: Tue, 13 Oct 2020 15:18:40 +0800 Subject: [PATCH 5/5] add comments --- src/server/capacity_unit_calculator.h | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h index a3508aab95..95e2e53a04 100644 --- a/src/server/capacity_unit_calculator.h +++ b/src/server/capacity_unit_calculator.h @@ -75,6 +75,24 @@ class capacity_unit_calculator : public dsn::replication::replica_base ::dsn::perf_counter_wrapper _pfc_check_and_set_bytes; ::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes; + /* + hotkey capturing weight rules: + add_get_cu: whether find the key or not, weight = 1(read_collector), + add_multi_get_cu: weight = returned sortkey count(read_collector), + add_scan_cu : not capture now, + add_sortkey_count_cu: weight = 1(read_collector), + add_ttl_cu: weight = 1(read_collector), + add_put_cu: weight = 1(write_collector), + add_remove_cu: weight = 1(write_collector), + add_multi_put_cu: weight = returned sortkey count(write_collector), + add_multi_remove_cu: weight = returned sortkey count(write_collector), + add_incr_cu: if find the key, weight = 1(write_collector), + else weight = 1(read_collector) + add_check_and_set_cu: if find the key, weight = 1(write_collector), + else weight = 1(read_collector) + add_check_and_mutate_cu: if find the key, weight = mutate_list size + else weight = 1 + */ std::shared_ptr _read_hotkey_collector; std::shared_ptr _write_hotkey_collector; };