feat: support collect statistics about the engine (#1451)
## Rationale
Closes #1438.

## Detailed Changes
Support reporting statistics at the shard level.
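
For orientation, a minimal sketch of the per-shard statistics shape being reported (names taken from the diff below; the actual definitions live in `table_engine::engine`):

```rust
use std::collections::HashMap;

use common_types::table::ShardId;

/// Statistics aggregated for a single shard from its
/// `shard_id`-labeled Prometheus series.
#[derive(Debug, Default, PartialEq)]
pub struct ShardStats {
    pub num_fetched_bytes: u64,
    pub num_written_bytes: u64,
}

/// Engine-wide statistics, keyed by shard id.
#[derive(Debug, Default)]
pub struct TableEngineStats {
    pub shard_stats: HashMap<ShardId, ShardStats>,
}
```

`report_statistics` fills this map by walking the `shard_id` label of `FETCHED_SST_BYTES_HISTOGRAM` and `TABLE_WRITE_BYTES_COUNTER`.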

## Test Plan
New unit tests are added and the CI should pass.
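
For reference, a caller-side sketch of how the new trait method can be consumed; the `log_engine_stats` helper is hypothetical and not part of this diff:

```rust
use table_engine::engine::TableEngine;

/// Hypothetical consumer: print per-shard stats from any `TableEngine`.
async fn log_engine_stats(engine: &dyn TableEngine) {
    // `report_statistics` returns `Ok(None)` for engines that do not
    // collect per-shard statistics.
    match engine.report_statistics().await {
        Ok(Some(stats)) => {
            for (shard_id, shard) in &stats.shard_stats {
                println!(
                    "shard {shard_id}: fetched {} bytes, written {} bytes",
                    shard.num_fetched_bytes, shard.num_written_bytes
                );
            }
        }
        Ok(None) => println!("engine reports no statistics"),
        Err(e) => eprintln!("failed to collect engine stats: {e}"),
    }
}
```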
ShiKaiWi authored Jan 26, 2024
1 parent 1f5dde8 commit 5ec187e
Showing 13 changed files with 248 additions and 74 deletions.
143 changes: 140 additions & 3 deletions src/analytic_engine/src/engine.rs
@@ -17,23 +17,31 @@

//! Implements the TableEngine trait
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use common_types::table::ShardId;
use generic_error::BoxError;
use logger::{error, info};
use prometheus::{core::Collector, HistogramVec, IntCounterVec};
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{
Close, CloseShardRequest, CloseTableRequest, CreateTableParams, CreateTableRequest,
DropTableRequest, OpenShard, OpenShardRequest, OpenShardResult, OpenTableNoCause,
OpenTableRequest, OpenTableWithCause, Result, TableDef, TableEngine,
OpenTableRequest, OpenTableWithCause, Result, ShardStats, TableDef, TableEngine,
TableEngineStats, Unexpected,
},
table::{SchemaId, TableRef},
ANALYTIC_ENGINE_TYPE,
};

use crate::{instance::InstanceRef, space::SpaceId, table::TableImpl};
use crate::{
instance::InstanceRef,
space::SpaceId,
sst::metrics::FETCHED_SST_BYTES_HISTOGRAM,
table::{metrics::TABLE_WRITE_BYTES_COUNTER, TableImpl},
};

/// TableEngine implementation
pub struct TableEngineImpl {
@@ -234,6 +242,63 @@ impl TableEngine for TableEngineImpl {

self.close_tables_of_shard(close_requests).await
}

async fn report_statistics(&self) -> Result<Option<TableEngineStats>> {
let table_engine_stats =
collect_stats_from_metric(&FETCHED_SST_BYTES_HISTOGRAM, &TABLE_WRITE_BYTES_COUNTER)?;

Ok(Some(table_engine_stats))
}
}

/// Collect the table engine stats from the two provided metrics.
fn collect_stats_from_metric(
fetched_bytes_hist: &HistogramVec,
written_bytes_counter: &IntCounterVec,
) -> Result<TableEngineStats> {
let mut shard_stats: HashMap<ShardId, ShardStats> = HashMap::new();

// Collect the metrics for fetched bytes by shards.
for_shard_metric(fetched_bytes_hist, |shard_id, metric| {
let sum = metric.get_histogram().get_sample_sum() as u64;
let stats = shard_stats.entry(shard_id).or_default();
stats.num_fetched_bytes += sum;
})?;

// Collect the metrics for the written bytes by shards.
for_shard_metric(written_bytes_counter, |shard_id, metric| {
let sum = metric.get_counter().get_value() as u64;
let stats = shard_stats.entry(shard_id).or_default();
stats.num_written_bytes += sum;
})?;

Ok(TableEngineStats { shard_stats })
}

/// Iterate the metrics collected by `metric_collector`, and provide the metric
/// with a valid shard_id to the `f` closure.
fn for_shard_metric<C, F>(metric_collector: &C, mut f: F) -> Result<()>
where
C: Collector,
F: FnMut(ShardId, &prometheus::proto::Metric),
{
const SHARD_LABEL: &str = "shard_id";

let metric_families = metric_collector.collect();
for metric_family in metric_families {
for metric in metric_family.get_metric() {
let labels = metric.get_label();
let shard_id = labels
.iter()
.find_map(|pair| (pair.get_name() == SHARD_LABEL).then(|| pair.get_value()));
if let Some(raw_shard_id) = shard_id {
let shard_id: ShardId = str::parse(raw_shard_id).box_err().context(Unexpected)?;
f(shard_id, metric);
}
}
}

Ok(())
}

@@ -242,3 +307,75 @@ impl TableEngine for TableEngineImpl {
/// Generate the space id from the schema id, assuming the schema id is unique.
pub fn build_space_id(schema_id: SchemaId) -> SpaceId {
schema_id.as_u32()
}

#[cfg(test)]
mod tests {
use prometheus::{exponential_buckets, register_histogram_vec, register_int_counter_vec};

use super::*;

#[test]
fn test_collect_table_engine_stats() {
let hist = register_histogram_vec!(
"fetched_bytes",
"Histogram for sst get range length",
&["shard_id", "table"],
// The buckets: [1MB, 2MB, 4MB, 8MB, ..., 4GB]
exponential_buckets(1024.0 * 1024.0, 2.0, 13).unwrap()
)
.unwrap();

hist.with_label_values(&["0", "table_0"]).observe(1000.0);
hist.with_label_values(&["0", "table_1"]).observe(1000.0);
hist.with_label_values(&["0", "table_2"]).observe(1000.0);
hist.with_label_values(&["1", "table_3"]).observe(1000.0);
hist.with_label_values(&["1", "table_4"]).observe(1000.0);
hist.with_label_values(&["2", "table_5"]).observe(4000.0);

let counter = register_int_counter_vec!(
"written_counter",
"Write bytes counter of table",
&["shard_id", "table"]
)
.unwrap();

counter.with_label_values(&["0", "table_0"]).inc_by(100);
counter.with_label_values(&["0", "table_1"]).inc_by(100);
counter.with_label_values(&["0", "table_2"]).inc_by(100);
counter.with_label_values(&["1", "table_3"]).inc_by(100);
counter.with_label_values(&["1", "table_4"]).inc_by(100);
counter.with_label_values(&["2", "table_5"]).inc_by(400);

let stats = collect_stats_from_metric(&hist, &counter).unwrap();

let expected_stats = {
let mut shard_stats: HashMap<ShardId, ShardStats> = HashMap::new();

shard_stats.insert(
0,
ShardStats {
num_fetched_bytes: 3000,
num_written_bytes: 300,
},
);
shard_stats.insert(
1,
ShardStats {
num_fetched_bytes: 2000,
num_written_bytes: 200,
},
);
shard_stats.insert(
2,
ShardStats {
num_fetched_bytes: 4000,
num_written_bytes: 400,
},
);

shard_stats
};

assert_eq!(stats.shard_stats, expected_stats);
}
}
9 changes: 6 additions & 3 deletions src/analytic_engine/src/instance/write.rs
@@ -555,9 +555,12 @@ impl<'a> Writer<'a> {

// Collect metrics.
let num_columns = row_group.schema().num_columns();
table_data
.metrics
.on_write_request_done(row_group.num_rows(), num_columns);
let num_written_bytes: usize = row_group.iter().map(|row| row.size()).sum();
table_data.metrics.on_write_request_done(
row_group.num_rows(),
num_columns,
num_written_bytes,
);

Ok(())
}
9 changes: 5 additions & 4 deletions src/analytic_engine/src/sst/metrics.rs
@@ -54,10 +54,10 @@ lazy_static! {
&["table"]
).unwrap();

static ref FETCHED_SST_BYTES_HISTOGRAM: HistogramVec = register_histogram_vec!(
pub static ref FETCHED_SST_BYTES_HISTOGRAM: HistogramVec = register_histogram_vec!(
"fetched_sst_bytes",
"Histogram for sst get range length",
&["table"],
&["shard_id", "table"],
// The buckets: [1MB, 2MB, 4MB, 8MB, ..., 4GB]
exponential_buckets(1024.0 * 1024.0, 2.0, 13).unwrap()
).unwrap();
@@ -72,13 +72,14 @@ pub struct MaybeTableLevelMetrics {
}

impl MaybeTableLevelMetrics {
pub fn new(table: &str) -> Self {
pub fn new(table: &str, shard_id_label: &str) -> Self {
Self {
row_group_before_prune_counter: ROW_GROUP_BEFORE_PRUNE_COUNTER
.with_label_values(&[table]),
row_group_after_prune_counter: ROW_GROUP_AFTER_PRUNE_COUNTER
.with_label_values(&[table]),
num_fetched_sst_bytes_hist: FETCHED_SST_BYTES_HISTOGRAM.with_label_values(&[table]),
num_fetched_sst_bytes_hist: FETCHED_SST_BYTES_HISTOGRAM
.with_label_values(&[shard_id_label, table]),
num_fetched_sst_bytes: AtomicU64::new(0),
}
}
2 changes: 1 addition & 1 deletion src/analytic_engine/src/sst/parquet/writer.rs
@@ -761,7 +761,7 @@ mod tests {
None,
);
let sst_read_options = SstReadOptions {
maybe_table_level_metrics: Arc::new(MaybeTableLevelMetrics::new("test")),
maybe_table_level_metrics: Arc::new(MaybeTableLevelMetrics::new("test", "0")),
frequency: ReadFrequency::Frequent,
num_rows_per_row_group: 5,
predicate: Arc::new(Predicate::empty()),
4 changes: 2 additions & 2 deletions src/analytic_engine/src/table/data.rs
@@ -341,7 +341,7 @@ impl TableData {
let purge_queue = purger.create_purge_queue(space_id, id);
let current_version =
TableVersion::new(mem_size_options.size_sampling_interval, purge_queue);
let metrics_ctx = MetricsContext::new(&name, metrics_opt);
let metrics_ctx = MetricsContext::new(&name, shard_id, metrics_opt);
let metrics = Metrics::new(metrics_ctx);
let mutable_limit = AtomicU32::new(compute_mutable_limit(
opts.write_buffer_size,
@@ -421,7 +421,7 @@ impl TableData {
let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id);
let current_version =
TableVersion::new(mem_size_options.size_sampling_interval, purge_queue);
let metrics_ctx = MetricsContext::new(&add_meta.table_name, metrics_opt);
let metrics_ctx = MetricsContext::new(&add_meta.table_name, shard_id, metrics_opt);
let metrics = Metrics::new(metrics_ctx);
let mutable_limit = AtomicU32::new(compute_mutable_limit(
add_meta.opts.write_buffer_size,