diff --git a/src/filters/chain.rs b/src/filters/chain.rs index ba5facdb3e..9e8fc2405c 100644 --- a/src/filters/chain.rs +++ b/src/filters/chain.rs @@ -258,9 +258,7 @@ impl Filter for FilterChain { } None => { tracing::trace!(%id, "read dropping packet"); - crate::metrics::PACKETS_DROPPED - .with_label_values(&[crate::metrics::READ_DIRECTION_LABEL, id]) - .inc(); + crate::metrics::packets_dropped(crate::metrics::READ, id).inc(); return None; } } @@ -283,9 +281,7 @@ impl Filter for FilterChain { } None => { tracing::trace!(%id, "write dropping packet"); - crate::metrics::PACKETS_DROPPED - .with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL, id]) - .inc(); + crate::metrics::packets_dropped(crate::metrics::WRITE, id).inc(); None } } diff --git a/src/metrics.rs b/src/metrics.rs index 15003d7685..6c2431b1c1 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -16,7 +16,8 @@ use once_cell::sync::Lazy; use prometheus::{ - core::Collector, HistogramOpts, HistogramVec, IntCounterVec, Opts, Registry, DEFAULT_BUCKETS, + core::Collector, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, Opts, + Registry, DEFAULT_BUCKETS, }; pub use prometheus::Result; @@ -29,6 +30,9 @@ pub const METADATA_KEY_LABEL: &str = "metadata_key"; /// `read` and `write` executions. pub const DIRECTION_LABEL: &str = "event"; +pub(crate) const READ: Direction = Direction::Read; +pub(crate) const WRITE: Direction = Direction::Write; + /// Label value for [DIRECTION_LABEL] for `read` events pub const READ_DIRECTION_LABEL: &str = "read"; /// Label value for [DIRECTION_LABEL] for `write` events @@ -55,66 +59,103 @@ pub(crate) const BUCKET_FACTOR: f64 = 2.0; /// care about granularity past 1 second. pub(crate) const BUCKET_COUNT: usize = 13; -pub(crate) static PROCESSING_TIME: Lazy = Lazy::new(|| { - prometheus::register_histogram_vec_with_registry! { - prometheus::histogram_opts! { - "packets_processing_duration_seconds", - "Total processing time for a packet", - prometheus::exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT).unwrap(), - }, - &[DIRECTION_LABEL], - registry(), - } - .unwrap() -}); - -pub(crate) static BYTES_TOTAL: Lazy = Lazy::new(|| { - prometheus::register_int_counter_vec_with_registry! { - prometheus::opts! { - "bytes_total", - "total number of bytes", - }, - &[DIRECTION_LABEL], - registry(), - } - .unwrap() -}); - -pub(crate) static ERRORS_TOTAL: Lazy = Lazy::new(|| { - prometheus::register_int_counter_vec_with_registry! { - prometheus::opts! { - "errors_total", - "total number of errors sending packets", - }, - &[DIRECTION_LABEL], - registry(), - } - .unwrap() -}); - -pub(crate) static PACKETS_TOTAL: Lazy = Lazy::new(|| { - prometheus::register_int_counter_vec_with_registry! { - prometheus::opts! { - "packets_total", - "Total number of packets", - }, - &[DIRECTION_LABEL], - registry(), - } - .unwrap() -}); - -pub(crate) static PACKETS_DROPPED: Lazy = Lazy::new(|| { - prometheus::register_int_counter_vec_with_registry! { - prometheus::opts! { - "packets_dropped", - "Total number of dropped packets", - }, - &[DIRECTION_LABEL, "reason"], - registry(), +#[derive(Clone, Copy, Debug)] +pub enum Direction { + Read, + Write, +} + +impl Direction { + pub(crate) const LABEL: &'static str = DIRECTION_LABEL; + + pub fn label(self) -> &'static str { + match self { + Self::Read => READ_DIRECTION_LABEL, + Self::Write => WRITE_DIRECTION_LABEL, + } } - .unwrap() -}); +} + +pub(crate) fn processing_time(direction: Direction) -> Histogram { + static PROCESSING_TIME: Lazy = Lazy::new(|| { + prometheus::register_histogram_vec_with_registry! { + prometheus::histogram_opts! { + "packets_processing_duration_seconds", + "Total processing time for a packet", + prometheus::exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT).unwrap(), + }, + &[Direction::LABEL], + registry(), + } + .unwrap() + }); + + PROCESSING_TIME.with_label_values(&[direction.label()]) +} + +pub(crate) fn bytes_total(direction: Direction) -> IntCounter { + static BYTES_TOTAL: Lazy = Lazy::new(|| { + prometheus::register_int_counter_vec_with_registry! { + prometheus::opts! { + "bytes_total", + "total number of bytes", + }, + &[Direction::LABEL], + registry(), + } + .unwrap() + }); + + BYTES_TOTAL.with_label_values(&[direction.label()]) +} + +pub(crate) fn errors_total(direction: Direction) -> IntCounter { + static ERRORS_TOTAL: Lazy = Lazy::new(|| { + prometheus::register_int_counter_vec_with_registry! { + prometheus::opts! { + "errors_total", + "total number of errors sending packets", + }, + &[Direction::LABEL], + registry(), + } + .unwrap() + }); + + ERRORS_TOTAL.with_label_values(&[direction.label()]) +} + +pub(crate) fn packets_total(direction: Direction) -> IntCounter { + static PACKETS_TOTAL: Lazy = Lazy::new(|| { + prometheus::register_int_counter_vec_with_registry! { + prometheus::opts! { + "packets_total", + "Total number of packets", + }, + &[Direction::LABEL], + registry(), + } + .unwrap() + }); + + PACKETS_TOTAL.with_label_values(&[direction.label()]) +} + +pub(crate) fn packets_dropped(direction: Direction, reason: &str) -> IntCounter { + static PACKETS_DROPPED: Lazy = Lazy::new(|| { + prometheus::register_int_counter_vec_with_registry! { + prometheus::opts! { + "packets_dropped", + "Total number of dropped packets", + }, + &[Direction::LABEL, "reason"], + registry(), + } + .unwrap() + }); + + PACKETS_DROPPED.with_label_values(&[direction.label(), reason]) +} /// Create a generic metrics options. /// Use [filter_opts] instead if the intended target is a filter. diff --git a/src/proxy.rs b/src/proxy.rs index a9b9d40961..0f93556424 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -197,7 +197,7 @@ impl Proxy { ); tokio::select! { recv = socket.recv_from(&mut buf) => { - let timer = crate::metrics::PROCESSING_TIME.with_label_values(&[crate::metrics::READ_DIRECTION_LABEL]).start_timer(); + let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer(); match recv { Ok((size, source)) => { let contents = buf[..size].to_vec(); @@ -234,9 +234,7 @@ impl Proxy { let endpoints: Vec<_> = clusters.endpoints().collect(); if endpoints.is_empty() { tracing::trace!("dropping packet, no upstream endpoints available"); - crate::metrics::PACKETS_DROPPED - .with_label_values(&[crate::metrics::READ_DIRECTION_LABEL, "NoEndpointsAvailable"]) - .inc(); + crate::metrics::packets_dropped(crate::metrics::READ, "NoEndpointsAvailable").inc(); return; } diff --git a/src/proxy/sessions.rs b/src/proxy/sessions.rs index 9da07209e8..e35bb21038 100644 --- a/src/proxy/sessions.rs +++ b/src/proxy/sessions.rs @@ -136,7 +136,7 @@ impl Session { tracing::debug!(source = %s.source, dest = ?s.dest, "Session created"); - self::metrics::TOTAL_SESSIONS.inc(); + self::metrics::total_sessions().inc(); s.active_session_metric().inc(); s.run(args.ttl, args.downstream_socket, shutdown_rx); Ok(s) @@ -165,14 +165,12 @@ impl Session { received = upstream_socket.recv_from(&mut buf) => { match received { Err(error) => { - crate::metrics::ERRORS_TOTAL.with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL]).inc(); + crate::metrics::errors_total(crate::metrics::WRITE).inc(); tracing::error!(%error, %source, dest = ?endpoint, "Error receiving packet"); }, Ok((size, recv_addr)) => { - crate::metrics::BYTES_TOTAL - .with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL]) - .inc_by(size as u64); - crate::metrics::PACKETS_TOTAL.with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL]).inc(); + crate::metrics::bytes_total(crate::metrics::WRITE).inc_by(size as u64); + crate::metrics::packets_total(crate::metrics::WRITE).inc(); Session::process_recv_packet( &downstream_socket, &expiration, @@ -183,7 +181,7 @@ impl Session { endpoint: &endpoint, source: recv_addr.into(), dest: source.clone(), - timer: crate::metrics::PROCESSING_TIME.with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL]).start_timer(), + timer: crate::metrics::processing_time(crate::metrics::WRITE).start_timer(), }).await } }; @@ -211,20 +209,13 @@ impl Session { } fn active_session_metric(&self) -> prometheus::IntGauge { - let labels = self + let (asn_number, ip_prefix) = self .asn_info .as_ref() - .map(|asn| [asn.r#as.to_string(), asn.prefix.clone()]) - .unwrap_or_else(|| [String::new(), String::new()]); + .map(|asn| (asn.r#as, &*asn.prefix)) + .unwrap_or_else(|| (<_>::default(), <_>::default())); - let mut iter = labels.iter(); - - let labels = [ - iter.next().map(|item| &**item).unwrap(), - iter.next().map(|item| &**item).unwrap(), - ]; - - metrics::ACTIVE_SESSIONS.with_label_values(labels.as_slice()) + metrics::active_sessions(asn_number as u16, ip_prefix) } /// process_recv_packet processes a packet that is received by this session. @@ -268,12 +259,12 @@ impl Session { let handle_error = |error: Error| { error.log(); - crate::metrics::PACKETS_DROPPED - .with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL]) - .inc(); - crate::metrics::ERRORS_TOTAL - .with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL]) - .inc(); + crate::metrics::packets_dropped( + crate::metrics::WRITE, + "proxy::Session::process_recv_packet", + ) + .inc(); + crate::metrics::errors_total(crate::metrics::WRITE).inc(); }; match result { @@ -323,20 +314,12 @@ impl Session { match self.do_send(buf).await { Ok(size) => { - crate::metrics::PACKETS_TOTAL - .with_label_values(&[crate::metrics::READ_DIRECTION_LABEL]) - .inc(); - crate::metrics::BYTES_TOTAL - .with_label_values(&[crate::metrics::READ_DIRECTION_LABEL]) - .inc_by(size as u64); + crate::metrics::packets_total(crate::metrics::READ).inc(); + crate::metrics::bytes_total(crate::metrics::READ).inc_by(size as u64); } Err(error) => { - crate::metrics::PACKETS_DROPPED - .with_label_values(&[crate::metrics::READ_DIRECTION_LABEL]) - .inc(); - crate::metrics::ERRORS_TOTAL - .with_label_values(&[crate::metrics::READ_DIRECTION_LABEL]) - .inc(); + crate::metrics::packets_dropped(crate::metrics::READ, "proxy::Session::send").inc(); + crate::metrics::errors_total(crate::metrics::READ).inc(); tracing::error!(kind=%error.kind(), "{}", error); return; } @@ -357,7 +340,7 @@ impl Session { impl Drop for Session { fn drop(&mut self) { self.active_session_metric().dec(); - metrics::DURATION_SECS.observe(self.created_at.elapsed().as_secs() as f64); + metrics::duration_secs().observe(self.created_at.elapsed().as_secs() as f64); if let Err(error) = self.shutdown_tx.send(()) { tracing::warn!(%error, "Error sending session shutdown signal"); diff --git a/src/proxy/sessions/metrics.rs b/src/proxy/sessions/metrics.rs index 4fe5ac646e..3315940057 100644 --- a/src/proxy/sessions/metrics.rs +++ b/src/proxy/sessions/metrics.rs @@ -15,7 +15,7 @@ */ use once_cell::sync::Lazy; -use prometheus::{Histogram, IntCounter, IntGaugeVec, Opts}; +use prometheus::{Histogram, IntCounter, IntGauge, IntGaugeVec, Opts}; use crate::metrics::{histogram_opts, register}; @@ -23,34 +23,46 @@ const SUBSYSTEM: &str = "session"; const ASN_NUMBER_LABEL: &str = "asn"; const IP_PREFIX_LABEL: &str = "ip_prefix"; -pub(crate) static ACTIVE_SESSIONS: Lazy = Lazy::new(|| { - prometheus::register_int_gauge_vec_with_registry! { - Opts::new("active", "number of sessions currently active").subsystem(SUBSYSTEM), - &[ASN_NUMBER_LABEL, IP_PREFIX_LABEL], - crate::metrics::registry(), - } - .unwrap() -}); - -pub(crate) static TOTAL_SESSIONS: Lazy = Lazy::new(|| { - register( - IntCounter::with_opts( - Opts::new("total", "total number of established sessions").subsystem(SUBSYSTEM), +pub(crate) fn active_sessions(asn_number: u16, ip_prefix: &str) -> IntGauge { + static ACTIVE_SESSIONS: Lazy = Lazy::new(|| { + prometheus::register_int_gauge_vec_with_registry! { + Opts::new("active", "number of sessions currently active").subsystem(SUBSYSTEM), + &[ASN_NUMBER_LABEL, IP_PREFIX_LABEL], + crate::metrics::registry(), + } + .unwrap() + }); + + ACTIVE_SESSIONS.with_label_values(&[&asn_number.to_string(), ip_prefix]) +} + +pub(crate) fn total_sessions() -> &'static IntCounter { + static TOTAL_SESSIONS: Lazy = Lazy::new(|| { + register( + IntCounter::with_opts( + Opts::new("total", "total number of established sessions").subsystem(SUBSYSTEM), + ) + .unwrap(), + ) + }); + + &*TOTAL_SESSIONS +} + +pub(crate) fn duration_secs() -> &'static Histogram { + static DURATION_SECS: Lazy = Lazy::new(|| { + register( + Histogram::with_opts(histogram_opts( + "duration_secs", + SUBSYSTEM, + "duration of sessions", + vec![ + 1f64, 5f64, 10f64, 25f64, 60f64, 300f64, 900f64, 1800f64, 3600f64, + ], + )) + .unwrap(), ) - .unwrap(), - ) -}); - -pub(crate) static DURATION_SECS: Lazy = Lazy::new(|| { - register( - Histogram::with_opts(histogram_opts( - "duration_secs", - SUBSYSTEM, - "duration of sessions", - vec![ - 1f64, 5f64, 10f64, 25f64, 60f64, 300f64, 900f64, 1800f64, 3600f64, - ], - )) - .unwrap(), - ) -}); + }); + + &*DURATION_SECS +}