Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(metrics): remove actor_id label from back-pressure metrics based on metrics level #18213

Merged
merged 8 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 7 additions & 31 deletions dashboard/lib/api/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*
*/
import { Metrics, MetricsSample } from "../../components/metrics"
import { GetBackPressureResponse } from "../../proto/gen/monitor_service"
import {
BackPressureInfo,
GetBackPressureResponse,
} from "../../proto/gen/monitor_service"
import api from "./api"

export interface BackPressuresMetrics {
Expand All @@ -30,13 +33,6 @@ export async function fetchPrometheusBackPressure() {
return res
}

export interface BackPressureInfo {
actorId: number
fragmentId: number
downstreamFragmentId: number
value: number
}

export interface BackPressureRateInfo {
actorId: number
fragmentId: number
Expand All @@ -55,14 +51,8 @@ function convertToMapAndAgg(
const map = new Map<string, number>()
for (const item of backPressures) {
const key = `${item.fragmentId}-${item.downstreamFragmentId}`
if (mapValue.has(key) && mapNumber.has(key)) {
// add || tp avoid NaN and pass check
mapValue.set(key, (mapValue.get(key) || 0) + item.value)
mapNumber.set(key, (mapNumber.get(key) || 0) + 1)
} else {
mapValue.set(key, item.value)
mapNumber.set(key, 1)
}
mapValue.set(key, (mapValue.get(key) || 0) + item.value)
mapNumber.set(key, (mapNumber.get(key) || 0) + item.actorCount)
}

for (const [key, value] of mapValue) {
Expand Down Expand Up @@ -137,7 +127,7 @@ export function calculateCumulativeBp(
mapResult.forEach((value, key) => {
const [fragmentId, downstreamFragmentId] = key.split("-").map(Number)
const backPressureInfo: BackPressureInfo = {
actorId: 0,
actorCount: 1, // the value here has already been averaged by real actor count
fragmentId,
downstreamFragmentId,
value,
Expand All @@ -161,27 +151,13 @@ export function calculateBPRate(
return convertToBackPressureMetrics(convertFromMapAndAgg(result))
}

export const BackPressureInfo = {
fromJSON: (object: any) => {
return {
actorId: isSet(object.actorId) ? Number(object.actorId) : 0,
fragmentId: isSet(object.fragmentId) ? Number(object.fragmentId) : 0,
downstreamFragmentId: isSet(object.downstreamFragmentId)
? Number(object.downstreamFragmentId)
: 0,
value: isSet(object.value) ? Number(object.value) : 0,
}
},
}

// Get back pressure from meta node -> compute node
export async function fetchEmbeddedBackPressure() {
const response: GetBackPressureResponse = await api.get(
"/metrics/fragment/embedded_back_pressures"
)
let backPressureInfos: BackPressureInfo[] =
response.backPressureInfos?.map(BackPressureInfo.fromJSON) ?? []
backPressureInfos = backPressureInfos.sort((a, b) => a.actorId - b.actorId)
return backPressureInfos
}

Expand Down
2 changes: 1 addition & 1 deletion dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import Title from "../components/Title"
import useErrorToast from "../hook/useErrorToast"
import useFetch from "../lib/api/fetch"
import {
BackPressureInfo,
calculateBPRate,
calculateCumulativeBp,
fetchEmbeddedBackPressure,
Expand All @@ -52,6 +51,7 @@ import {
} from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta"
import { BackPressureInfo } from "../proto/gen/monitor_service"
import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan"

interface DispatcherNode {
Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1133,8 +1133,12 @@ def section_streaming_actors(outer_panels):
"much time it takes an actor to process a message, i.e. a barrier, a watermark or rows of data, "
"on average. Then we divide this duration by 1 second and show it as a percentage.",
[
# Note: actor_count is equal to the number of dispatchers for a given downstream fragment,
# this holds true as long as we don't support multiple edges between two fragments.
panels.target(
f"avg(rate({metric('stream_actor_output_buffer_blocking_duration_ns')}[$__rate_interval])) by (fragment_id, downstream_fragment_id) / 1000000000",
f"sum(rate({metric('stream_actor_output_buffer_blocking_duration_ns')}[$__rate_interval])) by (fragment_id, downstream_fragment_id) \
/ ignoring (downstream_fragment_id) group_left sum({metric('stream_actor_count')}) by (fragment_id) \
/ 1000000000",
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
"fragment {{fragment_id}}->{{downstream_fragment_id}}",
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions proto/monitor_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
message GetBackPressureRequest {}

message BackPressureInfo {
uint32 actor_id = 1;
uint32 fragment_id = 2;
uint32 downstream_fragment_id = 3;
uint32 fragment_id = 1;

Check failure on line 58 in proto/monitor_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "fragment_id" on message "BackPressureInfo" changed option "json_name" from "actorId" to "fragmentId".

Check failure on line 58 in proto/monitor_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" on message "BackPressureInfo" changed name from "actor_id" to "fragment_id".
uint32 downstream_fragment_id = 2;

Check failure on line 59 in proto/monitor_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "downstream_fragment_id" on message "BackPressureInfo" changed option "json_name" from "fragmentId" to "downstreamFragmentId".

Check failure on line 59 in proto/monitor_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" on message "BackPressureInfo" changed name from "fragment_id" to "downstream_fragment_id".
uint32 actor_count = 3;

Check failure on line 60 in proto/monitor_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "3" with name "actor_count" on message "BackPressureInfo" changed option "json_name" from "downstreamFragmentId" to "actorCount".

Check failure on line 60 in proto/monitor_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "3" on message "BackPressureInfo" changed name from "downstream_fragment_id" to "actor_count".
double value = 4;
}

Expand Down
41 changes: 41 additions & 0 deletions src/common/metrics/src/gauge_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::IntGauge;

#[easy_ext::ext(IntGaugeExt)]
impl IntGauge {
/// Increment the gauge, and return a guard that will decrement the gauge when dropped.
#[must_use]
pub fn inc_guard(&self) -> impl Drop + '_ {
struct Guard<'a> {
gauge: &'a IntGauge,
}

impl<'a> Guard<'a> {
fn create(gauge: &'a IntGauge) -> Self {
gauge.inc();
Self { gauge }
}
}

impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
self.gauge.dec();
}
}

Guard::create(self)
}
}
2 changes: 2 additions & 0 deletions src/common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;

mod error_metrics;
mod gauge_ext;
mod guarded_metrics;
mod metrics;
pub mod monitor;
mod relabeled_metric;

pub use error_metrics::*;
pub use gauge_ext::*;
pub use guarded_metrics::*;
pub use metrics::*;
pub use relabeled_metric::*;
Expand Down
13 changes: 12 additions & 1 deletion src/common/metrics/src/relabeled_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::core::{MetricVec, MetricVecBuilder};
use prometheus::core::{Collector, MetricVec, MetricVecBuilder};
use prometheus::{HistogramVec, IntCounterVec};

use crate::{
Expand Down Expand Up @@ -89,6 +89,7 @@ impl<T: MetricVecBuilder> RelabeledMetricVec<MetricVec<T>> {
}

impl<T: MetricVecBuilder, const N: usize> RelabeledMetricVec<LabelGuardedMetricVec<T, N>> {
// TODO: shall we rename this to `with_guarded_label_values`?
pub fn with_label_values(&self, vals: &[&str; N]) -> LabelGuardedMetric<T::M, N> {
if self.metric_level > self.relabel_threshold {
// relabel first n labels to empty string
Expand All @@ -102,6 +103,16 @@ impl<T: MetricVecBuilder, const N: usize> RelabeledMetricVec<LabelGuardedMetricV
}
}

impl<T: Collector> Collector for RelabeledMetricVec<T> {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
self.metric.desc()
}

fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
self.metric.collect()
}
}

pub type RelabeledCounterVec = RelabeledMetricVec<IntCounterVec>;
pub type RelabeledHistogramVec = RelabeledMetricVec<HistogramVec>;

Expand Down
74 changes: 59 additions & 15 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::ffi::CString;
use std::fs;
use std::path::Path;
Expand Down Expand Up @@ -291,31 +291,75 @@ impl MonitorService for MonitorServiceImpl {
&self,
_request: Request<GetBackPressureRequest>,
) -> Result<Response<GetBackPressureResponse>, Status> {
let metric_family = global_streaming_metrics(MetricLevel::Info)
let metrics = global_streaming_metrics(MetricLevel::Info);
let actor_output_buffer_blocking_duration_ns = metrics
.actor_output_buffer_blocking_duration_ns
.collect()
.into_iter()
.next()
.unwrap()
.take_metric();
let actor_count = metrics
.actor_count
.collect()
.into_iter()
.next()
.unwrap()
.take_metric();

let actor_count: HashMap<_, _> = actor_count
.iter()
.filter_map(|m| {
let fragment_id = m
.get_label()
.iter()
.find(|lp| lp.get_name() == "fragment_id")?
.get_value()
.parse::<u32>()
.unwrap();
let count = m.get_gauge().get_value() as u32;
Some((fragment_id, count))
})
.collect();
let metrics = metric_family.get(0).unwrap().get_metric();
let mut back_pressure_infos: Vec<BackPressureInfo> = Vec::new();
for label_pairs in metrics {
let mut back_pressure_info = BackPressureInfo::default();

let mut back_pressure_infos: HashMap<_, BackPressureInfo> = HashMap::new();

for label_pairs in actor_output_buffer_blocking_duration_ns {
let mut fragment_id = None;
let mut downstream_fragment_id = None;
for label_pair in label_pairs.get_label() {
if label_pair.get_name() == "actor_id" {
back_pressure_info.actor_id = label_pair.get_value().parse::<u32>().unwrap();
}
if label_pair.get_name() == "fragment_id" {
back_pressure_info.fragment_id = label_pair.get_value().parse::<u32>().unwrap();
fragment_id = label_pair.get_value().parse::<u32>().ok();
}
if label_pair.get_name() == "downstream_fragment_id" {
back_pressure_info.downstream_fragment_id =
label_pair.get_value().parse::<u32>().unwrap();
downstream_fragment_id = label_pair.get_value().parse::<u32>().ok();
}
}
back_pressure_info.value = label_pairs.get_counter().get_value();
back_pressure_infos.push(back_pressure_info);
let Some(fragment_id) = fragment_id else {
continue;
};
let Some(downstream_fragment_id) = downstream_fragment_id else {
continue;
};

// When metrics level is Debug, we may have multiple metrics with the same label pairs
// (fragment_id, downstream_fragment_id). We need to aggregate them locally.
//
// Metrics from different compute nodes should be aggregated by the caller.
let back_pressure_info = back_pressure_infos
.entry((fragment_id, downstream_fragment_id))
.or_insert_with(|| BackPressureInfo {
fragment_id,
downstream_fragment_id,
actor_count: actor_count.get(&fragment_id).copied().unwrap_or_default(),
value: 0.,
});

back_pressure_info.value += label_pairs.get_counter().get_value();
}

Ok(Response::new(GetBackPressureResponse {
back_pressure_infos,
back_pressure_infos: back_pressure_infos.into_values().collect(),
}))
}

Expand Down
9 changes: 7 additions & 2 deletions src/meta/src/dashboard/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,13 @@ pub async fn list_prometheus_fragment_back_pressure(
Extension(srv): Extension<Service>,
) -> Result<Json<FragmentBackPressure>> {
if let Some(ref client) = srv.prometheus_client {
let back_pressure_query =
format!("avg(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[60s])) by (fragment_id, downstream_fragment_id) / 1000000000", srv.prometheus_selector);
let back_pressure_query = format!(
"sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[60s])) by (fragment_id, downstream_fragment_id) \
/ ignoring (downstream_fragment_id) group_left sum(stream_actor_count{{{}}}) by (fragment_id) \
/ 1000000000",
srv.prometheus_selector,
srv.prometheus_selector,
);
let result = client.query(back_pressure_query).get().await.map_err(err)?;
let back_pressure_data = result
.data()
Expand Down
10 changes: 8 additions & 2 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use hytra::TrAdder;
use risingwave_common::catalog::TableId;
use risingwave_common::config::StreamingConfig;
use risingwave_common::log::LogSuppresser;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::metrics::{IntGaugeExt, GLOBAL_ERROR_METRICS};
use risingwave_common::util::epoch::EpochPair;
use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID};
use risingwave_expr::ExprError;
Expand Down Expand Up @@ -198,7 +198,6 @@ where
.into()));

let id = self.actor_context.id;

let span_name = format!("Actor {id}");

let new_span = |epoch: Option<EpochPair>| {
Expand All @@ -213,6 +212,13 @@ where
};
let mut span = new_span(None);

let actor_count = self
.actor_context
.streaming_metrics
.actor_count
.with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
let _actor_count_guard = actor_count.inc_guard();

let mut last_epoch: Option<EpochPair> = None;
let mut stream = Box::pin(Box::new(self.consumer).execute());

Expand Down
Loading
Loading