Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add table level latency perf counters #336

Merged
merged 7 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion include/dsn/dist/replication/replication_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions include/dsn/tool-api/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class message_ex;
class admission_controller;
typedef void (*task_rejection_handler)(task *, admission_controller *);

extern std::set<dsn::task_code> s_storage_rpc_req_codes;

class task_spec : public extensible_object<task_spec, 4>
{
public:
Expand Down
5 changes: 5 additions & 0 deletions src/core/core/task_spec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace dsn {

constexpr int TASK_SPEC_STORE_CAPACITY = 512;

std::set<dsn::task_code> s_storage_rpc_req_codes;

// A sequential storage maps task_code to task_spec.
static std::array<std::unique_ptr<task_spec>, TASK_SPEC_STORE_CAPACITY> s_task_spec_store;

Expand Down Expand Up @@ -112,6 +114,9 @@ void task_spec::register_storage_task_code(task_code code,
spec->rpc_request_is_write_operation = is_write_operation;
spec->rpc_request_is_write_allow_batch = allow_batch;
spec->rpc_request_is_write_idempotent = is_idempotent;
if (TASK_TYPE_RPC_REQUEST == type) {
s_storage_rpc_req_codes.insert(code);
}
}

task_spec *task_spec::get(int code)
Expand Down
7 changes: 7 additions & 0 deletions src/dist/replication/common/replication_types.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ replica::replica(
_counter_recent_write_throttling_reject_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

// init table level latency perf counters
init_table_level_latency_counters();

if (need_restore) {
// add an extra env for restore
_extra_envs.insert(
Expand Down Expand Up @@ -163,8 +166,15 @@ void replica::on_client_read(dsn::message_ex *request)
}
}

uint64_t start_time_ns = dsn_now_ns();
dassert(_app != nullptr, "");
_app->on_request(request);

// If the corresponding perf counter exist, count the duration of this operation.
// rpc code of request is already checked in message_ex::rpc_code, so it will always be legal
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
if (_counters_table_level_latency[request->rpc_code()] != nullptr) {
_counters_table_level_latency[request->rpc_code()]->set(dsn_now_ns() - start_time_ns);
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
}
}

void replica::response_client_read(dsn::message_ex *request, error_code error)
Expand Down Expand Up @@ -284,6 +294,18 @@ void replica::execute_mutation(mutation_ptr &mu)
init_prepare(next, false);
}
}

// update table level latency perf-counters for primary partition
if (partition_status::PS_PRIMARY == status()) {
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
uint64_t now_ns = dsn_now_ns();
for (auto update : mu->data.updates) {
// If the corresponding perf counter exist, count the duration of this operation.
// code in update will always be legal
if (_counters_table_level_latency[update.code] != nullptr) {
_counters_table_level_latency[update.code]->set(now_ns - update.start_time_ns);
}
}
}
}

mutation_ptr replica::new_mutation(decree decree)
Expand Down Expand Up @@ -384,5 +406,27 @@ std::string replica::query_compact_state() const
dassert_replica(_app != nullptr, "");
return _app->query_compact_state();
}

void replica::init_table_level_latency_counters()
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
{
int max_task_code = task_code::max();
_counters_table_level_latency.resize(max_task_code + 1);

for (int code = 0; code <= max_task_code; code++) {
_counters_table_level_latency[code] = nullptr;
if (s_storage_rpc_req_codes.find(task_code(code)) != s_storage_rpc_req_codes.end()) {
std::string counter_str =
fmt::format("table.level.{}.latency(ns)@{}", task_code(code), _app_info.app_name);
_counters_table_level_latency[code] =
dsn::perf_counters::instance()
.get_app_counter("eon.replica",
counter_str.c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
counter_str.c_str(),
true)
.get();
}
}
}
} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// child handle error while async learn parent states
void child_handle_async_learn_error();

void init_table_level_latency_counters();

private:
friend class ::dsn::replication::replication_checker;
friend class ::dsn::replication::test::test_checker;
Expand Down Expand Up @@ -470,6 +472,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
perf_counter_wrapper _counter_private_log_size;
perf_counter_wrapper _counter_recent_write_throttling_delay_count;
perf_counter_wrapper _counter_recent_write_throttling_reject_count;
std::vector<perf_counter *> _counters_table_level_latency;

dsn::task_tracker _tracker;
// the thread access checker
Expand Down