Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(collector): add statistics for partition hotspot #427

Closed
wants to merge 22 commits into from
98 changes: 21 additions & 77 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,88 +130,30 @@ void info_collector::stop() { _tracker.cancel_outstanding_tasks(); }
void info_collector::on_app_stat()
{
ddebug("start to stat apps");
std::vector<row_data> rows;
if (!get_app_stat(&_shell_context, "", rows)) {
std::map<std::string, std::vector<row_data>> all_rows;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些只是重构吧?重构的先单独提一个PR便于review吧

if (!get_app_partition_stat(&_shell_context, all_rows)) {
derror("call get_app_stat() failed");
return;
}
std::vector<double> read_qps;
std::vector<double> write_qps;
rows.resize(rows.size() + 1);
read_qps.resize(rows.size());
write_qps.resize(rows.size());
row_data &all = rows.back();
all.row_name = "_all_";
for (int i = 0; i < rows.size() - 1; ++i) {
row_data &row = rows[i];
all.get_qps += row.get_qps;
all.multi_get_qps += row.multi_get_qps;
all.put_qps += row.put_qps;
all.multi_put_qps += row.multi_put_qps;
all.remove_qps += row.remove_qps;
all.multi_remove_qps += row.multi_remove_qps;
all.incr_qps += row.incr_qps;
all.check_and_set_qps += row.check_and_set_qps;
all.check_and_mutate_qps += row.check_and_mutate_qps;
all.scan_qps += row.scan_qps;
all.recent_read_cu += row.recent_read_cu;
all.recent_write_cu += row.recent_write_cu;
all.recent_expire_count += row.recent_expire_count;
all.recent_filter_count += row.recent_filter_count;
all.recent_abnormal_count += row.recent_abnormal_count;
all.recent_write_throttling_delay_count += row.recent_write_throttling_delay_count;
all.recent_write_throttling_reject_count += row.recent_write_throttling_reject_count;
all.storage_mb += row.storage_mb;
all.storage_count += row.storage_count;
all.rdb_block_cache_hit_count += row.rdb_block_cache_hit_count;
all.rdb_block_cache_total_count += row.rdb_block_cache_total_count;
all.rdb_index_and_filter_blocks_mem_usage += row.rdb_index_and_filter_blocks_mem_usage;
all.rdb_memtable_mem_usage += row.rdb_memtable_mem_usage;
read_qps[i] = row.get_qps + row.multi_get_qps + row.scan_qps;
write_qps[i] = row.put_qps + row.multi_put_qps + row.remove_qps + row.multi_remove_qps +
row.incr_qps + row.check_and_set_qps + row.check_and_mutate_qps;
}
read_qps[read_qps.size() - 1] = all.get_qps + all.multi_get_qps + all.scan_qps;
write_qps[read_qps.size() - 1] = all.put_qps + all.multi_put_qps + all.remove_qps +
all.multi_remove_qps + all.incr_qps + all.check_and_set_qps +
all.check_and_mutate_qps;
for (int i = 0; i < rows.size(); ++i) {
row_data &row = rows[i];
AppStatCounters *counters = get_app_counters(row.row_name);
counters->get_qps->set(row.get_qps);
counters->multi_get_qps->set(row.multi_get_qps);
counters->put_qps->set(row.put_qps);
counters->multi_put_qps->set(row.multi_put_qps);
counters->remove_qps->set(row.remove_qps);
counters->multi_remove_qps->set(row.multi_remove_qps);
counters->incr_qps->set(row.incr_qps);
counters->check_and_set_qps->set(row.check_and_set_qps);
counters->check_and_mutate_qps->set(row.check_and_mutate_qps);
counters->scan_qps->set(row.scan_qps);
counters->recent_read_cu->set(row.recent_read_cu);
counters->recent_write_cu->set(row.recent_write_cu);
counters->recent_expire_count->set(row.recent_expire_count);
counters->recent_filter_count->set(row.recent_filter_count);
counters->recent_abnormal_count->set(row.recent_abnormal_count);
counters->recent_write_throttling_delay_count->set(row.recent_write_throttling_delay_count);
counters->recent_write_throttling_reject_count->set(
row.recent_write_throttling_reject_count);
counters->storage_mb->set(row.storage_mb);
counters->storage_count->set(row.storage_count);
counters->rdb_block_cache_hit_rate->set(
std::abs(row.rdb_block_cache_total_count) < 1e-6
? 0
: row.rdb_block_cache_hit_count / row.rdb_block_cache_total_count * 1000000);
counters->rdb_index_and_filter_blocks_mem_usage->set(
row.rdb_index_and_filter_blocks_mem_usage);
counters->rdb_memtable_mem_usage->set(row.rdb_memtable_mem_usage);
counters->read_qps->set(read_qps[i]);
counters->write_qps->set(write_qps[i]);

row_statistics all_stats("_all_");
for (auto app_rows : all_rows) {
// get statistics data for app
row_statistics app_stats(app_rows.first);
for (auto partition_row : app_rows.second) {
app_stats.calc(partition_row);
}
get_app_counters(app_stats.app_name)->set(app_stats);

// get row data statistics for all of the apps
all_stats.merge(app_stats);
}
get_app_counters(all_stats.app_name)->set(all_stats);

ddebug("stat apps succeed, app_count = %d, total_read_qps = %.2f, total_write_qps = %.2f",
(int)(rows.size() - 1),
read_qps[read_qps.size() - 1],
write_qps[read_qps.size() - 1]);
(int)(all_rows.size() - 1),
all_stats.get_total_read_qps(),
all_stats.get_total_write_qps());
}

info_collector::AppStatCounters *info_collector::get_app_counters(const std::string &app_name)
Expand Down Expand Up @@ -257,6 +199,8 @@ info_collector::AppStatCounters *info_collector::get_app_counters(const std::str
INIT_COUNTER(rdb_memtable_mem_usage);
INIT_COUNTER(read_qps);
INIT_COUNTER(write_qps);
INIT_COUNTER(qps_max_min_scale);
INIT_COUNTER(cu_max_min_scale);
_app_stat_counters[app_name] = counters;
return counters;
}
Expand Down
195 changes: 194 additions & 1 deletion src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,204 @@ namespace pegasus {
namespace server {

class result_writer;
// Max/min load ratio between partitions of one app at or above which
// AppStatCounters::set() logs a hot-spot message (applied to both the qps and the cu ratio).
static const int HOTSPOT_MAX_MIN_SCALE_THRESHOLD = 10;

class info_collector
{
public:
struct row_statistics
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
{
row_statistics(const std::string &app_name) { this->app_name = app_name; }

// Returns the accumulated read QPS: get + multi_get + scan.
double get_total_read_qps() const
{
    double read_qps = total_get_qps;
    read_qps += total_multi_get_qps;
    read_qps += total_scan_qps;
    return read_qps;
}

// Returns the accumulated write QPS: put + multi_put + remove + multi_remove + incr +
// check_and_set + check_and_mutate.
double get_total_write_qps() const
{
    double write_qps = total_put_qps;
    write_qps += total_multi_put_qps;
    write_qps += total_remove_qps;
    write_qps += total_multi_remove_qps;
    write_qps += total_incr_qps;
    write_qps += total_check_and_set_qps;
    write_qps += total_check_and_mutate_qps;
    return write_qps;
}

void calc(const row_data &row)
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
{
total_get_qps += row.get_qps;
total_multi_get_qps += row.multi_get_qps;
total_put_qps += row.put_qps;
total_multi_put_qps += row.multi_put_qps;
total_remove_qps += row.remove_qps;
total_multi_remove_qps += row.multi_remove_qps;
total_incr_qps += row.incr_qps;
total_check_and_set_qps += row.check_and_set_qps;
total_check_and_mutate_qps += row.check_and_mutate_qps;
total_scan_qps += row.scan_qps;
total_recent_read_cu += row.recent_read_cu;
total_recent_write_cu += row.recent_write_cu;
total_recent_expire_count += row.recent_expire_count;
total_recent_filter_count += row.recent_filter_count;
total_recent_abnormal_count += row.recent_abnormal_count;
total_recent_write_throttling_delay_count += row.recent_write_throttling_delay_count;
total_recent_write_throttling_reject_count += row.recent_write_throttling_reject_count;
total_storage_mb += row.storage_mb;
total_storage_count += row.storage_count;
total_rdb_block_cache_hit_count += row.rdb_block_cache_hit_count;
total_rdb_block_cache_total_count += row.rdb_block_cache_total_count;
total_rdb_index_and_filter_blocks_mem_usage +=
row.rdb_index_and_filter_blocks_mem_usage;
total_rdb_memtable_mem_usage += row.rdb_memtable_mem_usage;

// get max_total_qps、min_total_qps and the id of this partition which has max_total_qps
double row_total_qps = row.get_total_qps();
min_total_qps = std::min(min_total_qps, row_total_qps);
if (max_total_qps < row_total_qps) {
max_total_qps = row_total_qps;
max_qps_partition_id = row.row_name;
}

// get max_total_cu、min_total_cu and the id of this partition which has max_total_cu
double row_total_cu = row.get_total_cu();
min_total_cu = std::min(min_total_cu, row_total_cu);
if (max_total_cu < row_total_cu) {
max_total_cu = row_total_cu;
max_cu_partition_id = row.row_name;
}
}

void merge(const row_statistics &row_stats)
{
total_get_qps += row_stats.total_get_qps;
total_multi_get_qps += row_stats.total_multi_get_qps;
total_put_qps += row_stats.total_put_qps;
total_multi_put_qps += row_stats.total_multi_put_qps;
total_remove_qps += row_stats.total_remove_qps;
total_multi_remove_qps += row_stats.total_multi_remove_qps;
total_incr_qps += row_stats.total_incr_qps;
total_check_and_set_qps += row_stats.total_check_and_set_qps;
total_check_and_mutate_qps += row_stats.total_check_and_mutate_qps;
total_scan_qps += row_stats.total_scan_qps;
total_recent_read_cu += row_stats.total_recent_read_cu;
total_recent_write_cu += row_stats.total_recent_write_cu;
total_recent_expire_count += row_stats.total_recent_expire_count;
total_recent_filter_count += row_stats.total_recent_filter_count;
total_recent_abnormal_count += row_stats.total_recent_abnormal_count;
total_recent_write_throttling_delay_count +=
row_stats.total_recent_write_throttling_delay_count;
total_recent_write_throttling_reject_count +=
row_stats.total_recent_write_throttling_reject_count;
total_storage_mb += row_stats.total_storage_mb;
total_storage_count += row_stats.total_storage_count;
total_rdb_block_cache_hit_count += row_stats.total_rdb_block_cache_hit_count;
total_rdb_block_cache_total_count += row_stats.total_rdb_block_cache_total_count;
total_rdb_index_and_filter_blocks_mem_usage +=
row_stats.total_rdb_index_and_filter_blocks_mem_usage;
total_rdb_memtable_mem_usage += row_stats.total_rdb_memtable_mem_usage;

// We only need max_total_qps/min_total_qps/max_total_cu/min_total_cu in the same app
if (this->app_name == row_stats.app_name) {
// get max_total_qps、min_total_qps and id of the partition which has max_total_qps
min_total_qps = std::min(min_total_qps, row_stats.min_total_qps);
if (max_total_qps < row_stats.max_total_qps) {
max_total_qps = row_stats.max_total_qps;
max_qps_partition_id = row_stats.max_qps_partition_id;
}

// get max_total_cu、min_total_cu and id of the partition which has max_total_cu
min_total_cu = std::min(min_total_cu, row_stats.min_total_cu);
if (max_total_cu < row_stats.max_total_cu) {
max_total_cu = row_stats.max_total_cu;
max_cu_partition_id = row_stats.max_cu_partition_id;
}
}
}

std::string app_name;
double total_get_qps = 0;
double total_multi_get_qps = 0;
double total_put_qps = 0;
double total_multi_put_qps = 0;
double total_remove_qps = 0;
double total_multi_remove_qps = 0;
double total_incr_qps = 0;
double total_check_and_set_qps = 0;
double total_check_and_mutate_qps = 0;
double total_scan_qps = 0;
double total_recent_read_cu = 0;
double total_recent_write_cu = 0;
double total_recent_expire_count = 0;
double total_recent_filter_count = 0;
double total_recent_abnormal_count = 0;
double total_recent_write_throttling_delay_count = 0;
double total_recent_write_throttling_reject_count = 0;
double total_storage_mb = 0;
double total_storage_count = 0;
double total_rdb_block_cache_hit_count = 0;
double total_rdb_block_cache_total_count = 0;
double total_rdb_index_and_filter_blocks_mem_usage = 0;
double total_rdb_memtable_mem_usage = 0;
double max_total_qps = 0;
double min_total_qps = INT_MAX;
double max_total_cu = 0;
double min_total_cu = INT_MAX;
std::string max_qps_partition_id;
std::string max_cu_partition_id;
};

struct AppStatCounters
{
// Publishes one row_statistics snapshot into this app's perf counters and logs a message
// when the max/min ratio between rows reaches HOTSPOT_MAX_MIN_SCALE_THRESHOLD.
//
// row_stats: accumulated totals and per-row extremes to publish.
void set(const row_statistics &row_stats)
{
    // Per-operation QPS counters.
    get_qps->set(row_stats.total_get_qps);
    multi_get_qps->set(row_stats.total_multi_get_qps);
    put_qps->set(row_stats.total_put_qps);
    multi_put_qps->set(row_stats.total_multi_put_qps);
    remove_qps->set(row_stats.total_remove_qps);
    multi_remove_qps->set(row_stats.total_multi_remove_qps);
    incr_qps->set(row_stats.total_incr_qps);
    check_and_set_qps->set(row_stats.total_check_and_set_qps);
    check_and_mutate_qps->set(row_stats.total_check_and_mutate_qps);
    scan_qps->set(row_stats.total_scan_qps);

    // CU and "recent" event counters.
    recent_read_cu->set(row_stats.total_recent_read_cu);
    recent_write_cu->set(row_stats.total_recent_write_cu);
    recent_expire_count->set(row_stats.total_recent_expire_count);
    recent_filter_count->set(row_stats.total_recent_filter_count);
    recent_abnormal_count->set(row_stats.total_recent_abnormal_count);
    recent_write_throttling_delay_count->set(
        row_stats.total_recent_write_throttling_delay_count);
    recent_write_throttling_reject_count->set(
        row_stats.total_recent_write_throttling_reject_count);

    // Storage and rocksdb counters.
    storage_mb->set(row_stats.total_storage_mb);
    storage_count->set(row_stats.total_storage_count);
    // Hit rate is hit/total scaled by 1,000,000; a (near-)zero total reports 0 instead of
    // dividing by zero.
    rdb_block_cache_hit_rate->set(
        std::abs(row_stats.total_rdb_block_cache_total_count) < 1e-6
            ? 0
            : row_stats.total_rdb_block_cache_hit_count /
                  row_stats.total_rdb_block_cache_total_count * 1000000);
    rdb_index_and_filter_blocks_mem_usage->set(
        row_stats.total_rdb_index_and_filter_blocks_mem_usage);
    rdb_memtable_mem_usage->set(row_stats.total_rdb_memtable_mem_usage);

    // Aggregated read/write QPS.
    read_qps->set(row_stats.get_total_read_qps());
    write_qps->set(row_stats.get_total_write_qps());

    // max/min ratios; the denominator is clamped to at least 1.0 so a tiny or zero minimum
    // cannot blow the ratio up or divide by zero.
    const double qps_scale = row_stats.max_total_qps / std::max(row_stats.min_total_qps, 1.0);
    const double cu_scale = row_stats.max_total_cu / std::max(row_stats.min_total_cu, 1.0);
    qps_max_min_scale->set(qps_scale);
    cu_max_min_scale->set(cu_scale);

    // The scale is a double, so it must be printed with a floating-point conversion;
    // passing it to an integer conversion is a printf-style type mismatch.
    if (qps_scale >= HOTSPOT_MAX_MIN_SCALE_THRESHOLD) {
        ddebug("There is a hot spot about qps in app %s(partition id: %s), max/min scale=%.2f",
               row_stats.app_name.c_str(),
               row_stats.max_qps_partition_id.c_str(),
               qps_scale);
    }
    if (cu_scale >= HOTSPOT_MAX_MIN_SCALE_THRESHOLD) {
        ddebug("There is a hot spot about cu in app %s(partition id: %s), max/min scale=%.2f",
               row_stats.app_name.c_str(),
               row_stats.max_cu_partition_id.c_str(),
               cu_scale);
    }
}

::dsn::perf_counter_wrapper get_qps;
::dsn::perf_counter_wrapper multi_get_qps;
::dsn::perf_counter_wrapper put_qps;
Expand All @@ -50,11 +242,12 @@ class info_collector
::dsn::perf_counter_wrapper storage_mb;
::dsn::perf_counter_wrapper storage_count;
::dsn::perf_counter_wrapper rdb_block_cache_hit_rate;
::dsn::perf_counter_wrapper rdb_block_cache_mem_usage;
::dsn::perf_counter_wrapper rdb_index_and_filter_blocks_mem_usage;
::dsn::perf_counter_wrapper rdb_memtable_mem_usage;
::dsn::perf_counter_wrapper read_qps;
::dsn::perf_counter_wrapper write_qps;
::dsn::perf_counter_wrapper qps_max_min_scale;
::dsn::perf_counter_wrapper cu_max_min_scale;
};

info_collector();
Expand Down
8 changes: 8 additions & 0 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,14 @@ inline bool parse_app_pegasus_perf_counter_name(const std::string &name,

struct row_data
{
// Returns the sum of this row's per-operation QPS fields, reads first (get, multi_get,
// scan) followed by writes (put, multi_put, remove, multi_remove, incr, check_and_set,
// check_and_mutate).
double get_total_qps() const
{
    double total_qps = get_qps;
    total_qps += multi_get_qps;
    total_qps += scan_qps;
    total_qps += put_qps;
    total_qps += multi_put_qps;
    total_qps += remove_qps;
    total_qps += multi_remove_qps;
    total_qps += incr_qps;
    total_qps += check_and_set_qps;
    total_qps += check_and_mutate_qps;
    return total_qps;
}

// Returns recent_read_cu + recent_write_cu for this row.
double get_total_cu() const
{
    const double total_cu = recent_read_cu + recent_write_cu;
    return total_cu;
}

std::string row_name;
int32_t app_id = 0;
int32_t partition_count = 0;
Expand Down