From 792df1ff93ad461106fa3aab36db4304620b9723 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Fri, 1 Nov 2019 11:11:28 +0800 Subject: [PATCH 1/7] table level latency --- .../dsn/dist/replication/replication_types.h | 6 ++++- include/dsn/tool-api/task_spec.h | 2 ++ src/core/core/task_spec.cpp | 5 ++++ .../replication/common/replication_types.cpp | 7 ++++++ src/dist/replication/lib/replica.cpp | 24 +++++++++++++++++++ src/dist/replication/lib/replica.h | 1 + 6 files changed, 44 insertions(+), 1 deletion(-) diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index 1c8a67e751..371f807bf4 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -429,12 +429,14 @@ class mutation_update mutation_update(mutation_update &&); mutation_update &operator=(const mutation_update &); mutation_update &operator=(mutation_update &&); - mutation_update() : serialization_type(0) {} + mutation_update() : serialization_type(0), start_time_ns(dsn_now_ns()) {} virtual ~mutation_update() throw(); ::dsn::task_code code; int32_t serialization_type; ::dsn::blob data; + // start_time_ns doesn't need to serialization, because we only use it in local node + uint64_t start_time_ns; _mutation_update__isset __isset; @@ -452,6 +454,8 @@ class mutation_update return false; if (!(data == rhs.data)) return false; + if (!(start_time_ns == rhs.start_time_ns)) + return false; return true; } bool operator!=(const mutation_update &rhs) const { return !(*this == rhs); } diff --git a/include/dsn/tool-api/task_spec.h b/include/dsn/tool-api/task_spec.h index 2aa78b1e42..b78dad7094 100644 --- a/include/dsn/tool-api/task_spec.h +++ b/include/dsn/tool-api/task_spec.h @@ -139,6 +139,8 @@ class message_ex; class admission_controller; typedef void (*task_rejection_handler)(task *, admission_controller *); +extern std::vector storage_rpc_req_codes; + class task_spec : public extensible_object { public: diff --git a/src/core/core/task_spec.cpp b/src/core/core/task_spec.cpp index d484eeb6cb..bebb1345dd 100644 --- a/src/core/core/task_spec.cpp +++ b/src/core/core/task_spec.cpp @@ -42,6 +42,8 @@ namespace dsn { constexpr int TASK_SPEC_STORE_CAPACITY = 512; +std::vector storage_rpc_req_codes; + // A sequential storage maps task_code to task_spec. static std::array, TASK_SPEC_STORE_CAPACITY> s_task_spec_store; @@ -112,6 +114,9 @@ void task_spec::register_storage_task_code(task_code code, spec->rpc_request_is_write_operation = is_write_operation; spec->rpc_request_is_write_allow_batch = allow_batch; spec->rpc_request_is_write_idempotent = is_idempotent; + if (TASK_TYPE_RPC_REQUEST == type) { + storage_rpc_req_codes.push_back(code); + } } task_spec *task_spec::get(int code) diff --git a/src/dist/replication/common/replication_types.cpp b/src/dist/replication/common/replication_types.cpp index 474d185408..04c77d74e7 100644 --- a/src/dist/replication/common/replication_types.cpp +++ b/src/dist/replication/common/replication_types.cpp @@ -435,6 +435,7 @@ void swap(mutation_update &a, mutation_update &b) swap(a.code, b.code); swap(a.serialization_type, b.serialization_type); swap(a.data, b.data); + swap(a.start_time_ns, b.start_time_ns); swap(a.__isset, b.__isset); } @@ -443,6 +444,7 @@ mutation_update::mutation_update(const mutation_update &other4) code = other4.code; serialization_type = other4.serialization_type; data = other4.data; + start_time_ns = other4.start_time_ns; __isset = other4.__isset; } mutation_update::mutation_update(mutation_update &&other5) @@ -450,6 +452,7 @@ mutation_update::mutation_update(mutation_update &&other5) code = std::move(other5.code); serialization_type = std::move(other5.serialization_type); data = std::move(other5.data); + start_time_ns = std::move(other5.start_time_ns); __isset = std::move(other5.__isset); } mutation_update &mutation_update::operator=(const mutation_update &other6) @@ -457,6 +460,7 @@ mutation_update &mutation_update::operator=(const mutation_update &other6) code = other6.code; serialization_type = other6.serialization_type; data = other6.data; + start_time_ns = other6.start_time_ns; __isset = other6.__isset; return *this; } @@ -465,6 +469,7 @@ mutation_update &mutation_update::operator=(mutation_update &&other7) code = std::move(other7.code); serialization_type = std::move(other7.serialization_type); data = std::move(other7.data); + start_time_ns = std::move(other7.start_time_ns); __isset = std::move(other7.__isset); return *this; } @@ -477,6 +482,8 @@ void mutation_update::printTo(std::ostream &out) const << "serialization_type=" << to_string(serialization_type); out << ", " << "data=" << to_string(data); + out << ", " + << "start_time_ns=" << to_string(start_time_ns); out << ")"; } diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 06b52b26c5..1c7ef1e1d3 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -77,6 +77,15 @@ replica::replica( _counter_recent_write_throttling_reject_count.init_app_counter( "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str()); + // init table level latency perf counters + for (auto code : storage_rpc_req_codes) { + counter_str = fmt::format("table.level.{}.latency(us)@{}", code, app.app_name); + _counters_table_level_latency[code].init_app_counter("eon.replica", + counter_str.c_str(), + COUNTER_TYPE_NUMBER_PERCENTILES, + counter_str.c_str()); + } + if (need_restore) { // add an extra env for restore _extra_envs.insert( @@ -163,8 +172,14 @@ void replica::on_client_read(dsn::message_ex *request) } } + uint64_t start_time_ns = dsn_now_ns(); dassert(_app != nullptr, ""); _app->on_request(request); + + auto iter = _counters_table_level_latency.find(request->rpc_code()); + if (iter != _counters_table_level_latency.end()) { + iter->second->set((dsn_now_ns() - start_time_ns) * 1e-3); + } } void replica::response_client_read(dsn::message_ex *request, error_code error) @@ -284,6 +299,15 @@ void replica::execute_mutation(mutation_ptr &mu) init_prepare(next, false); } } + + // update table level latency perf-counters + uint64_t now_ns = dsn_now_ns(); + for (auto update : mu->data.updates) { + auto iter = _counters_table_level_latency.find(update.code); + if (iter != _counters_table_level_latency.end()) { + iter->second->set((now_ns - update.start_time_ns) * 1e-3); + } + } } mutation_ptr replica::new_mutation(decree decree) diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 7b28f0ed1a..1910b2669f 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -470,6 +470,7 @@ class replica : public serverlet, public ref_counter, public replica_ba perf_counter_wrapper _counter_private_log_size; perf_counter_wrapper _counter_recent_write_throttling_delay_count; perf_counter_wrapper _counter_recent_write_throttling_reject_count; + std::map _counters_table_level_latency; dsn::task_tracker _tracker; // the thread access checker From f52ffc9fcab2f676249330d0bc06f8aec03f38e3 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Fri, 1 Nov 2019 16:54:55 +0800 Subject: [PATCH 2/7] table level latency --- src/dist/replication/lib/replica.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 1c7ef1e1d3..a1bc38ff0f 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -300,12 +300,14 @@ void replica::execute_mutation(mutation_ptr &mu) } } - // update table level latency perf-counters - uint64_t now_ns = dsn_now_ns(); - for (auto update : mu->data.updates) { - auto iter = _counters_table_level_latency.find(update.code); - if (iter != _counters_table_level_latency.end()) { - iter->second->set((now_ns - update.start_time_ns) * 1e-3); + // update table level latency perf-counters for primary partition + if (partition_status::PS_PRIMARY == status()) { + uint64_t now_ns = dsn_now_ns(); + for (auto update : mu->data.updates) { + auto iter = _counters_table_level_latency.find(update.code); + if (iter != _counters_table_level_latency.end()) { + iter->second->set((now_ns - update.start_time_ns) * 1e-3); + } } } } From fc355dde1decefff668242530980981b7aa7a369 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 5 Nov 2019 10:53:06 +0800 Subject: [PATCH 3/7] table level latency --- include/dsn/dist/replication/replication_types.h | 2 +- src/dist/replication/lib/replica.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index 371f807bf4..efac7641ac 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -435,7 +435,7 @@ class mutation_update ::dsn::task_code code; int32_t serialization_type; ::dsn::blob data; - // start_time_ns doesn't need to serialization, because we only use it in local node + // start_time_ns doesn't need to serialization, because we only use it on local node uint64_t start_time_ns; _mutation_update__isset __isset; diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index a1bc38ff0f..c9e4a30f49 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -79,7 +79,7 @@ replica::replica( // init table level latency perf counters for (auto code : storage_rpc_req_codes) { - counter_str = fmt::format("table.level.{}.latency(us)@{}", code, app.app_name); + counter_str = fmt::format("table.level.{}.latency(ns)@{}", code, app.app_name); _counters_table_level_latency[code].init_app_counter("eon.replica", counter_str.c_str(), COUNTER_TYPE_NUMBER_PERCENTILES, @@ -178,7 +178,7 @@ void replica::on_client_read(dsn::message_ex *request) auto iter = _counters_table_level_latency.find(request->rpc_code()); if (iter != _counters_table_level_latency.end()) { - iter->second->set((dsn_now_ns() - start_time_ns) * 1e-3); + iter->second->set(dsn_now_ns() - start_time_ns); } } @@ -306,7 +306,7 @@ void replica::execute_mutation(mutation_ptr &mu) for (auto update : mu->data.updates) { auto iter = _counters_table_level_latency.find(update.code); if (iter != _counters_table_level_latency.end()) { - iter->second->set((now_ns - update.start_time_ns) * 1e-3); + iter->second->set(now_ns - update.start_time_ns); } } } From 3546339c8b2b54ed1f02ffb399adda8676a4be27 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 5 Nov 2019 19:54:58 +0800 Subject: [PATCH 4/7] table level latency --- include/dsn/tool-api/task_spec.h | 2 +- src/core/core/task_spec.cpp | 4 +-- src/dist/replication/lib/replica.cpp | 44 ++++++++++++++++++++-------- src/dist/replication/lib/replica.h | 4 ++- 4 files changed, 37 insertions(+), 17 deletions(-) diff --git a/include/dsn/tool-api/task_spec.h b/include/dsn/tool-api/task_spec.h index b78dad7094..a5c335ee51 100644 --- a/include/dsn/tool-api/task_spec.h +++ b/include/dsn/tool-api/task_spec.h @@ -139,7 +139,7 @@ class message_ex; class admission_controller; typedef void (*task_rejection_handler)(task *, admission_controller *); -extern std::vector storage_rpc_req_codes; +extern std::set storage_rpc_req_codes; class task_spec : public extensible_object { diff --git a/src/core/core/task_spec.cpp b/src/core/core/task_spec.cpp index bebb1345dd..11827524b3 100644 --- a/src/core/core/task_spec.cpp +++ b/src/core/core/task_spec.cpp @@ -42,7 +42,7 @@ namespace dsn { constexpr int TASK_SPEC_STORE_CAPACITY = 512; -std::vector storage_rpc_req_codes; +std::set storage_rpc_req_codes; // A sequential storage maps task_code to task_spec. static std::array, TASK_SPEC_STORE_CAPACITY> s_task_spec_store; @@ -115,7 +115,7 @@ void task_spec::register_storage_task_code(task_code code, spec->rpc_request_is_write_allow_batch = allow_batch; spec->rpc_request_is_write_idempotent = is_idempotent; if (TASK_TYPE_RPC_REQUEST == type) { - storage_rpc_req_codes.push_back(code); + storage_rpc_req_codes.insert(code); } } diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index c9e4a30f49..48a55d53f1 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -78,13 +78,7 @@ replica::replica( "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str()); // init table level latency perf counters - for (auto code : storage_rpc_req_codes) { - counter_str = fmt::format("table.level.{}.latency(ns)@{}", code, app.app_name); - _counters_table_level_latency[code].init_app_counter("eon.replica", - counter_str.c_str(), - COUNTER_TYPE_NUMBER_PERCENTILES, - counter_str.c_str()); - } + init_table_level_latency_counters(); if (need_restore) { // add an extra env for restore @@ -176,9 +170,10 @@ void replica::on_client_read(dsn::message_ex *request) dassert(_app != nullptr, ""); _app->on_request(request); - auto iter = _counters_table_level_latency.find(request->rpc_code()); - if (iter != _counters_table_level_latency.end()) { - iter->second->set(dsn_now_ns() - start_time_ns); + // If the corresponding perf counter exist, count the duration of this operation. + // rpc code of request is already checked in message_ex::rpc_code, so it will always be legal + if (_counters_table_level_latency[request->rpc_code()] != nullptr) { + _counters_table_level_latency[request->rpc_code()]->set(dsn_now_ns() - start_time_ns); } } @@ -304,9 +299,10 @@ void replica::execute_mutation(mutation_ptr &mu) if (partition_status::PS_PRIMARY == status()) { uint64_t now_ns = dsn_now_ns(); for (auto update : mu->data.updates) { - auto iter = _counters_table_level_latency.find(update.code); - if (iter != _counters_table_level_latency.end()) { - iter->second->set(now_ns - update.start_time_ns); + // If the corresponding perf counter exist, count the duration of this operation. + // code in update will always be legal + if (_counters_table_level_latency[update.code] != nullptr) { + _counters_table_level_latency[update.code]->set(now_ns - update.start_time_ns); } } } @@ -410,5 +406,27 @@ std::string replica::query_compact_state() const dassert_replica(_app != nullptr, ""); return _app->query_compact_state(); } + +void replica::init_table_level_latency_counters() +{ + int max_task_code = task_code::max(); + _counters_table_level_latency.resize(max_task_code + 1); + + for (int code = 0; code <= max_task_code; code++) { + _counters_table_level_latency[code] = nullptr; + if (storage_rpc_req_codes.find(code) != storage_rpc_req_codes.end()) { + std::string counter_str = + fmt::format("table.level.{}.latency(ns)@{}", task_code(code), _app_info.app_name); + _counters_table_level_latency[code] = + dsn::perf_counters::instance() + .get_app_counter("eon.replica", + counter_str.c_str(), + COUNTER_TYPE_NUMBER_PERCENTILES, + counter_str.c_str(), + true) + .get(); + } + } +} } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 1910b2669f..665f4bdf01 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -372,6 +372,8 @@ class replica : public serverlet, public ref_counter, public replica_ba // child handle error while async learn parent states void child_handle_async_learn_error(); + void init_table_level_latency_counters(); + private: friend class ::dsn::replication::replication_checker; friend class ::dsn::replication::test::test_checker; @@ -470,7 +472,7 @@ class replica : public serverlet, public ref_counter, public replica_ba perf_counter_wrapper _counter_private_log_size; perf_counter_wrapper _counter_recent_write_throttling_delay_count; perf_counter_wrapper _counter_recent_write_throttling_reject_count; - std::map _counters_table_level_latency; + std::vector _counters_table_level_latency; dsn::task_tracker _tracker; // the thread access checker From e80eac56c6fd9a33fb1b9dcc62bb070727b02579 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Mon, 11 Nov 2019 10:59:52 +0800 Subject: [PATCH 5/7] table level latency --- include/dsn/tool-api/task_spec.h | 2 +- src/core/core/task_spec.cpp | 4 ++-- src/dist/replication/lib/replica.cpp | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/dsn/tool-api/task_spec.h b/include/dsn/tool-api/task_spec.h index a5c335ee51..2fe7dd6b66 100644 --- a/include/dsn/tool-api/task_spec.h +++ b/include/dsn/tool-api/task_spec.h @@ -139,7 +139,7 @@ class message_ex; class admission_controller; typedef void (*task_rejection_handler)(task *, admission_controller *); -extern std::set storage_rpc_req_codes; +extern std::set s_storage_rpc_req_codes; class task_spec : public extensible_object { diff --git a/src/core/core/task_spec.cpp b/src/core/core/task_spec.cpp index 11827524b3..a61b6a3969 100644 --- a/src/core/core/task_spec.cpp +++ b/src/core/core/task_spec.cpp @@ -42,7 +42,7 @@ namespace dsn { constexpr int TASK_SPEC_STORE_CAPACITY = 512; -std::set storage_rpc_req_codes; +std::set s_storage_rpc_req_codes; // A sequential storage maps task_code to task_spec. static std::array, TASK_SPEC_STORE_CAPACITY> s_task_spec_store; @@ -115,7 +115,7 @@ void task_spec::register_storage_task_code(task_code code, spec->rpc_request_is_write_allow_batch = allow_batch; spec->rpc_request_is_write_idempotent = is_idempotent; if (TASK_TYPE_RPC_REQUEST == type) { - storage_rpc_req_codes.insert(code); + s_storage_rpc_req_codes.insert(code); } } diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 48a55d53f1..d38774decb 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -414,7 +414,7 @@ void replica::init_table_level_latency_counters() for (int code = 0; code <= max_task_code; code++) { _counters_table_level_latency[code] = nullptr; - if (storage_rpc_req_codes.find(code) != storage_rpc_req_codes.end()) { + if (s_storage_rpc_req_codes.find(code) != s_storage_rpc_req_codes.end()) { std::string counter_str = fmt::format("table.level.{}.latency(ns)@{}", task_code(code), _app_info.app_name); _counters_table_level_latency[code] = From d3aea5274830f1d793133c86a4667245e807efc2 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 13 Nov 2019 17:21:29 +0800 Subject: [PATCH 6/7] modified by pr --- include/dsn/tool-api/task_spec.h | 2 +- src/core/core/task_spec.cpp | 2 +- src/dist/replication/lib/replica.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/dsn/tool-api/task_spec.h b/include/dsn/tool-api/task_spec.h index 2fe7dd6b66..bd1b25d4c0 100644 --- a/include/dsn/tool-api/task_spec.h +++ b/include/dsn/tool-api/task_spec.h @@ -139,7 +139,7 @@ class message_ex; class admission_controller; typedef void (*task_rejection_handler)(task *, admission_controller *); -extern std::set s_storage_rpc_req_codes; +extern std::set s_storage_rpc_req_codes; class task_spec : public extensible_object { diff --git a/src/core/core/task_spec.cpp b/src/core/core/task_spec.cpp index a61b6a3969..3306ffc76d 100644 --- a/src/core/core/task_spec.cpp +++ b/src/core/core/task_spec.cpp @@ -42,7 +42,7 @@ namespace dsn { constexpr int TASK_SPEC_STORE_CAPACITY = 512; -std::set s_storage_rpc_req_codes; +std::set s_storage_rpc_req_codes; // A sequential storage maps task_code to task_spec. static std::array, TASK_SPEC_STORE_CAPACITY> s_task_spec_store; diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index d38774decb..050caac083 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -414,7 +414,7 @@ void replica::init_table_level_latency_counters() for (int code = 0; code <= max_task_code; code++) { _counters_table_level_latency[code] = nullptr; - if (s_storage_rpc_req_codes.find(code) != s_storage_rpc_req_codes.end()) { + if (s_storage_rpc_req_codes.find(task_code(code)) != s_storage_rpc_req_codes.end()) { std::string counter_str = fmt::format("table.level.{}.latency(ns)@{}", task_code(code), _app_info.app_name); _counters_table_level_latency[code] = From 6a451de39a2e8d9ccee8b2a2366217d9b492b57f Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Thu, 14 Nov 2019 14:31:55 +0800 Subject: [PATCH 7/7] add annotate --- src/dist/replication/lib/replica.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 050caac083..915243919b 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -407,6 +407,9 @@ std::string replica::query_compact_state() const return _app->query_compact_state(); } +// Replicas on the server which serves for the same table will share the same perf-counter. +// For example counter `table.level.RPC_RRDB_RRDB_MULTI_PUT.latency(ns)@test_table` is shared by +// all the replicas for `test_table`. void replica::init_table_level_latency_counters() { int max_task_code = task_code::max();