Skip to content

Commit

Permalink
fix(metrics): source throughput should be of each source executor (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored May 30, 2023
1 parent 2b04ed5 commit 6cb4387
Show file tree
Hide file tree
Showing 11 changed files with 17 additions and 40 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

24 changes: 1 addition & 23 deletions grafana/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class Panels:
"fillOpacity": 10,
"interval": "1s",
"maxDataPoints": 1000,
"legendDisplayMode": "table",
}

def __init__(self, datasource):
Expand Down Expand Up @@ -141,7 +142,6 @@ def timeseries(self, title, description, targets):
description=description,
targets=targets,
gridPos=gridPos,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -158,7 +158,6 @@ def timeseries_count(self,
targets=targets,
gridPos=gridPos,
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -177,7 +176,6 @@ def timeseries_percentage(self,
gridPos=gridPos,
unit="percentunit",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -195,7 +193,6 @@ def timeseries_latency(self,
gridPos=gridPos,
unit="s",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -213,7 +210,6 @@ def timeseries_actor_latency(self,
gridPos=gridPos,
unit="s",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -231,7 +227,6 @@ def timeseries_actor_latency_small(self,
gridPos=gridPos,
unit="s",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -249,7 +244,6 @@ def timeseries_query_per_sec(self,
gridPos=gridPos,
unit="Qps",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -267,7 +261,6 @@ def timeseries_bytes_per_sec(self,
gridPos=gridPos,
unit="Bps",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -285,7 +278,6 @@ def timeseries_bytes(self,
gridPos=gridPos,
unit="bytes",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -299,7 +291,6 @@ def timeseries_row(self, title, description, targets, legendCols=["mean"]):
gridPos=gridPos,
unit="row",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -312,7 +303,6 @@ def timeseries_ms(self, title, description, targets, legendCols=["mean"]):
targets=targets,
gridPos=gridPos,
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -330,7 +320,6 @@ def timeseries_kilobytes(self,
gridPos=gridPos,
unit="kbytes",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -348,7 +337,6 @@ def timeseries_dollar(self,
gridPos=gridPos,
unit="$",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -362,7 +350,6 @@ def timeseries_ops(self, title, description, targets, legendCols=["mean"]):
gridPos=gridPos,
unit="ops",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -380,7 +367,6 @@ def timeseries_actor_ops(self,
gridPos=gridPos,
unit="ops",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -398,7 +384,6 @@ def timeseries_actor_ops_small(self,
gridPos=gridPos,
unit="ops",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -416,7 +401,6 @@ def timeseries_rowsps(self,
gridPos=gridPos,
unit="rows/s",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -434,7 +418,6 @@ def timeseries_bytesps(self,
gridPos=gridPos,
unit="MB/s",
legendCalcs=legendCols,
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -447,7 +430,6 @@ def timeseries_actor_rowsps(self, title, description, targets):
targets=targets,
gridPos=gridPos,
unit="rows/s",
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -460,7 +442,6 @@ def timeseries_memory(self, title, description, targets):
targets=targets,
gridPos=gridPos,
unit="bytes",
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -473,7 +454,6 @@ def timeseries_cpu(self, title, description, targets):
targets=targets,
gridPos=gridPos,
unit="percentunit",
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -486,7 +466,6 @@ def timeseries_latency_small(self, title, description, targets):
targets=targets,
gridPos=gridPos,
unit="s",
legendDisplayMode="table",
**self.common_options,
)

Expand All @@ -498,7 +477,6 @@ def timeseries_id(self, title, description, targets):
description=description,
targets=targets,
gridPos=gridPos,
legendDisplayMode="table",
**self.common_options,
)

Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ def section_streaming(panels):
[
panels.target(
f"rate({metric('stream_source_output_rows_counts')}[$__rate_interval])",
"source={{source_name}} {{source_id}} @ {{instance}}",
"source={{source_name}} actor={{actor_id}} @ {{instance}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ def section_streaming(outer_panels):
[
panels.target(
f"rate({metric('stream_source_output_rows_counts')}[$__rate_interval])",
"source={{source_name}} {{source_id}} @ {{instance}}",
"source={{source_name}} actor={{actor_id}} @ {{instance}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl StreamingMetrics {
let source_output_row_count = register_int_counter_vec_with_registry!(
"stream_source_output_rows_counts",
"Total number of rows that have been output from source",
&["source_id", "source_name"],
&["source_id", "source_name", "actor_id"],
registry
)
.unwrap();
Expand Down
3 changes: 0 additions & 3 deletions src/stream/src/executor/source/executor_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ pub struct StreamSourceCore<S: StateStore> {

pub(crate) column_ids: Vec<ColumnId>,

pub(crate) source_identify: String,

/// `source_desc_builder` will be taken (`mem::take`) on execution. A `SourceDesc` (currently
/// named `SourceDescV2`) will be constructed and used for execution.
pub(crate) source_desc_builder: Option<SourceDescBuilder>,
Expand Down Expand Up @@ -61,7 +59,6 @@ where
source_id,
source_name,
column_ids,
source_identify: "Table_".to_string() + &source_id.table_id().to_string(),
source_desc_builder: Some(source_desc_builder),
stream_source_splits: HashMap::new(),
split_state_store,
Expand Down
5 changes: 3 additions & 2 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
.source_row_per_barrier
.with_label_values(&[
self.ctx.id.to_string().as_str(),
self.stream_source_core.source_identify.as_ref(),
self.stream_source_core.source_id.to_string().as_ref(),
])
.inc_by(metric_row_per_barrier);
metric_row_per_barrier = 0;
Expand Down Expand Up @@ -426,8 +426,9 @@ impl<S: StateStore> FsSourceExecutor<S> {
self.metrics
.source_output_row_count
.with_label_values(&[
self.stream_source_core.source_identify.as_str(),
self.stream_source_core.source_id.to_string().as_ref(),
self.stream_source_core.source_name.as_ref(),
self.ctx.id.to_string().as_str(),
])
.inc_by(chunk.cardinality() as u64);
yield Message::Chunk(chunk);
Expand Down
11 changes: 6 additions & 5 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ impl<S: StateStore> SourceExecutor<S> {
self.stream_source_core
.as_ref()
.unwrap()
.source_identify
.source_id
.to_string()
.as_ref(),
])
.inc_by(metric_row_per_barrier);
Expand Down Expand Up @@ -494,13 +495,15 @@ impl<S: StateStore> SourceExecutor<S> {
self.stream_source_core
.as_ref()
.unwrap()
.source_identify
.as_str(),
.source_id
.to_string()
.as_ref(),
self.stream_source_core
.as_ref()
.unwrap()
.source_name
.as_ref(),
self.ctx.id.to_string().as_str(),
])
.inc_by(chunk.cardinality() as u64);
yield Message::Chunk(chunk);
Expand Down Expand Up @@ -630,7 +633,6 @@ mod tests {
let core = StreamSourceCore::<MemoryStateStore> {
source_id: table_id,
column_ids,
source_identify: "Table_".to_string() + &table_id.table_id().to_string(),
source_desc_builder: Some(source_desc_builder),
stream_source_splits: HashMap::new(),
split_state_store,
Expand Down Expand Up @@ -718,7 +720,6 @@ mod tests {
let core = StreamSourceCore::<MemoryStateStore> {
source_id: table_id,
column_ids: column_ids.clone(),
source_identify: "Table_".to_string() + &table_id.table_id().to_string(),
source_desc_builder: Some(source_desc_builder),
stream_source_splits: HashMap::new(),
split_state_store,
Expand Down

0 comments on commit 6cb4387

Please sign in to comment.