Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(over window): add compute count metric for general over window executor #18847

Merged
merged 4 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

228 changes: 136 additions & 92 deletions grafana/risingwave-dev-dashboard.dashboard.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ pub struct StreamingMetrics {
over_window_range_cache_lookup_count: LabelGuardedIntCounterVec<3>,
over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec<3>,
over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec<3>,
over_window_compute_count: LabelGuardedIntCounterVec<3>,
over_window_same_result_count: LabelGuardedIntCounterVec<3>,

/// The duration from receipt of barrier to all actors collection.
/// And the max of all node `barrier_inflight_latency` is the latency for a barrier
Expand Down Expand Up @@ -770,6 +772,22 @@ impl StreamingMetrics {
)
.unwrap();

let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
"stream_over_window_compute_count",
"Over window compute count",
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let over_window_same_result_count = register_guarded_int_counter_vec_with_registry!(
"stream_over_window_same_result_count",
"Over window same result count",
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let opts = histogram_opts!(
"stream_barrier_inflight_duration_seconds",
"barrier_inflight_latency",
Expand Down Expand Up @@ -1058,6 +1076,8 @@ impl StreamingMetrics {
over_window_range_cache_lookup_count,
over_window_range_cache_left_miss_count,
over_window_range_cache_right_miss_count,
over_window_compute_count,
over_window_same_result_count,
barrier_inflight_latency,
barrier_sync_latency,
barrier_manager_progress,
Expand Down Expand Up @@ -1405,6 +1425,12 @@ impl StreamingMetrics {
over_window_range_cache_right_miss_count: self
.over_window_range_cache_right_miss_count
.with_guarded_label_values(label_list),
over_window_compute_count: self
.over_window_compute_count
.with_guarded_label_values(label_list),
over_window_same_result_count: self
.over_window_same_result_count
.with_guarded_label_values(label_list),
}
}

Expand Down Expand Up @@ -1516,4 +1542,6 @@ pub struct OverWindowMetrics {
pub over_window_range_cache_lookup_count: LabelGuardedIntCounter<3>,
pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter<3>,
pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter<3>,
pub over_window_compute_count: LabelGuardedIntCounter<3>,
pub over_window_same_result_count: LabelGuardedIntCounter<3>,
}
6 changes: 6 additions & 0 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@ impl<S: StateStore> OverWindowExecutor<S> {
metrics
.over_window_range_cache_right_miss_count
.inc_by(stats.right_miss_count);
metrics
.over_window_compute_count
.inc_by(stats.compute_count);
metrics
.over_window_same_result_count
.inc_by(stats.same_result_count);

// Update recently accessed range for later shrinking cache.
if !this.cache_policy.is_full()
Expand Down
19 changes: 18 additions & 1 deletion src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,16 @@ pub(super) fn shrink_partition_cache(
}
}

#[derive(Default)]
#[derive(Default, Debug)]
pub(super) struct OverPartitionStats {
// stats for range cache operations
pub lookup_count: u64,
pub left_miss_count: u64,
pub right_miss_count: u64,

// stats for window function state computation
pub compute_count: u64,
pub same_result_count: u64,
}

/// [`AffectedRange`] represents a range of keys that are affected by a delta.
Expand Down Expand Up @@ -420,6 +425,8 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
)> {
let input_schema_len = table.get_data_types().len() - calls.len();
let mut part_changes = BTreeMap::new();
let mut compute_count = 0;
let mut same_result_count = 0;

// Find affected ranges, this also ensures that all rows in the affected ranges are loaded
// into the cache.
Expand Down Expand Up @@ -521,6 +528,13 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
.key_value()
.expect("cursor must be valid until `last_curr_key`");
let output = states.slide_no_evict_hint()?;

compute_count += 1;
let old_output = &row.as_inner()[input_schema_len..];
if !old_output.is_empty() && old_output == output {
same_result_count += 1;
}

let new_row = OwnedRow::new(
row.as_inner()
.iter()
Expand Down Expand Up @@ -555,6 +569,9 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
} {}
}

self.stats.compute_count += compute_count;
self.stats.same_result_count += same_result_count;

Ok((part_changes, accessed_range))
}

Expand Down
Loading