Skip to content

Commit

Permalink
feat(new_metrics): add table-level metric entity and migrate table-le…
Browse files Browse the repository at this point in the history
…vel metrics for server_state of meta (#1431)

#1331

In perf counters, all metrics of server_state are server-level, for example,
the number of healthy partitions among all tables of a pegasus cluster.

However, sometimes this is not enough. For example, the metric shows
that there are 4 unwritable partitions: the 4 unwritable partitions might
belong to different tables; or, they might belong to one table.

Therefore, these server-level metrics could be changed to table-level.
This will provide us with the status of each table. On the other hand,
once server-level metrics is needed, just aggregate on table-level ones.

The metrics of server_state that are migrated and changed to table-level
include: The number of dead, unreadable, unwritable, writable-ill, and
healthy partitions among all partitions of a table, the number of times
the configuration has been changed and the number of times the status
of partition has been changed to unwritable or writable for a table.

To implement table-level metrics, table-level metric entity is also added.
  • Loading branch information
empiredan authored and wangdan committed Jun 5, 2023
1 parent 24e734c commit 18cb2cb
Show file tree
Hide file tree
Showing 8 changed files with 376 additions and 63 deletions.
87 changes: 35 additions & 52 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@
#include "meta/meta_service.h"
#include "meta/meta_state_service.h"
#include "meta/partition_guardian.h"
#include "meta/table_metrics.h"
#include "meta_admin_types.h"
#include "meta_bulk_load_service.h"
#include "metadata_types.h"
#include "perf_counter/perf_counter.h"
#include "replica_admin_types.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_address.h"
Expand All @@ -85,12 +85,9 @@
#include "utils/config_api.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/singleton.h"
#include "utils/string_conv.h"
#include "utils/strings.h"

using namespace dsn;

namespace dsn {
namespace replication {
DSN_DEFINE_bool(meta_server,
Expand Down Expand Up @@ -214,41 +211,6 @@ void server_state::initialize(meta_service *meta_svc, const std::string &apps_ro
_apps_root = apps_root;
_add_secondary_enable_flow_control = FLAGS_add_secondary_enable_flow_control;
_add_secondary_max_count_for_one_node = FLAGS_add_secondary_max_count_for_one_node;

_dead_partition_count.init_app_counter("eon.server_state",
"dead_partition_count",
COUNTER_TYPE_NUMBER,
"current dead partition count");
_unreadable_partition_count.init_app_counter("eon.server_state",
"unreadable_partition_count",
COUNTER_TYPE_NUMBER,
"current unreadable partition count");
_unwritable_partition_count.init_app_counter("eon.server_state",
"unwritable_partition_count",
COUNTER_TYPE_NUMBER,
"current unwritable partition count");
_writable_ill_partition_count.init_app_counter("eon.server_state",
"writable_ill_partition_count",
COUNTER_TYPE_NUMBER,
"current writable ill partition count");
_healthy_partition_count.init_app_counter("eon.server_state",
"healthy_partition_count",
COUNTER_TYPE_NUMBER,
"current healthy partition count");
_recent_update_config_count.init_app_counter("eon.server_state",
"recent_update_config_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"update configuration count in the recent period");
_recent_partition_change_unwritable_count.init_app_counter(
"eon.server_state",
"recent_partition_change_unwritable_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"partition change to unwritable count in the recent period");
_recent_partition_change_writable_count.init_app_counter(
"eon.server_state",
"recent_partition_change_writable_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"partition change to writable count in the recent period");
}

bool server_state::spin_wait_staging(int timeout_seconds)
Expand Down Expand Up @@ -529,12 +491,14 @@ error_code server_state::initialize_default_apps()
error_code server_state::sync_apps_to_remote_storage()
{
_exist_apps.clear();
_table_metric_entities.clear_entities();
for (auto &kv_pair : _all_apps) {
if (kv_pair.second->status == app_status::AS_CREATING) {
CHECK(_exist_apps.find(kv_pair.second->app_name) == _exist_apps.end(),
"invalid app name, name = {}",
kv_pair.second->app_name);
_exist_apps.emplace(kv_pair.second->app_name, kv_pair.second);
_table_metric_entities.create_entity(kv_pair.first);
}
}

Expand Down Expand Up @@ -584,6 +548,7 @@ error_code server_state::sync_apps_to_remote_storage()

if (err != ERR_OK) {
_exist_apps.clear();
_table_metric_entities.clear_entities();
return err;
}
for (auto &kv : _all_apps) {
Expand Down Expand Up @@ -699,6 +664,7 @@ dsn::error_code server_state::sync_apps_from_remote_storage()
if (app->status == app_status::AS_AVAILABLE) {
app->status = app_status::AS_CREATING;
_exist_apps.emplace(app->app_name, app);
_table_metric_entities.create_entity(app->app_id);
} else if (app->status == app_status::AS_DROPPED) {
app->status = app_status::AS_DROPPING;
} else {
Expand Down Expand Up @@ -726,6 +692,7 @@ dsn::error_code server_state::sync_apps_from_remote_storage()

_all_apps.clear();
_exist_apps.clear();
_table_metric_entities.clear_entities();

std::string transaction_state;
storage
Expand Down Expand Up @@ -1197,6 +1164,7 @@ void server_state::create_app(dsn::message_ex *msg)

_all_apps.emplace(app->app_id, app);
_exist_apps.emplace(request.app_name, app);
_table_metric_entities.create_entity(app->app_id);
}
}

Expand All @@ -1214,6 +1182,7 @@ void server_state::do_app_drop(std::shared_ptr<app_state> &app)
if (ERR_OK == ec) {
zauto_write_lock l(_lock);
_exist_apps.erase(app->app_name);
_table_metric_entities.remove_entity(app->app_id);
for (int i = 0; i < app->partition_count; ++i) {
drop_partition(app, i);
}
Expand Down Expand Up @@ -1425,6 +1394,7 @@ void server_state::recall_app(dsn::message_ex *msg)
target_app->helpers->pending_response = msg;

_exist_apps.emplace(target_app->app_name, target_app);
_table_metric_entities.create_entity(target_app->app_id);
}
}
}
Expand Down Expand Up @@ -1465,7 +1435,7 @@ void server_state::send_proposal(rpc_address target, const configuration_update_
proposal.node);
dsn::message_ex *msg =
dsn::message_ex::create_request(RPC_CONFIG_PROPOSAL, 0, proposal.config.pid.thread_hash());
::marshall(msg, proposal);
dsn::marshall(msg, proposal);
_meta_svc->send_message(target, msg);
}

Expand Down Expand Up @@ -1649,12 +1619,15 @@ void server_state::update_configuration_locally(
_config_change_subscriber(_all_apps);
}

_recent_update_config_count->increment();
METRIC_CALL_TABLE_INCREMENT_METHOD(
_table_metric_entities, partition_configuration_changes, app.app_id);
if (old_health_status >= HS_WRITABLE_ILL && new_health_status < HS_WRITABLE_ILL) {
_recent_partition_change_unwritable_count->increment();
METRIC_CALL_TABLE_INCREMENT_METHOD(
_table_metric_entities, unwritable_partition_changes, app.app_id);
}
if (old_health_status < HS_WRITABLE_ILL && new_health_status >= HS_WRITABLE_ILL) {
_recent_partition_change_writable_count->increment();
METRIC_CALL_TABLE_INCREMENT_METHOD(
_table_metric_entities, writable_partition_changes, app.app_id);
}
}

Expand Down Expand Up @@ -2446,25 +2419,35 @@ bool server_state::can_run_balancer()
return true;
}

void server_state::update_partition_perf_counter()
void server_state::update_partition_metrics()
{
int counters[HS_MAX_VALUE];
::memset(counters, 0, sizeof(counters));
auto func = [&](const std::shared_ptr<app_state> &app) {
int counters[HS_MAX_VALUE] = {0};

int min_2pc_count =
_meta_svc->get_options().app_mutation_2pc_min_replica_count(app->max_replica_count);
for (unsigned int i = 0; i != app->partition_count; ++i) {
health_status st = partition_health_status(app->partitions[i], min_2pc_count);
counters[st]++;
}

METRIC_CALL_TABLE_SET_METHOD(
_table_metric_entities, dead_partitions, app->app_id, counters[HS_DEAD]);
METRIC_CALL_TABLE_SET_METHOD(
_table_metric_entities, unreadable_partitions, app->app_id, counters[HS_UNREADABLE]);
METRIC_CALL_TABLE_SET_METHOD(
_table_metric_entities, unwritable_partitions, app->app_id, counters[HS_UNWRITABLE]);
METRIC_CALL_TABLE_SET_METHOD(_table_metric_entities,
writable_ill_partitions,
app->app_id,
counters[HS_WRITABLE_ILL]);
METRIC_CALL_TABLE_SET_METHOD(
_table_metric_entities, healthy_partitions, app->app_id, counters[HS_HEALTHY]);

return true;
};

for_each_available_app(_all_apps, func);
_dead_partition_count->set(counters[HS_DEAD]);
_unreadable_partition_count->set(counters[HS_UNREADABLE]);
_unwritable_partition_count->set(counters[HS_UNWRITABLE]);
_writable_ill_partition_count->set(counters[HS_WRITABLE_ILL]);
_healthy_partition_count->set(counters[HS_HEALTHY]);
}

bool server_state::check_all_partitions()
Expand All @@ -2475,7 +2458,7 @@ bool server_state::check_all_partitions()

zauto_write_lock l(_lock);

update_partition_perf_counter();
update_partition_metrics();

// first the cure stage
if (level <= meta_function_level::fl_freezed) {
Expand Down
13 changes: 3 additions & 10 deletions src/meta/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@
#include "dsn.layer2_types.h"
#include "meta/meta_rpc_types.h"
#include "meta_data.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "runtime/task/task.h"
#include "runtime/task/task_tracker.h"
#include "table_metrics.h"
#include "utils/error_code.h"
#include "utils/zlocks.h"

Expand Down Expand Up @@ -230,7 +230,7 @@ class server_state
bool can_run_balancer();

// user should lock it first
void update_partition_perf_counter();
void update_partition_metrics();

error_code dump_app_states(const char *local_path,
const std::function<app_state *()> &iterator);
Expand Down Expand Up @@ -437,14 +437,7 @@ class server_state
int32_t _add_secondary_max_count_for_one_node;
std::vector<std::unique_ptr<command_deregister>> _cmds;

perf_counter_wrapper _dead_partition_count;
perf_counter_wrapper _unreadable_partition_count;
perf_counter_wrapper _unwritable_partition_count;
perf_counter_wrapper _writable_ill_partition_count;
perf_counter_wrapper _healthy_partition_count;
perf_counter_wrapper _recent_update_config_count;
perf_counter_wrapper _recent_partition_change_unwritable_count;
perf_counter_wrapper _recent_partition_change_writable_count;
table_metric_entities _table_metric_entities;
};

} // namespace replication
Expand Down
2 changes: 2 additions & 0 deletions src/meta/server_state_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "dsn.layer2_types.h"
#include "meta/meta_data.h"
#include "meta/meta_rpc_types.h"
#include "meta/table_metrics.h"
#include "meta_admin_types.h"
#include "meta_service.h"
#include "runtime/rpc/rpc_address.h"
Expand Down Expand Up @@ -147,6 +148,7 @@ std::pair<dsn::error_code, std::shared_ptr<app_state>> server_state::restore_app

_all_apps.emplace(app->app_id, app);
_exist_apps.emplace(info.app_name, app);
_table_metric_entities.create_entity(app->app_id);
}
}
// TODO: using one single env to replace
Expand Down
Loading

0 comments on commit 18cb2cb

Please sign in to comment.