Skip to content

Commit

Permalink
feat(new_metrics): show estimated number of keys by shell `count_data…
Browse files: browse the repository at this point in the history
…` command based on new metrics (apache#1920)
  • Loading branch information
empiredan authored Feb 27, 2024
1 parent 4bd798a commit 550da57
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 30 deletions.
11 changes: 7 additions & 4 deletions src/client_lib/pegasus_client_factory_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "runtime/app_model.h"
#include "runtime/tool_api.h"
#include "utils/fmt_logging.h"
#include "utils/strings.h"
#include "utils/zlocks.h"

namespace pegasus {
Expand Down Expand Up @@ -62,11 +63,12 @@ bool pegasus_client_factory_impl::initialize(const char *config_file)
pegasus_client *pegasus_client_factory_impl::get_client(const char *cluster_name,
const char *app_name)
{
if (cluster_name == nullptr || cluster_name[0] == '\0') {
if (dsn::utils::is_empty(cluster_name)) {
LOG_ERROR("invalid parameter 'cluster_name'");
return nullptr;
}
if (app_name == nullptr || app_name[0] == '\0') {

if (dsn::utils::is_empty(app_name)) {
LOG_ERROR("invalid parameter 'app_name'");
return nullptr;
}
Expand All @@ -88,5 +90,6 @@ pegasus_client *pegasus_client_factory_impl::get_client(const char *cluster_name

return it2->second;
}
}
} // namespace

} // namespace client
} // namespace pegasus
36 changes: 28 additions & 8 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -811,16 +811,32 @@ class aggregate_stats_calcs

#undef DEF_CALC_CREATOR

// Perform the chosen accum aggregations on the fetched metric entities: when `_sums` has been
// created, `entities` are handed to `_sums->add_assign()`; otherwise nothing is done.
// RETURN_NOT_OK is defined elsewhere; presumably it makes the enclosing function return early
// with the error once `add_assign()` fails -- TODO confirm.
#define CALC_ACCUM_STATS(entities) \
do { \
if (_sums) { \
RETURN_NOT_OK(_sums->add_assign(entities)); \
} \
} while (0)

// Perform the chosen accum aggregations on the fetched metrics.
//
// Only one sample (`json_string`) is taken, so only CALC_ACCUM_STATS is applied here and no
// delta aggregation is computed. DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT is defined elsewhere;
// presumably it decodes `json_string` into `query_snapshot` and returns an error from this
// function once decoding fails -- TODO confirm.
//
// Returns dsn::error_s::ok() once both macros have run without returning early.
dsn::error_s aggregate_metrics(const std::string &json_string)
{
DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, query_snapshot);

CALC_ACCUM_STATS(query_snapshot.entities);

return dsn::error_s::ok();
}

// Perform all of the chosen aggregations (both accum and delta) on the fetched metrics.
dsn::error_s aggregate_metrics(const std::string &json_string_start,
const std::string &json_string_end)
{
DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(
json_string_start, json_string_end, query_snapshot_start, query_snapshot_end);

if (_sums) {
RETURN_NOT_OK(_sums->add_assign(query_snapshot_end.entities));
}
// Apply ending sample to the accum aggregations.
CALC_ACCUM_STATS(query_snapshot_end.entities);

const std::array deltas_list = {&_increases, &_rates};
for (const auto stats : deltas_list) {
Expand All @@ -839,6 +855,8 @@ class aggregate_stats_calcs
return dsn::error_s::ok();
}

#undef CALC_ACCUM_STATS

private:
DISALLOW_COPY_AND_ASSIGN(aggregate_stats_calcs);

Expand Down Expand Up @@ -1335,7 +1353,8 @@ inline stat_var_map create_rates(row_data &row)

#undef BIND_ROW

// Create all aggregations for the table-level stats.
// Given all tables, create all aggregations needed for the table-level stats. All selected
// partitions should have their primary replicas on this node.
inline std::unique_ptr<aggregate_stats_calcs> create_table_aggregate_stats_calcs(
const std::map<int32_t, std::vector<dsn::partition_configuration>> &table_partitions,
const dsn::rpc_address &node,
Expand Down Expand Up @@ -1379,7 +1398,8 @@ inline std::unique_ptr<aggregate_stats_calcs> create_table_aggregate_stats_calcs
return calcs;
}

// Create all aggregations for the partition-level stats.
// Given a table and all of its partitions, create all aggregations needed for the partition-level
// stats. All selected partitions should have their primary replicas on this node.
inline std::unique_ptr<aggregate_stats_calcs>
create_partition_aggregate_stats_calcs(const int32_t table_id,
const std::vector<dsn::partition_configuration> &partitions,
Expand Down Expand Up @@ -1681,7 +1701,7 @@ get_table_stats(shell_context *sc, uint32_t sample_interval_ms, std::vector<row_
return false;
}

const auto query_string = row_data_filters().to_query_string();
const auto &query_string = row_data_filters().to_query_string();
const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
const auto &results_end = get_metrics(nodes, query_string);
Expand Down Expand Up @@ -1741,7 +1761,7 @@ inline bool get_partition_stats(shell_context *sc,
}
CHECK_EQ(partitions.size(), partition_count);

const auto query_string = row_data_filters(table_id).to_query_string();
const auto &query_string = row_data_filters(table_id).to_query_string();
const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
const auto &results_end = get_metrics(nodes, query_string);
Expand Down
114 changes: 101 additions & 13 deletions src/shell/commands/data_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <vector>

#include "client/replication_ddl_client.h"
#include "common/gpid.h"
#include "dsn.layer2_types.h"
#include "geo/lib/geo_client.h"
#include "idl_utils.h"
Expand All @@ -60,8 +61,10 @@
#include "utils/blob.h"
#include "utils/defer.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/metrics.h"
#include "utils/output_utils.h"
#include "utils/string_conv.h"

Expand Down Expand Up @@ -2212,6 +2215,93 @@ bool clear_data(command_executor *e, shell_context *sc, arguments args)
return true;
}

namespace {

// Build the metric filters used to query `rdb_estimated_keys` for one table: entity type
// "replica", entity attribute pair ("table_id", <table_id>), metric "rdb_estimated_keys", and
// only the metric name and single-value fields in the response.
inline dsn::metric_filters rdb_estimated_keys_filters(int32_t table_id)
{
    dsn::metric_filters result;

    // Which entities are selected.
    result.entity_types = {"replica"};
    result.entity_attrs = {"table_id", std::to_string(table_id)};

    // Which metric of those entities is selected, and which of its fields are returned.
    result.entity_metrics = {"rdb_estimated_keys"};
    result.with_metric_fields = {dsn::kMetricNameField, dsn::kMetricSingleValueField};

    return result;
}

// Create the sum aggregation of rdb_estimated_keys for the partitions of one table.
//
// `rows` must hold exactly one row per element of `partitions` (checked below). Only the
// partitions whose primary replica is `node` are bound: for each of them the address of
// `rows[pidx].rdb_estimate_num_keys` is registered under the key (table_id, pidx), so `rows`
// must stay alive and must not be resized while the returned calcs is in use.
std::unique_ptr<aggregate_stats_calcs>
create_rdb_estimated_keys_stats_calcs(const int32_t table_id,
                                      const std::vector<dsn::partition_configuration> &partitions,
                                      const dsn::rpc_address &node,
                                      const std::string &entity_type,
                                      std::vector<row_data> &rows)
{
    CHECK_EQ(rows.size(), partitions.size());

    partition_stat_map sums;
    for (size_t pidx = 0; pidx < rows.size(); ++pidx) {
        const auto &partition = partitions[pidx];
        if (partition.primary != node) {
            // This node is not the primary of the partition: its metrics are not counted.
            continue;
        }

        // The dimensions of each bound value are (table_id, partition_id, metric_name).
        auto &row = rows[pidx];
        stat_var_map vars({{"rdb_estimated_keys", &row.rdb_estimate_num_keys}});
        sums.emplace(dsn::gpid(table_id, pidx), std::move(vars));
    }

    auto calcs = std::make_unique<aggregate_stats_calcs>();
    calcs->create_sums<partition_aggregate_stats>(entity_type, std::move(sums));
    return calcs;
}

// Aggregate the partition-level rdb_estimated_keys for the specified table.
//
// `rows` is an output parameter: it is cleared and refilled with one row per partition of
// `table_name`, each constructed from the partition index as a string. The metrics fetched
// from every replica server are then aggregated into these rows.
//
// Returns false if the replica server list or the partition list of the table cannot be
// fetched, and true once all nodes have been processed. The RETURN_SHELL_IF_* macros are
// defined elsewhere; presumably they report the failure and return false from this function
// -- TODO confirm.
bool get_rdb_estimated_keys_stats(shell_context *sc,
const std::string &table_name,
std::vector<row_data> &rows)
{
std::vector<node_desc> nodes;
if (!fill_nodes(sc, "replica-server", nodes)) {
LOG_ERROR("get replica server node list failed");
return false;
}

// Resolve the table name to its id and fetch the configuration of all of its partitions.
int32_t table_id = 0;
int32_t partition_count = 0;
std::vector<dsn::partition_configuration> partitions;
const auto &err = sc->ddl_client->list_app(table_name, table_id, partition_count, partitions);
if (err != ::dsn::ERR_OK) {
LOG_ERROR("list app {} failed, error = {}", table_name, err);
return false;
}
CHECK_EQ(partitions.size(), partition_count);

// One result per node; results[i] is used together with nodes[i] below.
const auto &results =
get_metrics(nodes, rdb_estimated_keys_filters(table_id).to_query_string());

// All rows are created before any calcs is built, since the calcs created below are given
// `rows` by non-const reference.
rows.clear();
rows.reserve(partition_count);
for (int32_t i = 0; i < partition_count; ++i) {
rows.emplace_back(std::to_string(i));
}

for (size_t i = 0; i < nodes.size(); ++i) {
RETURN_SHELL_IF_GET_METRICS_FAILED(
results[i], nodes[i], "rdb_estimated_keys for table(id={})", table_id);

// A new calcs is built for each node, since only the partitions whose primary replica is
// on that node are selected for aggregation.
auto calcs = create_rdb_estimated_keys_stats_calcs(
table_id, partitions, nodes[i].address, "replica", rows);
RETURN_SHELL_IF_PARSE_METRICS_FAILED(calcs->aggregate_metrics(results[i].body()),
nodes[i],
"rdb_estimated_keys for table(id={})",
table_id);
}

return true;
}

} // anonymous namespace

bool count_data(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"precise", no_argument, 0, 'c'},
Expand Down Expand Up @@ -2352,20 +2442,18 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
return false;
}

// get estimate key number
std::vector<row_data> rows;
std::string app_name = sc->pg_client->get_app_name();
// TODO(wangdan): no need to use get_app_stat since only rdb_estimate_num_keys is needed.
// Would be refactored later.
// if (!get_app_stat(sc, app_name, rows)) {
// fprintf(stderr, "ERROR: query app stat from server failed");
// return true;
// }

rows.resize(rows.size() + 1);
row_data &sum = rows.back();
sum.row_name = "(total:" + std::to_string(rows.size() - 1) + ")";
for (int i = 0; i < rows.size() - 1; ++i) {
const std::string table_name(sc->pg_client->get_app_name());
CHECK(!table_name.empty(), "table_name must be non-empty, see data_operations()");

if (!get_rdb_estimated_keys_stats(sc, table_name, rows)) {
fprintf(stderr, "ERROR: get rdb_estimated_keys stats failed");
return true;
}

rows.emplace_back(fmt::format("(total:{})", rows.size() - 1));
auto &sum = rows.back();
for (size_t i = 0; i < rows.size() - 1; ++i) {
const row_data &row = rows[i];
sum.rdb_estimate_num_keys += row.rdb_estimate_num_keys;
}
Expand Down
2 changes: 1 addition & 1 deletion src/shell/commands/node_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
return true;
}

const auto query_string = rw_requests_filters().to_query_string();
const auto &query_string = rw_requests_filters().to_query_string();
const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_nodes_sample_interval_ms));
const auto &results_end = get_metrics(nodes, query_string);
Expand Down
7 changes: 3 additions & 4 deletions src/shell/commands/table_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,9 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args)
return true;
}

rows.resize(rows.size() + 1);
row_data &sum = rows.back();
sum.row_name = "(total:" + std::to_string(rows.size() - 1) + ")";
for (int i = 0; i < rows.size() - 1; ++i) {
rows.emplace_back(fmt::format("(total:{})", rows.size() - 1));
auto &sum = rows.back();
for (size_t i = 0; i < rows.size() - 1; ++i) {
row_data &row = rows[i];
sum.partition_count += row.partition_count;
sum.get_qps += row.get_qps;
Expand Down

0 comments on commit 550da57

Please sign in to comment.