revert: remove enable_stream_row_count config #10261 (#11768)
lmatz authored Aug 18, 2023
1 parent f33183a commit 64ec18e
Showing 11 changed files with 76 additions and 14 deletions.
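
In short, this commit re-introduces the `enable_executor_row_count` developer config (default `false`): per-executor row-count metrics are now emitted only for Materialize and Sink executors unless the flag is turned on, the meta node exports a new `sink_info` gauge mapping actor ids to sink names, and the Sink / Materialized View throughput panels are rewritten to join `stream_executor_row_count` against `sink_info` / `table_info` so they report per-object rather than per-actor rates.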
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

15 changes: 9 additions & 6 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -611,6 +611,9 @@ def section_object_storage(outer_panels):
 def section_streaming(panels):
     mv_filter = "executor_identity=~\".*MaterializeExecutor.*\""
     sink_filter = "executor_identity=~\".*SinkExecutor.*\""
+    table_type_filter = "table_type=~\"MATERIALIZED_VIEW\""
+    mv_throughput_query = f'sum(rate({metric("stream_executor_row_count", filter=mv_filter)}[$__rate_interval]) * on(actor_id) group_left(materialized_view_id, table_name) (group({metric("table_info", filter=table_type_filter)}) by (actor_id, materialized_view_id, table_name))) by (materialized_view_id, table_name)'
+    sink_throughput_query = f'sum(rate({metric("stream_executor_row_count", filter=sink_filter)}[$__rate_interval]) * on(actor_id) group_left(sink_name) (group({metric("sink_info")}) by (actor_id, sink_name))) by (sink_name)'
     return [
         panels.row("Streaming"),
         panels.timeseries_rowsps(
@@ -703,21 +706,21 @@ def section_streaming(panels):
         ),
         panels.timeseries_rowsps(
             "Sink Throughput(rows/s)",
-            "The figure shows the number of rows output by each sink executor actor per second.",
+            "The figure shows the number of rows output by each sink per second.",
             [
                 panels.target(
-                    f"rate({metric('stream_executor_row_count', filter=sink_filter)}[$__rate_interval])",
-                    "sink={{executor_identity}} {{actor_id}} @ {{instance}}",
+                    sink_throughput_query,
+                    "sink {{sink_name}}",
                 ),
             ],
         ),
         panels.timeseries_rowsps(
             "Materialized View Throughput(rows/s)",
-            "The figure shows the number of rows written into each materialized executor actor per second.",
+            "The figure shows the number of rows written into each materialized view per second.",
             [
                 panels.target(
-                    f"rate({metric('stream_executor_row_count', filter=mv_filter)}[$__rate_interval])",
-                    "MV={{executor_identity}} {{actor_id}} @ {{instance}}",
+                    mv_throughput_query,
+                    "materialized view {{table_name}} table_id {{materialized_view_id}}",
                ),
            ],
        ),
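Note: the reworked throughput queries rely on PromQL one-to-many vector matching. The per-actor `rate(stream_executor_row_count[...])` is multiplied `on(actor_id)` with `group_left(...)` against the dummy `table_info` / `sink_info` gauges (registered by the meta node, see metrics.rs below), which attaches the human-readable `table_name` / `sink_name` labels to each actor's rate before summing per materialized view or sink.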
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/common/src/config.rs
@@ -605,6 +605,12 @@ serde_with::with_prefix!(batch_prefix "batch_");
 /// It is put at [`StreamingConfig::developer`].
 #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
 pub struct StreamingDeveloperConfig {
+    /// Set to true to enable per-executor row count metrics. This will produce a lot of timeseries
+    /// and might affect the prometheus performance. If you only need actor input and output
+    /// rows data, see `stream_actor_in_record_cnt` and `stream_actor_out_record_cnt` instead.
+    #[serde(default = "default::developer::stream_enable_executor_row_count")]
+    pub enable_executor_row_count: bool,
+
     /// The capacity of the chunks in the channel that connects between `ConnectorSource` and
     /// `SourceExecutor`.
     #[serde(default = "default::developer::connector_message_buffer_size")]
@@ -1011,6 +1017,10 @@ pub mod default {
         1024
     }
 
+    pub fn stream_enable_executor_row_count() -> bool {
+        false
+    }
+
     pub fn connector_message_buffer_size() -> usize {
         16
     }
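(The field is named `enable_executor_row_count` in Rust but, presumably via the same `serde_with::with_prefix!` mechanism shown for `batch_` in the hunk header above, it surfaces in TOML with a `stream_` prefix, matching the `stream_enable_executor_row_count` key added to example.toml below.)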
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -67,6 +67,7 @@ async_stack_trace = "ReleaseVerbose"
 unique_user_stream_errors = 10
 
 [streaming.developer]
+stream_enable_executor_row_count = false
 stream_connector_message_buffer_size = 16
 stream_unsafe_extreme_cache_size = 10
 stream_chunk_size = 256
25 changes: 25 additions & 0 deletions src/meta/src/rpc/metrics.rs
@@ -28,6 +28,7 @@ use prometheus::{
 use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
 use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
 use risingwave_pb::common::WorkerType;
+use risingwave_pb::stream_plan::stream_node::NodeBody::Sink;
 use tokio::sync::oneshot::Sender;
 use tokio::task::JoinHandle;
 
@@ -161,6 +162,8 @@ pub struct MetaMetrics {
     /// A dummy gauge metrics with its label to be the mapping from materialized view id to table
     /// id.
     pub mv_info: IntGaugeVec,
+    /// A dummy gauge metrics with its label to be the mapping from actor id to sink id
+    pub sink_info: IntGaugeVec,
 
     /// Write throughput of commit epoch for each state table
     pub table_write_throughput: IntCounterVec,
@@ -514,6 +517,14 @@ impl MetaMetrics {
         )
         .unwrap();
 
+        let sink_info = register_int_gauge_vec_with_registry!(
+            "sink_info",
+            "Mapping from actor id to (actor id, sink name)",
+            &["actor_id", "sink_name"],
+            registry
+        )
+        .unwrap();
+
         let l0_compact_level_count = register_histogram_vec_with_registry!(
             "storage_l0_compact_level_count",
             "level_count of l0 compact task",
@@ -639,6 +650,7 @@ impl MetaMetrics {
             actor_info,
             table_info,
             mv_info,
+            sink_info,
             l0_compact_level_count,
             compact_task_size,
             compact_task_file_count,
@@ -773,6 +785,19 @@ pub async fn start_fragment_info_monitor<S: MetaStore>(
                         }
                     }
 
+                    if let Some(stream_node) = &actor.nodes {
+                        if let Some(Sink(sink_node)) = &stream_node.node_body {
+                            let sink_name = match &sink_node.sink_desc {
+                                Some(sink_desc) => &sink_desc.name,
+                                _ => "unknown",
+                            };
+                            meta_metrics
+                                .sink_info
+                                .with_label_values(&[&actor_id_str, sink_name])
+                                .set(1);
+                        }
+                    }
+
                     // Report a dummy gauge metrics with (table id, actor id, table
                     // name) as its label
                     for table_id in &fragment.state_table_ids {
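The `sink_info` gauge above follows the common Prometheus "info metric" pattern: the value is a constant 1 and the payload lives entirely in the labels, which exist only so that dashboard queries can join on `actor_id`. A minimal self-contained sketch of the pattern (a standalone illustration with made-up label values, not the actual meta-node wiring):

    use prometheus::{register_int_gauge_vec_with_registry, Registry};

    fn main() {
        let registry = Registry::new();
        // The gauge value is always 1; the labels carry the
        // actor-id -> sink-name mapping.
        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to sink name",
            &["actor_id", "sink_name"],
            registry
        )
        .unwrap();
        // One sample per running sink actor. PromQL can now attach
        // `sink_name` to any actor-level series via
        // `* on(actor_id) group_left(sink_name)`.
        sink_info.with_label_values(&["42", "demo_sink"]).set(1);
    }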
25 changes: 22 additions & 3 deletions src/stream/src/executor/wrapper.rs
@@ -44,6 +44,8 @@ pub struct WrapperExecutor {
     input: BoxedExecutor,
 
     extra: ExtraInfo,
+
+    enable_executor_row_count: bool,
 }
 
 impl WrapperExecutor {
@@ -53,6 +55,7 @@ impl WrapperExecutor {
         actor_id: ActorId,
         executor_id: u64,
         metrics: Arc<StreamingMetrics>,
+        enable_executor_row_count: bool,
     ) -> Self {
         Self {
             input,
@@ -62,11 +65,13 @@
                 executor_id,
                 metrics,
             },
+            enable_executor_row_count,
         }
     }
 
     #[allow(clippy::let_and_return)]
     fn wrap_debug(
+        _enable_executor_row_count: bool,
         info: Arc<ExecutorInfo>,
         _extra: ExtraInfo,
         stream: impl MessageStream + 'static,
@@ -78,6 +83,7 @@
     }
 
     fn wrap(
+        enable_executor_row_count: bool,
         info: Arc<ExecutorInfo>,
         extra: ExtraInfo,
         stream: impl MessageStream + 'static,
@@ -98,6 +104,7 @@
 
         // Trace
         let stream = trace::trace(
+            enable_executor_row_count,
            info.clone(),
            extra.input_pos,
            extra.actor_id,
@@ -107,7 +114,7 @@
         );
 
         if cfg!(debug_assertions) {
-            Self::wrap_debug(info, extra, stream).boxed()
+            Self::wrap_debug(enable_executor_row_count, info, extra, stream).boxed()
         } else {
             stream.boxed()
         }
@@ -117,12 +124,24 @@
 impl Executor for WrapperExecutor {
     fn execute(self: Box<Self>) -> BoxedMessageStream {
         let info = Arc::new(self.input.info());
-        Self::wrap(info, self.extra, self.input.execute()).boxed()
+        Self::wrap(
+            self.enable_executor_row_count,
+            info,
+            self.extra,
+            self.input.execute(),
+        )
+        .boxed()
     }
 
     fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
         let info = Arc::new(self.input.info());
-        Self::wrap(info, self.extra, self.input.execute_with_epoch(epoch)).boxed()
+        Self::wrap(
+            self.enable_executor_row_count,
+            info,
+            self.extra,
+            self.input.execute_with_epoch(epoch),
+        )
+        .boxed()
     }
 
     fn schema(&self) -> &Schema {
5 changes: 4 additions & 1 deletion src/stream/src/executor/wrapper/trace.rs
@@ -27,6 +27,7 @@ use crate::task::ActorId;
 /// Streams wrapped by `trace` will be traced with `tracing` spans and reported to `opentelemetry`.
 #[try_stream(ok = Message, error = StreamExecutorError)]
 pub async fn trace(
+    enable_executor_row_count: bool,
     info: Arc<ExecutorInfo>,
     _input_pos: usize,
     actor_id: ActorId,
@@ -38,14 +39,16 @@ pub async fn trace(
 
     let span_name = pretty_identity(&info.identity, actor_id, executor_id);
 
+    let is_sink_or_mv = info.identity.contains("Materialize") || info.identity.contains("Sink");
+
     let new_span = || tracing::info_span!("executor", "otel.name" = span_name, actor_id);
     let mut span = new_span();
 
     pin_mut!(input);
 
     while let Some(message) = input.next().instrument(span.clone()).await.transpose()? {
         if let Message::Chunk(chunk) = &message {
-            if chunk.cardinality() > 0 {
+            if chunk.cardinality() > 0 && (enable_executor_row_count || is_sink_or_mv) {
                 metrics
                     .executor_row_count
                     .with_label_values(&[&actor_id_string, &span_name])
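In effect, with `enable_executor_row_count` at its default of `false`, only executors whose identity contains "Materialize" or "Sink" keep emitting `stream_executor_row_count`, which is exactly the subset the reworked dashboard panels depend on; enabling the flag restores the full per-executor series at the cost of many more timeseries.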
1 change: 1 addition & 0 deletions src/stream/src/task/stream_manager.rs
@@ -551,6 +551,7 @@ impl LocalStreamManagerCore {
             actor_context.id,
             executor_id,
             self.streaming_metrics.clone(),
+            self.config.developer.enable_executor_row_count,
         )
         .boxed();
Expand Down
