Skip to content

Commit

Permalink
chery-pick: feat(subscription): add metrics for cursor (#18052)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored and hzxa21 committed Nov 5, 2024
1 parent 04e33e7 commit 577b57c
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 27 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

76 changes: 76 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1991,6 +1991,82 @@ def section_frontend(outer_panels):
],
["last"],
),
panels.timeseries_count(
"Subsription Cursor Nums",
"The number of valid and invalid subscription cursor",
[
panels.target(
f"{metric('subsription_cursor_nums')}",
"",
),
panels.target(
f"{metric('invalid_subsription_cursor_nums')}",
"",
),
],
),
panels.timeseries_count(
"Subscription Cursor Error Count",
"The subscription error num of cursor",
[
panels.target(
f"{metric('subscription_cursor_error_count')}",
"",
),
],
),
panels.timeseries_latency_ms(
"Subscription Cursor Query Duration(ms)",
"The amount of time a query exists inside the cursor",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('subscription_cursor_query_duration_bucket')}[$__rate_interval])) by (le, subscription_name))",
f"p{legend} - {{{{subscription_name}}}}",
),
[50, 99, "max"],
),
],
),
panels.timeseries_latency_ms(
"Subscription Cursor Declare Duration(ms)",
"Subscription cursor duration of declare",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('subscription_cursor_declare_duration_bucket')}[$__rate_interval])) by (le, subscription_name))",
f"p{legend} - {{{{subscription_name}}}}",
),
[50, 99, "max"],
)
],
),
panels.timeseries_latency_ms(
"Subscription Cursor Fetch Duration(ms)",
"Subscription cursor duration of fetch",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('subscription_cursor_fetch_duration_bucket')}[$__rate_interval])) by (le, subscription_name))",
f"p{legend} - {{{{subscription_name}}}}",
),
[50, 99, "max"],
)
],
),
panels.timeseries_latency_ms(
"Subscription Cursor Last Fetch Duration(ms)",
"Since the last fetch, the time up to now",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('subscription_cursor_last_fetch_duration_bucket')}[$__rate_interval])) by (le, subscription_name))",
f"p{legend} - {{{{subscription_name}}}}",
),
[50, 99, "max"],
)
],
),
panels.timeseries_latency(
"Query Latency (Distributed Query Mode)",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

12 changes: 10 additions & 2 deletions src/frontend/src/handler/declare_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn handle_declare_subscription_cursor(
risingwave_sqlparser::ast::Since::Full => None,
};
// Create cursor based on the response
session
if let Err(e) = session
.get_cursor_manager()
.add_subscription_cursor(
cursor_name.clone(),
Expand All @@ -91,7 +91,15 @@ async fn handle_declare_subscription_cursor(
subscription,
&handle_args,
)
.await?;
.await
{
session
.env()
.cursor_metrics
.subscription_cursor_error_count
.inc();
return Err(e);
}

Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
}
Expand Down
188 changes: 183 additions & 5 deletions src/frontend/src/monitor/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;
use core::mem;
use std::collections::HashMap;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicU64, GenericCounter};
use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
use prometheus::{
exponential_buckets, histogram_opts, register_histogram_with_registry,
register_int_counter_with_registry, register_int_gauge_with_registry, Histogram, IntGauge,
Registry,
exponential_buckets, histogram_opts, register_histogram_vec_with_registry,
register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, HistogramVec, IntGauge, Registry,
};
use risingwave_common::metrics::TrAdderGauge;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use tokio::task::JoinHandle;

use crate::session::SessionMapRef;

#[derive(Clone)]
pub struct FrontendMetrics {
Expand Down Expand Up @@ -80,3 +86,175 @@ impl FrontendMetrics {
GLOBAL_FRONTEND_METRICS.clone()
}
}

pub static GLOBAL_CURSOR_METRICS: LazyLock<CursorMetrics> =
LazyLock::new(|| CursorMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct CursorMetrics {
pub subscription_cursor_error_count: GenericCounter<AtomicU64>,
pub subscription_cursor_query_duration: HistogramVec,
pub subscription_cursor_declare_duration: HistogramVec,
pub subscription_cursor_fetch_duration: HistogramVec,
subsription_cursor_nums: GenericGauge<AtomicI64>,
invalid_subsription_cursor_nums: GenericGauge<AtomicI64>,
subscription_cursor_last_fetch_duration: HistogramVec,
_cursor_metrics_collector: Option<Arc<CursorMetricsCollector>>,
}

impl CursorMetrics {
pub fn new(registry: &Registry) -> Self {
let subscription_cursor_error_count = register_int_counter_with_registry!(
"subscription_cursor_error_count",
"The subscription error num of cursor",
registry
)
.unwrap();
let opts = histogram_opts!(
"subscription_cursor_query_duration",
"The amount of time a query exists inside the cursor",
exponential_buckets(1.0, 5.0, 11).unwrap(),
);
let subscription_cursor_query_duration =
register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();

let opts = histogram_opts!(
"subscription_cursor_declare_duration",
"Subscription cursor duration of declare",
exponential_buckets(1.0, 5.0, 11).unwrap(),
);
let subscription_cursor_declare_duration =
register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();

let opts = histogram_opts!(
"subscription_cursor_fetch_duration",
"Subscription cursor duration of fetch",
exponential_buckets(1.0, 5.0, 11).unwrap(),
);
let subscription_cursor_fetch_duration =
register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();

let subsription_cursor_nums = register_int_gauge_with_registry!(
"subsription_cursor_nums",
"The number of subscription cursor",
registry
)
.unwrap();
let invalid_subsription_cursor_nums = register_int_gauge_with_registry!(
"invalid_subsription_cursor_nums",
"The number of invalid subscription cursor",
registry
)
.unwrap();

let opts = histogram_opts!(
"subscription_cursor_last_fetch_duration",
"Since the last fetch, the time up to now",
exponential_buckets(1.0, 5.0, 11).unwrap(),
);
let subscription_cursor_last_fetch_duration =
register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
Self {
_cursor_metrics_collector: None,
subscription_cursor_error_count,
subscription_cursor_query_duration,
subscription_cursor_declare_duration,
subscription_cursor_fetch_duration,
subsription_cursor_nums,
invalid_subsription_cursor_nums,
subscription_cursor_last_fetch_duration,
}
}

pub fn for_test() -> Self {
GLOBAL_CURSOR_METRICS.clone()
}

pub fn start_with_session_map(&mut self, session_map: SessionMapRef) {
self._cursor_metrics_collector = Some(Arc::new(CursorMetricsCollector::new(
session_map,
self.subsription_cursor_nums.clone(),
self.invalid_subsription_cursor_nums.clone(),
self.subscription_cursor_last_fetch_duration.clone(),
)));
}

pub fn init(session_map: SessionMapRef) -> Self {
let mut cursor_metrics = GLOBAL_CURSOR_METRICS.clone();
cursor_metrics.start_with_session_map(session_map);
cursor_metrics
}
}

pub struct PeriodicCursorMetrics {
pub subsription_cursor_nums: i64,
pub invalid_subsription_cursor_nums: i64,
pub subscription_cursor_last_fetch_duration: HashMap<String, f64>,
}

struct CursorMetricsCollector {
_join_handle: JoinHandle<()>,
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
impl CursorMetricsCollector {
fn new(
session_map: SessionMapRef,
subsription_cursor_nums: GenericGauge<AtomicI64>,
invalid_subsription_cursor_nums: GenericGauge<AtomicI64>,
subscription_cursor_last_fetch_duration: HistogramVec,
) -> Self {
const COLLECT_INTERVAL_SECONDS: u64 = 60;

let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
let mut monitor_interval =
tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
// Wait for interval
_ = monitor_interval.tick() => {},
// Shutdown monitor
_ = &mut shutdown_rx => {
tracing::info!("Fragment info monitor is stopped");
return;
}
}

let session_vec = { session_map.read().values().cloned().collect::<Vec<_>>() };
let mut subsription_cursor_nums_value = 0;
let mut invalid_subsription_cursor_nums_value = 0;
for session in &session_vec {
let periodic_cursor_metrics = session
.get_cursor_manager()
.get_periodic_cursor_metrics()
.await;
subsription_cursor_nums_value +=
periodic_cursor_metrics.subsription_cursor_nums;
invalid_subsription_cursor_nums_value +=
periodic_cursor_metrics.invalid_subsription_cursor_nums;
for (subscription_name, duration) in
&periodic_cursor_metrics.subscription_cursor_last_fetch_duration
{
subscription_cursor_last_fetch_duration
.with_label_values(&[subscription_name])
.observe(*duration);
}
}
subsription_cursor_nums.set(subsription_cursor_nums_value);
invalid_subsription_cursor_nums.set(invalid_subsription_cursor_nums_value);
}
});
Self {
_join_handle: join_handle,
shutdown_tx: Some(shutdown_tx),
}
}
}
impl Drop for CursorMetricsCollector {
fn drop(&mut self) {
if let Some(shutdown_tx) = mem::take(&mut self.shutdown_tx) {
shutdown_tx.send(()).ok();
}
}
}
Loading

0 comments on commit 577b57c

Please sign in to comment.