From 550da576c66cceb243382324c4f6aece2efb1cce Mon Sep 17 00:00:00 2001
From: Dan Wang
Date: Tue, 27 Feb 2024 15:31:31 +0800
Subject: [PATCH] feat(new_metrics): show estimated number of keys by shell `count_data` command based on new metrics (#1920)

---
 .../pegasus_client_factory_impl.cpp     |  11 +-
 src/shell/command_helper.h              |  36 ++++--
 src/shell/commands/data_operations.cpp  | 114 ++++++++++++++++--
 src/shell/commands/node_management.cpp  |   2 +-
 src/shell/commands/table_management.cpp |   7 +-
 5 files changed, 140 insertions(+), 30 deletions(-)

diff --git a/src/client_lib/pegasus_client_factory_impl.cpp b/src/client_lib/pegasus_client_factory_impl.cpp
index a590e52553..b98cb139a1 100644
--- a/src/client_lib/pegasus_client_factory_impl.cpp
+++ b/src/client_lib/pegasus_client_factory_impl.cpp
@@ -26,6 +26,7 @@
 #include "runtime/app_model.h"
 #include "runtime/tool_api.h"
 #include "utils/fmt_logging.h"
+#include "utils/strings.h"
 #include "utils/zlocks.h"
 
 namespace pegasus {
@@ -62,11 +63,12 @@ bool pegasus_client_factory_impl::initialize(const char *config_file)
 pegasus_client *pegasus_client_factory_impl::get_client(const char *cluster_name,
                                                         const char *app_name)
 {
-    if (cluster_name == nullptr || cluster_name[0] == '\0') {
+    if (dsn::utils::is_empty(cluster_name)) {
         LOG_ERROR("invalid parameter 'cluster_name'");
         return nullptr;
     }
-    if (app_name == nullptr || app_name[0] == '\0') {
+
+    if (dsn::utils::is_empty(app_name)) {
         LOG_ERROR("invalid parameter 'app_name'");
         return nullptr;
     }
@@ -88,5 +90,6 @@ pegasus_client *pegasus_client_factory_impl::get_client(const char *cluster_name
         return it2->second;
     }
 }
-}
-} // namespace
+
+} // namespace client
+} // namespace pegasus
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 54c8ed489e..2979c1a06a 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -811,16 +811,32 @@ class aggregate_stats_calcs
 
 #undef DEF_CALC_CREATOR
 
-    // Perform the chosen aggregations on the fetched metrics.
+#define CALC_ACCUM_STATS(entities)                                                                 \
+    do {                                                                                           \
+        if (_sums) {                                                                               \
+            RETURN_NOT_OK(_sums->add_assign(entities));                                            \
+        }                                                                                          \
+    } while (0)
+
+    // Perform the chosen accum aggregations on the fetched metrics.
+    dsn::error_s aggregate_metrics(const std::string &json_string)
+    {
+        DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, query_snapshot);
+
+        CALC_ACCUM_STATS(query_snapshot.entities);
+
+        return dsn::error_s::ok();
+    }
+
+    // Perform all of the chosen aggregations (both accum and delta) on the fetched metrics.
     dsn::error_s aggregate_metrics(const std::string &json_string_start,
                                    const std::string &json_string_end)
     {
         DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(
             json_string_start, json_string_end, query_snapshot_start, query_snapshot_end);
 
-        if (_sums) {
-            RETURN_NOT_OK(_sums->add_assign(query_snapshot_end.entities));
-        }
+        // Apply ending sample to the accum aggregations.
+        CALC_ACCUM_STATS(query_snapshot_end.entities);
 
         const std::array deltas_list = {&_increases, &_rates};
         for (const auto stats : deltas_list) {
@@ -839,6 +855,8 @@ class aggregate_stats_calcs
         return dsn::error_s::ok();
     }
 
+#undef CALC_ACCUM_STATS
+
 private:
     DISALLOW_COPY_AND_ASSIGN(aggregate_stats_calcs);
 
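// A minimal, standalone sketch (not part of the diff above) of why the new single-sample
// aggregate_metrics() overload is sufficient for count_data: an "accum" aggregation such as a
// sum only needs one metric snapshot, while the "delta" aggregations (increases and rates)
// need both a starting and an ending snapshot. Plain std types only; all names below are
// hypothetical, not Pegasus APIs.
#include <cstdint>
#include <map>
#include <string>

struct toy_snapshot
{
    // Metric name -> sampled value, e.g. {"rdb_estimated_keys", 12345}.
    std::map<std::string, int64_t> values;
};

// Accum: fold a single snapshot into the running sums, mirroring CALC_ACCUM_STATS.
inline void toy_accumulate(const toy_snapshot &snap, std::map<std::string, int64_t> &sums)
{
    for (const auto &[name, value] : snap.values) {
        sums[name] += value;
    }
}

// Delta: only the difference between two snapshots is meaningful, so two samples are required.
inline int64_t toy_increase(const toy_snapshot &start,
                            const toy_snapshot &end,
                            const std::string &name)
{
    return end.values.at(name) - start.values.at(name);
}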
@@ -1335,7 +1353,8 @@ inline stat_var_map create_rates(row_data &row)
 
 #undef BIND_ROW
 
-// Create all aggregations for the table-level stats.
+// Given all tables, create all aggregations needed for the table-level stats. Only partitions
+// whose primary replicas are on the given node are selected.
 inline std::unique_ptr<aggregate_stats_calcs> create_table_aggregate_stats_calcs(
     const std::map<int32_t, std::vector<dsn::partition_configuration>> &table_partitions,
     const dsn::rpc_address &node,
@@ -1379,7 +1398,8 @@ inline std::unique_ptr<aggregate_stats_calcs> create_table_aggregate_stats_calcs
     return calcs;
 }
 
-// Create all aggregations for the partition-level stats.
+// Given a table and all of its partitions, create all aggregations needed for the partition-level
+// stats. Only partitions whose primary replicas are on the given node are selected.
 inline std::unique_ptr<aggregate_stats_calcs>
 create_partition_aggregate_stats_calcs(const int32_t table_id,
                                        const std::vector<dsn::partition_configuration> &partitions,
@@ -1681,7 +1701,7 @@ get_table_stats(shell_context *sc, uint32_t sample_interval_ms, std::vector<row_data> &rows)
diff --git a/src/shell/commands/data_operations.cpp b/src/shell/commands/data_operations.cpp
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
 #include "client/replication_ddl_client.h"
+#include "common/gpid.h"
 #include "dsn.layer2_types.h"
 #include "geo/lib/geo_client.h"
 #include "idl_utils.h"
@@ -60,8 +61,10 @@
 #include "utils/blob.h"
 #include "utils/defer.h"
 #include "utils/error_code.h"
+#include "utils/errors.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
+#include "utils/metrics.h"
 #include "utils/output_utils.h"
 #include "utils/string_conv.h"
 
@@ -2212,6 +2215,93 @@ bool clear_data(command_executor *e, shell_context *sc, arguments args)
     return true;
 }
 
+namespace {
+
+inline dsn::metric_filters rdb_estimated_keys_filters(int32_t table_id)
+{
+    dsn::metric_filters filters;
+    filters.with_metric_fields = {dsn::kMetricNameField, dsn::kMetricSingleValueField};
+    filters.entity_types = {"replica"};
+    filters.entity_attrs = {"table_id", std::to_string(table_id)};
+    filters.entity_metrics = {"rdb_estimated_keys"};
+    return filters;
+}
+
+// Given a table and all of its partitions, aggregate partition-level stats for rdb_estimated_keys.
+// Only partitions whose primary replicas are on the given node are selected.
+std::unique_ptr<aggregate_stats_calcs>
+create_rdb_estimated_keys_stats_calcs(const int32_t table_id,
+                                      const std::vector<dsn::partition_configuration> &partitions,
+                                      const dsn::rpc_address &node,
+                                      const std::string &entity_type,
+                                      std::vector<row_data> &rows)
+{
+    CHECK_EQ(rows.size(), partitions.size());
+
+    partition_stat_map sums;
+    for (size_t i = 0; i < rows.size(); ++i) {
+        if (partitions[i].primary != node) {
+            // Skip this replica: its metrics are ignored since this node is not the primary
+            // of the partition.
+            continue;
+        }
+
+        // Add (table_id, partition_id, metric_name) as dimensions.
+        sums.emplace(dsn::gpid(table_id, i),
+                     stat_var_map({{"rdb_estimated_keys", &rows[i].rdb_estimate_num_keys}}));
+    }
+
+    auto calcs = std::make_unique<aggregate_stats_calcs>();
+    calcs->create_sums(entity_type, std::move(sums));
+    return calcs;
+}
+
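// A standalone sketch (not part of the diff) of how the sums built above avoid double counting:
// each partition is credited exactly once, on the node that currently holds its primary replica,
// which is what the `partitions[i].primary != node` check enforces. Plain std types only; the
// names below are hypothetical.
#include <cstdint>
#include <map>
#include <vector>

struct toy_partition_sample
{
    int32_t partition_index = 0;
    bool primary_is_on_this_node = false;
    int64_t rdb_estimated_keys = 0; // value reported by the local replica
};

// Returns partition_index -> estimated keys, keeping only partitions whose primary replica
// is on the queried node.
inline std::map<int32_t, int64_t>
toy_sum_primaries(const std::vector<toy_partition_sample> &samples)
{
    std::map<int32_t, int64_t> per_partition;
    for (const auto &s : samples) {
        if (!s.primary_is_on_this_node) {
            continue;
        }
        per_partition[s.partition_index] += s.rdb_estimated_keys;
    }
    return per_partition;
}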
+// Aggregate the partition-level rdb_estimated_keys for the specified table.
+bool get_rdb_estimated_keys_stats(shell_context *sc,
+                                  const std::string &table_name,
+                                  std::vector<row_data> &rows)
+{
+    std::vector<node_desc> nodes;
+    if (!fill_nodes(sc, "replica-server", nodes)) {
+        LOG_ERROR("get replica server node list failed");
+        return false;
+    }
+
+    int32_t table_id = 0;
+    int32_t partition_count = 0;
+    std::vector<dsn::partition_configuration> partitions;
+    const auto &err = sc->ddl_client->list_app(table_name, table_id, partition_count, partitions);
+    if (err != ::dsn::ERR_OK) {
+        LOG_ERROR("list app {} failed, error = {}", table_name, err);
+        return false;
+    }
+    CHECK_EQ(partitions.size(), partition_count);
+
+    const auto &results =
+        get_metrics(nodes, rdb_estimated_keys_filters(table_id).to_query_string());
+
+    rows.clear();
+    rows.reserve(partition_count);
+    for (int32_t i = 0; i < partition_count; ++i) {
+        rows.emplace_back(std::to_string(i));
+    }
+
+    for (size_t i = 0; i < nodes.size(); ++i) {
+        RETURN_SHELL_IF_GET_METRICS_FAILED(
+            results[i], nodes[i], "rdb_estimated_keys for table(id={})", table_id);
+
+        auto calcs = create_rdb_estimated_keys_stats_calcs(
+            table_id, partitions, nodes[i].address, "replica", rows);
+        RETURN_SHELL_IF_PARSE_METRICS_FAILED(calcs->aggregate_metrics(results[i].body()),
+                                             nodes[i],
+                                             "rdb_estimated_keys for table(id={})",
+                                             table_id);
+    }
+
+    return true;
+}
+
+} // anonymous namespace
+
 bool count_data(command_executor *e, shell_context *sc, arguments args)
 {
     static struct option long_options[] = {{"precise", no_argument, 0, 'c'},
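// A standalone sketch (not part of the diff) of the flow get_rdb_estimated_keys_stats() feeds
// into count_data in the next hunk: one row per partition is filled from the primary replicas,
// then a trailing "(total:N)" row sums them up. fmt is assumed to be available, as it already
// is in this file; the toy names are hypothetical.
#include <cstdint>
#include <numeric>
#include <string>
#include <vector>

#include <fmt/format.h>

struct toy_row
{
    std::string name;
    int64_t rdb_estimated_keys = 0;
};

// `per_partition` holds one entry per partition, already aggregated from the replica servers.
inline toy_row toy_total_row(const std::vector<toy_row> &per_partition)
{
    toy_row total;
    total.name = fmt::format("(total:{})", per_partition.size());
    total.rdb_estimated_keys = std::accumulate(
        per_partition.begin(),
        per_partition.end(),
        static_cast<int64_t>(0),
        [](int64_t acc, const toy_row &row) { return acc + row.rdb_estimated_keys; });
    return total;
}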
@@ -2352,20 +2442,18 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
         return false;
     }
 
-    // get estimate key number
     std::vector<row_data> rows;
-    std::string app_name = sc->pg_client->get_app_name();
-    // TODO(wangdan): no need to use get_app_stat since only rdb_estimate_num_keys is needed.
-    // Would be refactored later.
-    // if (!get_app_stat(sc, app_name, rows)) {
-    //     fprintf(stderr, "ERROR: query app stat from server failed");
-    //     return true;
-    // }
-
-    rows.resize(rows.size() + 1);
-    row_data &sum = rows.back();
-    sum.row_name = "(total:" + std::to_string(rows.size() - 1) + ")";
-    for (int i = 0; i < rows.size() - 1; ++i) {
+    const std::string table_name(sc->pg_client->get_app_name());
+    CHECK(!table_name.empty(), "table_name must be non-empty, see data_operations()");
+
+    if (!get_rdb_estimated_keys_stats(sc, table_name, rows)) {
+        fprintf(stderr, "ERROR: get rdb_estimated_keys stats failed");
+        return true;
+    }
+
+    rows.emplace_back(fmt::format("(total:{})", rows.size()));
+    auto &sum = rows.back();
+    for (size_t i = 0; i < rows.size() - 1; ++i) {
         const row_data &row = rows[i];
         sum.rdb_estimate_num_keys += row.rdb_estimate_num_keys;
     }
diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp
index 63e990b29c..52910671f5 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -390,7 +390,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
         return true;
     }
 
-    const auto query_string = rw_requests_filters().to_query_string();
+    const auto &query_string = rw_requests_filters().to_query_string();
     const auto &results_start = get_metrics(nodes, query_string);
     std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_nodes_sample_interval_ms));
     const auto &results_end = get_metrics(nodes, query_string);
diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp
index 0458ed3cd1..6cede043dd 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -532,10 +532,9 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args)
         return true;
     }
 
-    rows.resize(rows.size() + 1);
-    row_data &sum = rows.back();
-    sum.row_name = "(total:" + std::to_string(rows.size() - 1) + ")";
-    for (int i = 0; i < rows.size() - 1; ++i) {
+    rows.emplace_back(fmt::format("(total:{})", rows.size()));
+    auto &sum = rows.back();
+    for (size_t i = 0; i < rows.size() - 1; ++i) {
         row_data &row = rows[i];
         sum.partition_count += row.partition_count;
         sum.get_qps += row.get_qps;