Skip to content

Commit

Permalink
feat(new_metrics): migrate metrics for replica_stub (part 1) (apache#…
Browse files Browse the repository at this point in the history
…1455)

apache#1454

This is the 1st part of migrating metrics of `replica_stub` to new framework.
After migrating to new framework, the 3 metrics, including the total number
of replicas, the number of opening/closing replicas, are still kept server-level.
Another metric, the number of committed requests, is changed to replica-level.

The naming of metric variable would lead to duplication with class member
(such as `_opening_replicas` in `replica_stub` class). Therefore, a macro
`METRIC_VAR_NAME` is introduced to manage the new naming, which is prefixed with
`_metric_` to avoid duplication. Also, generated metric function names are also
managed by related macros.
  • Loading branch information
empiredan committed Dec 11, 2023
1 parent 23debd2 commit a5c841e
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 77 deletions.
10 changes: 5 additions & 5 deletions src/meta/table_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class table_metrics
METRIC_DEFINE_SET(healthy_partitions, int64_t)

#define __METRIC_DEFINE_INCREMENT_BY(name) \
void increment_##name##_by(int32_t partition_id, int64_t x) \
void METRIC_FUNC_NAME_INCREMENT_BY(name)(int32_t partition_id, int64_t x) \
{ \
CHECK_LT(partition_id, _partition_metrics.size()); \
METRIC_INCREMENT_BY(*(_partition_metrics[partition_id]), name, x); \
Expand All @@ -106,7 +106,7 @@ class table_metrics
#undef __METRIC_DEFINE_INCREMENT_BY

#define __METRIC_DEFINE_INCREMENT(name) \
void increment_##name(int32_t partition_id) \
void METRIC_FUNC_NAME_INCREMENT(name)(int32_t partition_id) \
{ \
CHECK_LT(partition_id, _partition_metrics.size()); \
METRIC_INCREMENT(*(_partition_metrics[partition_id]), name); \
Expand All @@ -120,7 +120,7 @@ class table_metrics
#undef __METRIC_DEFINE_INCREMENT

#define __METRIC_DEFINE_SET(name, value_type) \
void set_##name(int32_t partition_id, value_type value) \
void METRIC_FUNC_NAME_SET(name)(int32_t partition_id, value_type value) \
{ \
CHECK_LT(partition_id, _partition_metrics.size()); \
METRIC_SET(*(_partition_metrics[partition_id]), name, value); \
Expand Down Expand Up @@ -167,7 +167,7 @@ class greedy_balance_stats
using partition_map = std::unordered_map<gpid, partition_stats>;

#define __METRIC_DEFINE_INCREMENT(name) \
void increment_##name(const gpid &id, bool balance_checker) \
void METRIC_FUNC_NAME_INCREMENT(name)(const gpid &id, bool balance_checker) \
{ \
auto &partition = _partition_map[id]; \
++(partition.greedy_recent_balance_operations); \
Expand Down Expand Up @@ -210,7 +210,7 @@ class table_metric_entities
void clear_entities();

#define __METRIC_DEFINE_INCREMENT(name) \
void increment_##name(const gpid &id) \
void METRIC_FUNC_NAME_INCREMENT(name)(const gpid &id) \
{ \
utils::auto_read_lock l(_lock); \
\
Expand Down
10 changes: 0 additions & 10 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#include "mutation.h"
#include "mutation_log.h"
#include "perf_counter/perf_counter.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "perf_counter/perf_counters.h"
#include "replica/prepare_list.h"
#include "replica/replica_context.h"
Expand Down Expand Up @@ -224,15 +223,6 @@ void replica::update_last_checkpoint_generate_time()
_last_checkpoint_generate_time_ms + rand::next_u64(max_interval_ms / 2, max_interval_ms);
}

// //
// Statistics //
// //

void replica::update_commit_qps(int count)
{
_stub->_counter_replicas_commit_qps->add((uint64_t)count);
}

void replica::init_state()
{
_inactive_is_transient = false;
Expand Down
5 changes: 0 additions & 5 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

replica_follower *get_replica_follower() const { return _replica_follower.get(); };

//
// Statistics
//
void update_commit_qps(int count);

// routine for get extra envs from replica
const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; }
const dir_node *get_dir_node() const { return _dir_node; }
Expand Down
60 changes: 32 additions & 28 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@
#include "remote_cmd/remote_command.h"
#include "utils/fail_point.h"

METRIC_DEFINE_gauge_int64(server,
total_replicas,
dsn::metric_unit::kReplicas,
"The total number of replicas");

METRIC_DEFINE_gauge_int64(server,
opening_replicas,
dsn::metric_unit::kReplicas,
"The number of opening replicas");

METRIC_DEFINE_gauge_int64(server,
closing_replicas,
dsn::metric_unit::kReplicas,
"The number of closing replicas");

namespace dsn {
namespace replication {
DSN_DEFINE_bool(replication,
Expand Down Expand Up @@ -203,7 +218,10 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_learn_app_concurrent_count(0),
_bulk_load_downloading_count(0),
_manual_emergency_checkpointing_count(0),
_is_running(false)
_is_running(false),
METRIC_VAR_INIT_server(total_replicas),
METRIC_VAR_INIT_server(opening_replicas),
METRIC_VAR_INIT_server(closing_replicas)
{
#ifdef DSN_ENABLE_GPERF
_is_releasing_memory = false;
Expand All @@ -221,20 +239,6 @@ replica_stub::~replica_stub(void) { close(); }

void replica_stub::install_perf_counters()
{
_counter_replicas_count.init_app_counter(
"eon.replica_stub", "replica(Count)", COUNTER_TYPE_NUMBER, "# in replica_stub._replicas");
_counter_replicas_opening_count.init_app_counter("eon.replica_stub",
"opening.replica(Count)",
COUNTER_TYPE_NUMBER,
"# in replica_stub._opening_replicas");
_counter_replicas_closing_count.init_app_counter("eon.replica_stub",
"closing.replica(Count)",
COUNTER_TYPE_NUMBER,
"# in replica_stub._closing_replicas");
_counter_replicas_commit_qps.init_app_counter("eon.replica_stub",
"replicas.commit.qps",
COUNTER_TYPE_RATE,
"server-level commit throughput");
_counter_replicas_learning_count.init_app_counter("eon.replica_stub",
"replicas.learning.count",
COUNTER_TYPE_NUMBER,
Expand Down Expand Up @@ -810,7 +814,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f

// attach rps
_replicas = std::move(rps);
_counter_replicas_count->add((uint64_t)_replicas.size());
METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size());
for (const auto &kv : _replicas) {
_fs_manager.add_replica(kv.first, kv.second->dir());
}
Expand Down Expand Up @@ -2075,10 +2079,10 @@ task_ptr replica_stub::begin_open_replica(
if (rep->status() == partition_status::PS_INACTIVE && tsk->cancel(false)) {
// reopen it
_closing_replicas.erase(it);
_counter_replicas_closing_count->decrement();
METRIC_VAR_DECREMENT(closing_replicas);

_replicas.emplace(id, rep);
_counter_replicas_count->increment();
METRIC_VAR_INCREMENT(total_replicas);

_closed_replicas.erase(id);

Expand All @@ -2104,7 +2108,7 @@ task_ptr replica_stub::begin_open_replica(
std::bind(&replica_stub::open_replica, this, app, id, group_check, configuration_update));

_opening_replicas[id] = task;
_counter_replicas_opening_count->increment();
METRIC_VAR_INCREMENT(opening_replicas);
_closed_replicas.erase(id);

_replicas_lock.unlock_write();
Expand Down Expand Up @@ -2217,7 +2221,7 @@ void replica_stub::open_replica(
0,
"replica {} is not in _opening_replicas",
id.to_string());
_counter_replicas_opening_count->decrement();
METRIC_VAR_DECREMENT(opening_replicas);
return;
}

Expand All @@ -2227,13 +2231,13 @@ void replica_stub::open_replica(
0,
"replica {} is not in _opening_replicas",
id.to_string());
_counter_replicas_opening_count->decrement();
METRIC_VAR_DECREMENT(opening_replicas);

CHECK(_replicas.find(id) == _replicas.end(),
"replica {} is already in _replicas",
id.to_string());
_replicas.insert(replicas::value_type(rep->get_gpid(), rep));
_counter_replicas_count->increment();
METRIC_VAR_INCREMENT(total_replicas);

_closed_replicas.erase(id);
}
Expand Down Expand Up @@ -2401,7 +2405,7 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r)
return nullptr;
}

_counter_replicas_count->decrement();
METRIC_VAR_DECREMENT(total_replicas);

int delay_ms = 0;
if (r->status() == partition_status::PS_INACTIVE) {
Expand All @@ -2420,7 +2424,7 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r)
0,
std::chrono::milliseconds(delay_ms));
_closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info));
_counter_replicas_closing_count->increment();
METRIC_VAR_INCREMENT(closing_replicas);
return task;
}

Expand All @@ -2440,7 +2444,7 @@ void replica_stub::close_replica(replica_ptr r)
_closed_replicas.emplace(
id, std::make_pair(std::get<2>(find->second), std::get<3>(find->second)));
_closing_replicas.erase(find);
_counter_replicas_closing_count->decrement();
METRIC_VAR_DECREMENT(closing_replicas);
}

_fs_manager.remove_replica(id);
Expand Down Expand Up @@ -2904,15 +2908,15 @@ void replica_stub::close()

task->cancel(true);

_counter_replicas_opening_count->decrement();
METRIC_VAR_DECREMENT(opening_replicas);
_replicas_lock.lock_write();
_opening_replicas.erase(_opening_replicas.begin());
}

while (!_replicas.empty()) {
_replicas.begin()->second->close();

_counter_replicas_count->decrement();
METRIC_VAR_DECREMENT(total_replicas);
_replicas.erase(_replicas.begin());
}
}
Expand Down Expand Up @@ -3036,7 +3040,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
if (rep != nullptr) {
auto pr = _replicas.insert(replicas::value_type(child_pid, rep));
CHECK(pr.second, "child replica {} has been existed", rep->name());
_counter_replicas_count->increment();
METRIC_VAR_INCREMENT(total_replicas);
_closed_replicas.erase(child_pid);
}
return rep;
Expand Down
8 changes: 4 additions & 4 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/fmt_utils.h"
#include "utils/metrics.h"
#include "utils/zlocks.h"

namespace dsn {
Expand Down Expand Up @@ -529,10 +530,9 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
#endif

// performance counters
perf_counter_wrapper _counter_replicas_count;
perf_counter_wrapper _counter_replicas_opening_count;
perf_counter_wrapper _counter_replicas_closing_count;
perf_counter_wrapper _counter_replicas_commit_qps;
METRIC_VAR_DECLARE_gauge_int64(total_replicas);
METRIC_VAR_DECLARE_gauge_int64(opening_replicas);
METRIC_VAR_DECLARE_gauge_int64(closing_replicas);

perf_counter_wrapper _counter_replicas_learning_count;
perf_counter_wrapper _counter_replicas_learning_max_duration_time_ms;
Expand Down
10 changes: 8 additions & 2 deletions src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
#include "utils/fmt_logging.h"
#include "utils/latency_tracer.h"

METRIC_DEFINE_counter(replica,
committed_requests,
dsn::metric_unit::kRequests,
"The number of committed requests");

namespace dsn {

namespace replication {
Expand Down Expand Up @@ -175,7 +180,8 @@ replication_app_base *replication_app_base::new_storage_instance(const std::stri
return utils::factory_store<replication_app_base>::create(name.c_str(), PROVIDER_TYPE_MAIN, r);
}

replication_app_base::replication_app_base(replica *replica) : replica_base(replica)
replication_app_base::replication_app_base(replica *replica)
: replica_base(replica), METRIC_VAR_INIT_replica(committed_requests)
{
_dir_data = utils::filesystem::path_combine(replica->dir(), "data");
_dir_learn = utils::filesystem::path_combine(replica->dir(), "learn");
Expand Down Expand Up @@ -417,7 +423,7 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
"mutation {} committed on {}, batched_count = {}", mu->name(), str, batched_count);
}

_replica->update_commit_qps(batched_count);
METRIC_VAR_INCREMENT_BY(committed_requests, batched_count);

return ERR_OK;
}
Expand Down
4 changes: 4 additions & 0 deletions src/replica/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/fmt_utils.h"
#include "utils/metrics.h"
#include "utils/ports.h"

namespace dsn {
Expand Down Expand Up @@ -344,6 +345,9 @@ class replication_app_base : public replica_base
replica_init_info _info;

explicit replication_app_base(replication::replica *replica);

private:
METRIC_VAR_DECLARE_counter(committed_requests);
};
USER_DEFINED_ENUM_FORMATTER(replication_app_base::chkpt_apply_mode)
} // namespace replication
Expand Down
Loading

0 comments on commit a5c841e

Please sign in to comment.