From 1bec66d7be90ffe5a0c05b964db7f25a7c9426d3 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 4 Dec 2024 12:37:28 +0800 Subject: [PATCH] feat(new_metrics): collect the number of primary and secondary replicas (#2161) Currently we don't have metrics for how many primary or secondary replicas there are on a replica server, which means we have to find them using the `nodes` command in the Pegasus shell rather than simply checking monitoring graphs. --- src/replica/replica_stub.cpp | 60 ++++++++++++++++++++++++++++++------ src/replica/replica_stub.h | 4 +++ 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 18b5566e63..7a51ee4cac 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -115,6 +115,26 @@ METRIC_DEFINE_gauge_int64(server, dsn::metric_unit::kReplicas, "The number of closing replicas"); +METRIC_DEFINE_gauge_int64(server, + inactive_replicas, + dsn::metric_unit::kReplicas, + "The number of inactive replicas"); + +METRIC_DEFINE_gauge_int64(server, + error_replicas, + dsn::metric_unit::kReplicas, + "The number of replicas with errors"); + +METRIC_DEFINE_gauge_int64(server, + primary_replicas, + dsn::metric_unit::kReplicas, + "The number of primary replicas"); + +METRIC_DEFINE_gauge_int64(server, + secondary_replicas, + dsn::metric_unit::kReplicas, + "The number of secondary replicas"); + METRIC_DEFINE_gauge_int64(server, learning_replicas, dsn::metric_unit::kReplicas, @@ -236,7 +256,6 @@ DSN_DECLARE_int32(fd_beacon_interval_seconds); DSN_DECLARE_int32(fd_check_interval_seconds); DSN_DECLARE_int32(fd_grace_seconds); DSN_DECLARE_int32(fd_lease_seconds); -DSN_DECLARE_int32(gc_interval_ms); DSN_DECLARE_string(data_dirs); DSN_DECLARE_string(encryption_cluster_key_name); DSN_DECLARE_string(server_key); @@ -322,6 +341,13 @@ bool check_mem_release_max_reserved_mem_percentage(int32_t value) DSN_DEFINE_validator(mem_release_max_reserved_mem_percentage,
&check_mem_release_max_reserved_mem_percentage); +DSN_DEFINE_uint32(replication, + replicas_stat_interval_ms, + 30000, + "period in milliseconds that stats for replicas are calculated"); +DSN_TAG_VARIABLE(replicas_stat_interval_ms, FT_MUTABLE); +DSN_DEFINE_validator(replicas_stat_interval_ms, [](uint32_t value) -> bool { return value > 0; }); + DSN_DEFINE_string( pegasus.server, hadoop_kms_url, @@ -368,6 +394,10 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, METRIC_VAR_INIT_server(total_replicas), METRIC_VAR_INIT_server(opening_replicas), METRIC_VAR_INIT_server(closing_replicas), + METRIC_VAR_INIT_server(inactive_replicas), + METRIC_VAR_INIT_server(error_replicas), + METRIC_VAR_INIT_server(primary_replicas), + METRIC_VAR_INIT_server(secondary_replicas), METRIC_VAR_INIT_server(learning_replicas), METRIC_VAR_INIT_server(learning_replicas_max_duration_ms), METRIC_VAR_INIT_server(learning_replicas_max_copy_file_bytes), @@ -605,9 +635,9 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS, &_tracker, [this] { on_replicas_stat(); }, - std::chrono::milliseconds(FLAGS_gc_interval_ms), + std::chrono::milliseconds(FLAGS_replicas_stat_interval_ms), 0, - std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms))); + std::chrono::milliseconds(rand::next_u32(0, FLAGS_replicas_stat_interval_ms))); } // disk stat @@ -1625,26 +1655,30 @@ void replica_stub::on_replicas_stat() LOG_INFO("start replicas statistics, replica_count = {}", rep_stat_info_by_gpid.size()); // statistic learning info - uint64_t learning_count = 0; uint64_t learning_max_duration_time_ms = 0; uint64_t learning_max_copy_file_size = 0; uint64_t bulk_load_running_count = 0; uint64_t bulk_load_max_ingestion_time_ms = 0; uint64_t bulk_load_max_duration_time_ms = 0; - uint64_t splitting_count = 0; uint64_t splitting_max_duration_time_ms = 0; uint64_t splitting_max_async_learn_time_ms = 0; uint64_t 
splitting_max_copy_file_size = 0; + + std::map status_counts; for (const auto &[_, rep_stat_info] : rep_stat_info_by_gpid) { const auto &rep = rep_stat_info.rep; + ++status_counts[rep->status()]; + if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) { - learning_count++; learning_max_duration_time_ms = std::max( learning_max_duration_time_ms, rep->_potential_secondary_states.duration_ms()); learning_max_copy_file_size = std::max(learning_max_copy_file_size, rep->_potential_secondary_states.learning_copy_file_size); + + continue; } + if (rep->status() == partition_status::PS_PRIMARY || rep->status() == partition_status::PS_SECONDARY) { if (rep->get_bulk_loader()->get_bulk_load_status() != bulk_load_status::BLS_INVALID) { @@ -1654,26 +1688,34 @@ void replica_stub::on_replicas_stat() bulk_load_max_duration_time_ms = std::max(bulk_load_max_duration_time_ms, rep->get_bulk_loader()->duration_ms()); } + + continue; } + // splitting_max_copy_file_size, rep->_split_states.copy_file_size if (rep->status() == partition_status::PS_PARTITION_SPLIT) { - splitting_count++; splitting_max_duration_time_ms = std::max(splitting_max_duration_time_ms, rep->_split_states.total_ms()); splitting_max_async_learn_time_ms = std::max(splitting_max_async_learn_time_ms, rep->_split_states.async_learn_ms()); splitting_max_copy_file_size = std::max(splitting_max_copy_file_size, rep->_split_states.splitting_copy_file_size); + + continue; } } - METRIC_VAR_SET(learning_replicas, learning_count); + METRIC_VAR_SET(inactive_replicas, status_counts[partition_status::PS_INACTIVE]); + METRIC_VAR_SET(error_replicas, status_counts[partition_status::PS_ERROR]); + METRIC_VAR_SET(primary_replicas, status_counts[partition_status::PS_PRIMARY]); + METRIC_VAR_SET(secondary_replicas, status_counts[partition_status::PS_SECONDARY]); + METRIC_VAR_SET(learning_replicas, status_counts[partition_status::PS_POTENTIAL_SECONDARY]); METRIC_VAR_SET(learning_replicas_max_duration_ms, learning_max_duration_time_ms); 
METRIC_VAR_SET(learning_replicas_max_copy_file_bytes, learning_max_copy_file_size); METRIC_VAR_SET(bulk_load_running_count, bulk_load_running_count); METRIC_VAR_SET(bulk_load_ingestion_max_duration_ms, bulk_load_max_ingestion_time_ms); METRIC_VAR_SET(bulk_load_max_duration_ms, bulk_load_max_duration_time_ms); - METRIC_VAR_SET(splitting_replicas, splitting_count); + METRIC_VAR_SET(splitting_replicas, status_counts[partition_status::PS_PARTITION_SPLIT]); METRIC_VAR_SET(splitting_replicas_max_duration_ms, splitting_max_duration_time_ms); METRIC_VAR_SET(splitting_replicas_async_learn_max_duration_ms, splitting_max_async_learn_time_ms); diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index 09f1e624bb..328e975024 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -528,6 +528,10 @@ class replica_stub : public serverlet, public ref_counter METRIC_VAR_DECLARE_gauge_int64(opening_replicas); METRIC_VAR_DECLARE_gauge_int64(closing_replicas); + METRIC_VAR_DECLARE_gauge_int64(inactive_replicas); + METRIC_VAR_DECLARE_gauge_int64(error_replicas); + METRIC_VAR_DECLARE_gauge_int64(primary_replicas); + METRIC_VAR_DECLARE_gauge_int64(secondary_replicas); METRIC_VAR_DECLARE_gauge_int64(learning_replicas); METRIC_VAR_DECLARE_gauge_int64(learning_replicas_max_duration_ms); METRIC_VAR_DECLARE_gauge_int64(learning_replicas_max_copy_file_bytes);