Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new_metrics): migrate metrics for replica_stub (part 1) #1455

Merged
10 changes: 5 additions & 5 deletions src/meta/table_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class table_metrics
METRIC_DEFINE_SET(healthy_partitions, int64_t)

#define __METRIC_DEFINE_INCREMENT_BY(name) \
void increment_##name##_by(int32_t partition_id, int64_t x) \
void METRIC_FUNC_NAME_INCREMENT_BY(name)(int32_t partition_id, int64_t x) \
{ \
CHECK_LT(partition_id, _partition_metrics.size()); \
METRIC_INCREMENT_BY(*(_partition_metrics[partition_id]), name, x); \
Expand All @@ -106,7 +106,7 @@ class table_metrics
#undef __METRIC_DEFINE_INCREMENT_BY

#define __METRIC_DEFINE_INCREMENT(name) \
void increment_##name(int32_t partition_id) \
void METRIC_FUNC_NAME_INCREMENT(name)(int32_t partition_id) \
{ \
CHECK_LT(partition_id, _partition_metrics.size()); \
METRIC_INCREMENT(*(_partition_metrics[partition_id]), name); \
Expand All @@ -120,7 +120,7 @@ class table_metrics
#undef __METRIC_DEFINE_INCREMENT

#define __METRIC_DEFINE_SET(name, value_type) \
void set_##name(int32_t partition_id, value_type value) \
void METRIC_FUNC_NAME_SET(name)(int32_t partition_id, value_type value) \
{ \
CHECK_LT(partition_id, _partition_metrics.size()); \
METRIC_SET(*(_partition_metrics[partition_id]), name, value); \
Expand Down Expand Up @@ -167,7 +167,7 @@ class greedy_balance_stats
using partition_map = std::unordered_map<gpid, partition_stats>;

#define __METRIC_DEFINE_INCREMENT(name) \
void increment_##name(const gpid &id, bool balance_checker) \
void METRIC_FUNC_NAME_INCREMENT(name)(const gpid &id, bool balance_checker) \
{ \
auto &partition = _partition_map[id]; \
++(partition.greedy_recent_balance_operations); \
Expand Down Expand Up @@ -210,7 +210,7 @@ class table_metric_entities
void clear_entities();

#define __METRIC_DEFINE_INCREMENT(name) \
void increment_##name(const gpid &id) \
void METRIC_FUNC_NAME_INCREMENT(name)(const gpid &id) \
{ \
utils::auto_read_lock l(_lock); \
\
Expand Down
10 changes: 0 additions & 10 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
#include "mutation.h"
#include "mutation_log.h"
#include "perf_counter/perf_counter.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "perf_counter/perf_counters.h"
#include "replica/prepare_list.h"
#include "replica/replica_context.h"
Expand Down Expand Up @@ -226,15 +225,6 @@ void replica::update_last_checkpoint_generate_time()
_last_checkpoint_generate_time_ms + rand::next_u64(max_interval_ms / 2, max_interval_ms);
}

// //
// Statistics //
// //

void replica::update_commit_qps(int count)
{
    // Feed the number of newly committed requests into the commit counter
    // owned by the replica stub.
    const auto committed = static_cast<uint64_t>(count);
    _stub->_counter_replicas_commit_qps->add(committed);
}

void replica::init_state()
{
_inactive_is_transient = false;
Expand Down
5 changes: 0 additions & 5 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

replica_follower *get_replica_follower() const { return _replica_follower.get(); };

//
// Statistics
//
void update_commit_qps(int count);

// routine for getting extra envs from replica
const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; }

Expand Down
60 changes: 32 additions & 28 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@
#include "remote_cmd/remote_command.h"
#include "utils/fail_point.h"

// Server-level gauge following the size of replica_stub's `_replicas` map:
// it is bumped whenever a replica is inserted into that map and dropped
// whenever one is removed or begins closing.
METRIC_DEFINE_gauge_int64(server,
total_replicas,
dsn::metric_unit::kReplicas,
"The total number of replicas");

// Server-level gauge following replica_stub's `_opening_replicas`:
// incremented when an open task is registered there, decremented when the
// entry is erased.
METRIC_DEFINE_gauge_int64(server,
opening_replicas,
dsn::metric_unit::kReplicas,
"The number of opening replicas");

// Server-level gauge following replica_stub's `_closing_replicas`:
// incremented when a close task is registered there, decremented when the
// entry is erased (either on reopen or once the close finishes).
METRIC_DEFINE_gauge_int64(server,
closing_replicas,
dsn::metric_unit::kReplicas,
"The number of closing replicas");

namespace dsn {
namespace replication {

Expand Down Expand Up @@ -197,7 +212,10 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_learn_app_concurrent_count(0),
_bulk_load_downloading_count(0),
_manual_emergency_checkpointing_count(0),
_is_running(false)
_is_running(false),
METRIC_VAR_INIT_server(total_replicas),
METRIC_VAR_INIT_server(opening_replicas),
METRIC_VAR_INIT_server(closing_replicas)
{
#ifdef DSN_ENABLE_GPERF
_is_releasing_memory = false;
Expand All @@ -215,20 +233,6 @@ replica_stub::~replica_stub(void) { close(); }

void replica_stub::install_perf_counters()
{
_counter_replicas_count.init_app_counter(
"eon.replica_stub", "replica(Count)", COUNTER_TYPE_NUMBER, "# in replica_stub._replicas");
_counter_replicas_opening_count.init_app_counter("eon.replica_stub",
"opening.replica(Count)",
COUNTER_TYPE_NUMBER,
"# in replica_stub._opening_replicas");
_counter_replicas_closing_count.init_app_counter("eon.replica_stub",
"closing.replica(Count)",
COUNTER_TYPE_NUMBER,
"# in replica_stub._closing_replicas");
_counter_replicas_commit_qps.init_app_counter("eon.replica_stub",
"replicas.commit.qps",
COUNTER_TYPE_RATE,
"server-level commit throughput");
_counter_replicas_learning_count.init_app_counter("eon.replica_stub",
"replicas.learning.count",
COUNTER_TYPE_NUMBER,
Expand Down Expand Up @@ -797,7 +801,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f

// attach rps
_replicas = std::move(rps);
_counter_replicas_count->add((uint64_t)_replicas.size());
METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size());
for (const auto &kv : _replicas) {
_fs_manager.add_replica(kv.first, kv.second->dir());
}
Expand Down Expand Up @@ -2032,10 +2036,10 @@ task_ptr replica_stub::begin_open_replica(
if (rep->status() == partition_status::PS_INACTIVE && tsk->cancel(false)) {
// reopen it
_closing_replicas.erase(it);
_counter_replicas_closing_count->decrement();
METRIC_VAR_DECREMENT(closing_replicas);
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved

_replicas.emplace(id, rep);
_counter_replicas_count->increment();
METRIC_VAR_INCREMENT(total_replicas);

_closed_replicas.erase(id);

Expand All @@ -2061,7 +2065,7 @@ task_ptr replica_stub::begin_open_replica(
std::bind(&replica_stub::open_replica, this, app, id, group_check, configuration_update));

_opening_replicas[id] = task;
_counter_replicas_opening_count->increment();
METRIC_VAR_INCREMENT(opening_replicas);
_closed_replicas.erase(id);

_replicas_lock.unlock_write();
Expand Down Expand Up @@ -2169,7 +2173,7 @@ void replica_stub::open_replica(
0,
"replica {} is not in _opening_replicas",
id.to_string());
_counter_replicas_opening_count->decrement();
METRIC_VAR_DECREMENT(opening_replicas);
return;
}

Expand All @@ -2179,13 +2183,13 @@ void replica_stub::open_replica(
0,
"replica {} is not in _opening_replicas",
id.to_string());
_counter_replicas_opening_count->decrement();
METRIC_VAR_DECREMENT(opening_replicas);

CHECK(_replicas.find(id) == _replicas.end(),
"replica {} is already in _replicas",
id.to_string());
_replicas.insert(replicas::value_type(rep->get_gpid(), rep));
_counter_replicas_count->increment();
METRIC_VAR_INCREMENT(total_replicas);

_closed_replicas.erase(id);
}
Expand Down Expand Up @@ -2342,7 +2346,7 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r)
return nullptr;
}

_counter_replicas_count->decrement();
METRIC_VAR_DECREMENT(total_replicas);

int delay_ms = 0;
if (r->status() == partition_status::PS_INACTIVE) {
Expand All @@ -2361,7 +2365,7 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r)
0,
std::chrono::milliseconds(delay_ms));
_closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info));
_counter_replicas_closing_count->increment();
METRIC_VAR_INCREMENT(closing_replicas);
return task;
}

Expand All @@ -2381,7 +2385,7 @@ void replica_stub::close_replica(replica_ptr r)
_closed_replicas.emplace(
id, std::make_pair(std::get<2>(find->second), std::get<3>(find->second)));
_closing_replicas.erase(find);
_counter_replicas_closing_count->decrement();
METRIC_VAR_DECREMENT(closing_replicas);
}

if (r->is_data_corrupted()) {
Expand Down Expand Up @@ -2840,15 +2844,15 @@ void replica_stub::close()

task->cancel(true);

_counter_replicas_opening_count->decrement();
METRIC_VAR_DECREMENT(opening_replicas);
_replicas_lock.lock_write();
_opening_replicas.erase(_opening_replicas.begin());
}

while (!_replicas.empty()) {
_replicas.begin()->second->close();

_counter_replicas_count->decrement();
METRIC_VAR_DECREMENT(total_replicas);
_replicas.erase(_replicas.begin());
}
}
Expand Down Expand Up @@ -3007,7 +3011,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
if (rep != nullptr) {
auto pr = _replicas.insert(replicas::value_type(child_pid, rep));
CHECK(pr.second, "child replica {} has been existed", rep->name());
_counter_replicas_count->increment();
METRIC_VAR_INCREMENT(total_replicas);
_closed_replicas.erase(child_pid);
}
return rep;
Expand Down
8 changes: 4 additions & 4 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/metrics.h"
#include "utils/zlocks.h"

namespace dsn {
Expand Down Expand Up @@ -467,10 +468,9 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
#endif

// performance counters
perf_counter_wrapper _counter_replicas_count;
perf_counter_wrapper _counter_replicas_opening_count;
perf_counter_wrapper _counter_replicas_closing_count;
perf_counter_wrapper _counter_replicas_commit_qps;
METRIC_VAR_DECLARE_gauge_int64(total_replicas);
METRIC_VAR_DECLARE_gauge_int64(opening_replicas);
METRIC_VAR_DECLARE_gauge_int64(closing_replicas);

perf_counter_wrapper _counter_replicas_learning_count;
perf_counter_wrapper _counter_replicas_learning_max_duration_time_ms;
Expand Down
10 changes: 8 additions & 2 deletions src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@
#include "utils/threadpool_code.h"
#include "utils/utils.h"

// Replica-level counter: replication_app_base::apply_mutation() adds the
// mutation's batched request count to it.
METRIC_DEFINE_counter(replica,
committed_requests,
dsn::metric_unit::kRequests,
"The number of committed requests");

namespace dsn {
class disk_file;

Expand Down Expand Up @@ -246,7 +251,8 @@ replication_app_base *replication_app_base::new_storage_instance(const std::stri
return utils::factory_store<replication_app_base>::create(name.c_str(), PROVIDER_TYPE_MAIN, r);
}

replication_app_base::replication_app_base(replica *replica) : replica_base(replica)
replication_app_base::replication_app_base(replica *replica)
: replica_base(replica), METRIC_VAR_INIT_replica(committed_requests)
{
_dir_data = utils::filesystem::path_combine(replica->dir(), "data");
_dir_learn = utils::filesystem::path_combine(replica->dir(), "learn");
Expand Down Expand Up @@ -485,7 +491,7 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
"mutation {} committed on {}, batched_count = {}", mu->name(), str, batched_count);
}

_replica->update_commit_qps(batched_count);
METRIC_VAR_INCREMENT_BY(committed_requests, batched_count);

return ERR_OK;
}
Expand Down
4 changes: 4 additions & 0 deletions src/replica/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "replica/replica_base.h"
#include "replica_admin_types.h"
#include "utils/error_code.h"
#include "utils/metrics.h"
#include "utils/ports.h"

namespace dsn {
Expand Down Expand Up @@ -313,6 +314,9 @@ class replication_app_base : public replica_base
replica_init_info _info;

explicit replication_app_base(replication::replica *replica);

private:
METRIC_VAR_DECLARE_counter(committed_requests);
};

} // namespace replication
Expand Down
Loading