Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add table level latency perf counters #336

Merged
merged 7 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion include/dsn/dist/replication/replication_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions include/dsn/tool-api/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class message_ex;
class admission_controller;
typedef void (*task_rejection_handler)(task *, admission_controller *);

extern std::set<dsn::task_code> s_storage_rpc_req_codes;

class task_spec : public extensible_object<task_spec, 4>
{
public:
Expand Down
5 changes: 5 additions & 0 deletions src/core/core/task_spec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace dsn {

constexpr int TASK_SPEC_STORE_CAPACITY = 512;

std::set<dsn::task_code> s_storage_rpc_req_codes;

// A sequential storage maps task_code to task_spec.
static std::array<std::unique_ptr<task_spec>, TASK_SPEC_STORE_CAPACITY> s_task_spec_store;

Expand Down Expand Up @@ -112,6 +114,9 @@ void task_spec::register_storage_task_code(task_code code,
spec->rpc_request_is_write_operation = is_write_operation;
spec->rpc_request_is_write_allow_batch = allow_batch;
spec->rpc_request_is_write_idempotent = is_idempotent;
if (TASK_TYPE_RPC_REQUEST == type) {
s_storage_rpc_req_codes.insert(code);
}
}

task_spec *task_spec::get(int code)
Expand Down
7 changes: 7 additions & 0 deletions src/dist/replication/common/replication_types.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ replica::replica(
_counter_recent_write_throttling_reject_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

// init table level latency perf counters
init_table_level_latency_counters();

if (need_restore) {
// add an extra env for restore
_extra_envs.insert(
Expand Down Expand Up @@ -163,8 +166,15 @@ void replica::on_client_read(dsn::message_ex *request)
}
}

uint64_t start_time_ns = dsn_now_ns();
dassert(_app != nullptr, "");
_app->on_request(request);

// If the corresponding perf counter exists, record the duration of this operation.
// The rpc code of the request is already checked in message_ex::rpc_code, so it is always legal.
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
if (_counters_table_level_latency[request->rpc_code()] != nullptr) {
_counters_table_level_latency[request->rpc_code()]->set(dsn_now_ns() - start_time_ns);
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
}
}

void replica::response_client_read(dsn::message_ex *request, error_code error)
Expand Down Expand Up @@ -284,6 +294,18 @@ void replica::execute_mutation(mutation_ptr &mu)
init_prepare(next, false);
}
}

// update table level latency perf-counters for primary partition
if (partition_status::PS_PRIMARY == status()) {
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
uint64_t now_ns = dsn_now_ns();
for (auto update : mu->data.updates) {
// If the corresponding perf counter exists, record the duration of this operation.
// The code in each update is always legal.
if (_counters_table_level_latency[update.code] != nullptr) {
_counters_table_level_latency[update.code]->set(now_ns - update.start_time_ns);
}
}
}
}

mutation_ptr replica::new_mutation(decree decree)
Expand Down Expand Up @@ -384,5 +406,30 @@ std::string replica::query_compact_state() const
dassert_replica(_app != nullptr, "");
return _app->query_compact_state();
}

// Replicas on the server which serves for the same table will share the same perf-counter.
// For example counter `table.level.RPC_RRDB_RRDB_MULTI_PUT.latency(ns)@test_table` is shared by
// all the replicas for `test_table`.
void replica::init_table_level_latency_counters()
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
{
int max_task_code = task_code::max();
_counters_table_level_latency.resize(max_task_code + 1);

for (int code = 0; code <= max_task_code; code++) {
_counters_table_level_latency[code] = nullptr;
if (s_storage_rpc_req_codes.find(task_code(code)) != s_storage_rpc_req_codes.end()) {
std::string counter_str =
fmt::format("table.level.{}.latency(ns)@{}", task_code(code), _app_info.app_name);
_counters_table_level_latency[code] =
dsn::perf_counters::instance()
.get_app_counter("eon.replica",
counter_str.c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
counter_str.c_str(),
true)
.get();
}
}
}
} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// child handle error while async learn parent states
void child_handle_async_learn_error();

void init_table_level_latency_counters();

private:
friend class ::dsn::replication::replication_checker;
friend class ::dsn::replication::test::test_checker;
Expand Down Expand Up @@ -470,6 +472,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
perf_counter_wrapper _counter_private_log_size;
perf_counter_wrapper _counter_recent_write_throttling_delay_count;
perf_counter_wrapper _counter_recent_write_throttling_reject_count;
std::vector<perf_counter *> _counters_table_level_latency;

dsn::task_tracker _tracker;
// the thread access checker
Expand Down