Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
fix ci

fix
  • Loading branch information
xxhZs committed Sep 20, 2024
1 parent e9ca7ad commit 4cbb0e1
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 33 deletions.
87 changes: 60 additions & 27 deletions src/frontend/src/monitor/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicU64, GenericCounter};
use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
use prometheus::{
exponential_buckets, histogram_opts, register_histogram_vec_with_registry,
register_histogram_with_registry, register_int_counter_with_registry,
Expand Down Expand Up @@ -87,17 +87,23 @@ impl FrontendMetrics {
}
}

pub static GLOBAL_CURSOR_METRICS: LazyLock<CursorMetrics> =
LazyLock::new(|| CursorMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct CursorMetrics {
pub subscription_cursor_error_count: GenericCounter<AtomicU64>,
pub subscription_cursor_query_duration: HistogramVec,
pub subscription_cursor_declare_duration: HistogramVec,
pub subscription_cursor_fetch_duration: HistogramVec,
_cursor_metrics_collector: Arc<CursorMetricsCollector>,
subsription_cursor_nums: GenericGauge<AtomicI64>,
invalid_subsription_cursor_nums: GenericGauge<AtomicI64>,
subscription_cursor_last_fetch_duration: HistogramVec,
_cursor_metrics_collector: Option<Arc<CursorMetricsCollector>>,
}

impl CursorMetrics {
pub fn new(registry: &Registry, session_map: SessionMapRef) -> Self {
pub fn new(registry: &Registry) -> Self {
let subscription_cursor_error_count = register_int_counter_with_registry!(
"subscription_cursor_error_count",
"The subscription error num of cursor",
Expand Down Expand Up @@ -127,29 +133,6 @@ impl CursorMetrics {
);
let subscription_cursor_fetch_duration =
register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
Self {
_cursor_metrics_collector: Arc::new(CursorMetricsCollector::new(session_map, registry)),
subscription_cursor_error_count,
subscription_cursor_query_duration,
subscription_cursor_declare_duration,
subscription_cursor_fetch_duration,
}
}
}

pub struct PeriodicCursorMetrics {
pub subsription_cursor_nums: i64,
pub invalid_subsription_cursor_nums: i64,
pub subscription_cursor_last_fetch_duration: HashMap<String, f64>,
}

struct CursorMetricsCollector {
_join_handle: JoinHandle<()>,
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
impl CursorMetricsCollector {
fn new(session_map: SessionMapRef, registry: &Registry) -> Self {
const COLLECT_INTERVAL_SECONDS: u64 = 60;

let subsription_cursor_nums = register_int_gauge_with_registry!(
"subsription_cursor_nums",
Expand All @@ -171,6 +154,56 @@ impl CursorMetricsCollector {
);
let subscription_cursor_last_fetch_duration =
register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
Self {
_cursor_metrics_collector: None,
subscription_cursor_error_count,
subscription_cursor_query_duration,
subscription_cursor_declare_duration,
subscription_cursor_fetch_duration,
subsription_cursor_nums,
invalid_subsription_cursor_nums,
subscription_cursor_last_fetch_duration,
}
}

pub fn for_test() -> Self {
GLOBAL_CURSOR_METRICS.clone()
}

pub fn start_with_session_map(&mut self, session_map: SessionMapRef) {
self._cursor_metrics_collector = Some(Arc::new(CursorMetricsCollector::new(
session_map,
self.subsription_cursor_nums.clone(),
self.invalid_subsription_cursor_nums.clone(),
self.subscription_cursor_last_fetch_duration.clone(),
)));
}

pub fn init(session_map: SessionMapRef) -> Self {
let mut cursor_metrics = GLOBAL_CURSOR_METRICS.clone();
cursor_metrics.start_with_session_map(session_map);
cursor_metrics
}
}

pub struct PeriodicCursorMetrics {
pub subsription_cursor_nums: i64,
pub invalid_subsription_cursor_nums: i64,
pub subscription_cursor_last_fetch_duration: HashMap<String, f64>,
}

struct CursorMetricsCollector {
_join_handle: JoinHandle<()>,
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
impl CursorMetricsCollector {
fn new(
session_map: SessionMapRef,
subsription_cursor_nums: GenericGauge<AtomicI64>,
invalid_subsription_cursor_nums: GenericGauge<AtomicI64>,
subscription_cursor_last_fetch_duration: HistogramVec,
) -> Self {
const COLLECT_INTERVAL_SECONDS: u64 = 60;

let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
Expand Down Expand Up @@ -221,7 +254,7 @@ impl CursorMetricsCollector {
impl Drop for CursorMetricsCollector {
fn drop(&mut self) {
if let Some(shutdown_tx) = mem::take(&mut self.shutdown_tx) {
shutdown_tx.send(()).unwrap();
shutdown_tx.send(()).ok();
}
}
}
8 changes: 2 additions & 6 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use risingwave_common::config::{
load_config, BatchConfig, MetaConfig, MetricLevel, StreamingConfig,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
use risingwave_common::system_param::local_manager::{
Expand Down Expand Up @@ -236,7 +235,7 @@ impl FrontendEnv {
client_pool,
sessions_map: sessions_map.clone(),
frontend_metrics: Arc::new(FrontendMetrics::for_test()),
cursor_metrics: Arc::new(CursorMetrics::new(&GLOBAL_METRICS_REGISTRY, sessions_map)),
cursor_metrics: Arc::new(CursorMetrics::for_test()),
batch_config: BatchConfig::default(),
meta_config: MetaConfig::default(),
streaming_config: StreamingConfig::default(),
Expand Down Expand Up @@ -415,10 +414,7 @@ impl FrontendEnv {
));

let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
let cursor_metrics = Arc::new(CursorMetrics::new(
&GLOBAL_METRICS_REGISTRY,
sessions_map.clone(),
));
let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
let sessions = sessions_map.clone();

// Idle transaction background monitor
Expand Down

0 comments on commit 4cbb0e1

Please sign in to comment.