Skip to content

Commit

Permalink
Refactor metrics statics into free fns (#633)
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Oct 27, 2022
1 parent 0f7a541 commit facf875
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 138 deletions.
8 changes: 2 additions & 6 deletions src/filters/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,7 @@ impl Filter for FilterChain {
}
None => {
tracing::trace!(%id, "read dropping packet");
crate::metrics::PACKETS_DROPPED
.with_label_values(&[crate::metrics::READ_DIRECTION_LABEL, id])
.inc();
crate::metrics::packets_dropped(crate::metrics::READ, id).inc();
return None;
}
}
Expand All @@ -283,9 +281,7 @@ impl Filter for FilterChain {
}
None => {
tracing::trace!(%id, "write dropping packet");
crate::metrics::PACKETS_DROPPED
.with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL, id])
.inc();
crate::metrics::packets_dropped(crate::metrics::WRITE, id).inc();
None
}
}
Expand Down
161 changes: 101 additions & 60 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

use once_cell::sync::Lazy;
use prometheus::{
core::Collector, HistogramOpts, HistogramVec, IntCounterVec, Opts, Registry, DEFAULT_BUCKETS,
core::Collector, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, Opts,
Registry, DEFAULT_BUCKETS,
};

pub use prometheus::Result;
Expand All @@ -29,6 +30,9 @@ pub const METADATA_KEY_LABEL: &str = "metadata_key";
/// `read` and `write` executions.
pub const DIRECTION_LABEL: &str = "event";

pub(crate) const READ: Direction = Direction::Read;
pub(crate) const WRITE: Direction = Direction::Write;

/// Label value for [DIRECTION_LABEL] for `read` events
pub const READ_DIRECTION_LABEL: &str = "read";
/// Label value for [DIRECTION_LABEL] for `write` events
Expand All @@ -55,66 +59,103 @@ pub(crate) const BUCKET_FACTOR: f64 = 2.0;
/// care about granularity past 1 second.
pub(crate) const BUCKET_COUNT: usize = 13;

pub(crate) static PROCESSING_TIME: Lazy<HistogramVec> = Lazy::new(|| {
prometheus::register_histogram_vec_with_registry! {
prometheus::histogram_opts! {
"packets_processing_duration_seconds",
"Total processing time for a packet",
prometheus::exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT).unwrap(),
},
&[DIRECTION_LABEL],
registry(),
}
.unwrap()
});

pub(crate) static BYTES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"bytes_total",
"total number of bytes",
},
&[DIRECTION_LABEL],
registry(),
}
.unwrap()
});

pub(crate) static ERRORS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"errors_total",
"total number of errors sending packets",
},
&[DIRECTION_LABEL],
registry(),
}
.unwrap()
});

pub(crate) static PACKETS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"packets_total",
"Total number of packets",
},
&[DIRECTION_LABEL],
registry(),
}
.unwrap()
});

pub(crate) static PACKETS_DROPPED: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"packets_dropped",
"Total number of dropped packets",
},
&[DIRECTION_LABEL, "reason"],
registry(),
#[derive(Clone, Copy, Debug)]
pub enum Direction {
Read,
Write,
}

impl Direction {
pub(crate) const LABEL: &'static str = DIRECTION_LABEL;

pub fn label(self) -> &'static str {
match self {
Self::Read => READ_DIRECTION_LABEL,
Self::Write => WRITE_DIRECTION_LABEL,
}
}
.unwrap()
});
}

pub(crate) fn processing_time(direction: Direction) -> Histogram {
static PROCESSING_TIME: Lazy<HistogramVec> = Lazy::new(|| {
prometheus::register_histogram_vec_with_registry! {
prometheus::histogram_opts! {
"packets_processing_duration_seconds",
"Total processing time for a packet",
prometheus::exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT).unwrap(),
},
&[Direction::LABEL],
registry(),
}
.unwrap()
});

PROCESSING_TIME.with_label_values(&[direction.label()])
}

pub(crate) fn bytes_total(direction: Direction) -> IntCounter {
static BYTES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"bytes_total",
"total number of bytes",
},
&[Direction::LABEL],
registry(),
}
.unwrap()
});

BYTES_TOTAL.with_label_values(&[direction.label()])
}

pub(crate) fn errors_total(direction: Direction) -> IntCounter {
static ERRORS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"errors_total",
"total number of errors sending packets",
},
&[Direction::LABEL],
registry(),
}
.unwrap()
});

ERRORS_TOTAL.with_label_values(&[direction.label()])
}

pub(crate) fn packets_total(direction: Direction) -> IntCounter {
static PACKETS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"packets_total",
"Total number of packets",
},
&[Direction::LABEL],
registry(),
}
.unwrap()
});

PACKETS_TOTAL.with_label_values(&[direction.label()])
}

pub(crate) fn packets_dropped(direction: Direction, reason: &str) -> IntCounter {
static PACKETS_DROPPED: Lazy<IntCounterVec> = Lazy::new(|| {
prometheus::register_int_counter_vec_with_registry! {
prometheus::opts! {
"packets_dropped",
"Total number of dropped packets",
},
&[Direction::LABEL, "reason"],
registry(),
}
.unwrap()
});

PACKETS_DROPPED.with_label_values(&[direction.label(), reason])
}

/// Create a generic metrics options.
/// Use [filter_opts] instead if the intended target is a filter.
Expand Down
6 changes: 2 additions & 4 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Proxy {
);
tokio::select! {
recv = socket.recv_from(&mut buf) => {
let timer = crate::metrics::PROCESSING_TIME.with_label_values(&[crate::metrics::READ_DIRECTION_LABEL]).start_timer();
let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer();
match recv {
Ok((size, source)) => {
let contents = buf[..size].to_vec();
Expand Down Expand Up @@ -234,9 +234,7 @@ impl Proxy {
let endpoints: Vec<_> = clusters.endpoints().collect();
if endpoints.is_empty() {
tracing::trace!("dropping packet, no upstream endpoints available");
crate::metrics::PACKETS_DROPPED
.with_label_values(&[crate::metrics::READ_DIRECTION_LABEL, "NoEndpointsAvailable"])
.inc();
crate::metrics::packets_dropped(crate::metrics::READ, "NoEndpointsAvailable").inc();
return;
}

Expand Down
57 changes: 20 additions & 37 deletions src/proxy/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Session {

tracing::debug!(source = %s.source, dest = ?s.dest, "Session created");

self::metrics::TOTAL_SESSIONS.inc();
self::metrics::total_sessions().inc();
s.active_session_metric().inc();
s.run(args.ttl, args.downstream_socket, shutdown_rx);
Ok(s)
Expand Down Expand Up @@ -165,14 +165,12 @@ impl Session {
received = upstream_socket.recv_from(&mut buf) => {
match received {
Err(error) => {
crate::metrics::ERRORS_TOTAL.with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL]).inc();
crate::metrics::errors_total(crate::metrics::WRITE).inc();
tracing::error!(%error, %source, dest = ?endpoint, "Error receiving packet");
},
Ok((size, recv_addr)) => {
crate::metrics::BYTES_TOTAL
.with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL])
.inc_by(size as u64);
crate::metrics::PACKETS_TOTAL.with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL]).inc();
crate::metrics::bytes_total(crate::metrics::WRITE).inc_by(size as u64);
crate::metrics::packets_total(crate::metrics::WRITE).inc();
Session::process_recv_packet(
&downstream_socket,
&expiration,
Expand All @@ -183,7 +181,7 @@ impl Session {
endpoint: &endpoint,
source: recv_addr.into(),
dest: source.clone(),
timer: crate::metrics::PROCESSING_TIME.with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL]).start_timer(),
timer: crate::metrics::processing_time(crate::metrics::WRITE).start_timer(),
}).await
}
};
Expand Down Expand Up @@ -211,20 +209,13 @@ impl Session {
}

fn active_session_metric(&self) -> prometheus::IntGauge {
let labels = self
let (asn_number, ip_prefix) = self
.asn_info
.as_ref()
.map(|asn| [asn.r#as.to_string(), asn.prefix.clone()])
.unwrap_or_else(|| [String::new(), String::new()]);
.map(|asn| (asn.r#as, &*asn.prefix))
.unwrap_or_else(|| (<_>::default(), <_>::default()));

let mut iter = labels.iter();

let labels = [
iter.next().map(|item| &**item).unwrap(),
iter.next().map(|item| &**item).unwrap(),
];

metrics::ACTIVE_SESSIONS.with_label_values(labels.as_slice())
metrics::active_sessions(asn_number as u16, ip_prefix)
}

/// process_recv_packet processes a packet that is received by this session.
Expand Down Expand Up @@ -268,12 +259,12 @@ impl Session {

let handle_error = |error: Error| {
error.log();
crate::metrics::PACKETS_DROPPED
.with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL])
.inc();
crate::metrics::ERRORS_TOTAL
.with_label_values(&[crate::metrics::WRITE_DIRECTION_LABEL])
.inc();
crate::metrics::packets_dropped(
crate::metrics::WRITE,
"proxy::Session::process_recv_packet",
)
.inc();
crate::metrics::errors_total(crate::metrics::WRITE).inc();
};

match result {
Expand Down Expand Up @@ -323,20 +314,12 @@ impl Session {

match self.do_send(buf).await {
Ok(size) => {
crate::metrics::PACKETS_TOTAL
.with_label_values(&[crate::metrics::READ_DIRECTION_LABEL])
.inc();
crate::metrics::BYTES_TOTAL
.with_label_values(&[crate::metrics::READ_DIRECTION_LABEL])
.inc_by(size as u64);
crate::metrics::packets_total(crate::metrics::READ).inc();
crate::metrics::bytes_total(crate::metrics::READ).inc_by(size as u64);
}
Err(error) => {
crate::metrics::PACKETS_DROPPED
.with_label_values(&[crate::metrics::READ_DIRECTION_LABEL])
.inc();
crate::metrics::ERRORS_TOTAL
.with_label_values(&[crate::metrics::READ_DIRECTION_LABEL])
.inc();
crate::metrics::packets_dropped(crate::metrics::READ, "proxy::Session::send").inc();
crate::metrics::errors_total(crate::metrics::READ).inc();
tracing::error!(kind=%error.kind(), "{}", error);
return;
}
Expand All @@ -357,7 +340,7 @@ impl Session {
impl Drop for Session {
fn drop(&mut self) {
self.active_session_metric().dec();
metrics::DURATION_SECS.observe(self.created_at.elapsed().as_secs() as f64);
metrics::duration_secs().observe(self.created_at.elapsed().as_secs() as f64);

if let Err(error) = self.shutdown_tx.send(()) {
tracing::warn!(%error, "Error sending session shutdown signal");
Expand Down
Loading

0 comments on commit facf875

Please sign in to comment.