Skip to content

Commit

Permalink
refactor metrics
Browse files Browse the repository at this point in the history
Signed-off-by: terassyi <[email protected]>
  • Loading branch information
terassyi committed Jun 3, 2024
1 parent dc8d748 commit 12c5432
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 5 deletions.
4 changes: 3 additions & 1 deletion sartd/src/kubernetes/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ pub use kube::{
use serde::Serialize;
use tokio::sync::RwLock;

use sartd_trace::{error::TraceableError, metrics::Metrics};
use sartd_trace::error::TraceableError;

use crate::metrics::Metrics;

pub trait Ctx {
fn metrics(&self) -> &Metrics;
Expand Down
1 change: 1 addition & 0 deletions sartd/src/kubernetes/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub mod error;
pub mod reconciler;
pub mod server;
pub mod webhook;
mod metrics;
92 changes: 92 additions & 0 deletions sartd/src/kubernetes/src/controller/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use kube::Resource;
use prometheus::Registry;
use prometheus::{histogram_opts, opts, HistogramVec, IntCounter, IntCounterVec};
use sartd_trace::error::TraceableError;
use tokio::time::Instant;

/// Prometheus collectors describing controller reconcile activity.
///
/// The `Default` implementation in this file builds each collector with its
/// metric name and label set.
#[derive(Debug, Clone)]
pub struct Metrics {
/// Counter vector created as `sart_controller_reconciliation_total`
/// with the labels `resource` and `instance`.
pub reconciliations: IntCounterVec,
/// Counter vector created as `sart_controller_reconciliation_errors_total`
/// with the labels `resource`, `instance` and `error`.
pub failures: IntCounterVec,
/// Histogram vector created as `sart_controller_reconcile_duration_seconds`
/// with no labels.
pub reconcile_duration: HistogramVec,
}

impl Default for Metrics {
    /// Builds the three controller collectors with their fixed names, help
    /// texts and label sets.
    ///
    /// Nothing is registered here; the collectors are only constructed.
    ///
    /// # Panics
    ///
    /// Panics if `prometheus` rejects one of the collector definitions.
    fn default() -> Self {
        Self {
            // Counts reconcile invocations per resource kind and object name.
            reconciliations: IntCounterVec::new(
                opts!(
                    "sart_controller_reconciliation_total",
                    "Total count of reconciliations",
                ),
                &["resource", "instance"],
            )
            .unwrap(),
            // Counts reconcile failures, additionally split by error label.
            failures: IntCounterVec::new(
                opts!(
                    "sart_controller_reconciliation_errors_total",
                    "reconciliation errors",
                ),
                &["resource", "instance", "error"],
            )
            .unwrap(),
            // Unlabelled histogram of reconcile durations, in seconds.
            reconcile_duration: HistogramVec::new(
                histogram_opts!(
                    "sart_controller_reconcile_duration_seconds",
                    "The duration of reconcile to complete in seconds"
                )
                .buckets(vec![0.01, 0.1, 0.25, 0.5, 1., 5., 15., 60.]),
                &[],
            )
            .unwrap(),
        }
    }
}

impl Metrics {
    /// Registers every collector held by this value with `registry` and
    /// returns `self` so the call can be chained after `Metrics::default()`.
    ///
    /// The collectors are cloned into the registry; the clones share their
    /// underlying state with the fields kept in `self`, so later increments
    /// through `self` are visible when the registry is gathered.
    ///
    /// # Errors
    ///
    /// Returns the `prometheus::Error` reported by the registry, for example
    /// when a collector with the same name has already been registered.
    pub fn register(self, registry: &Registry) -> Result<Self, prometheus::Error> {
        registry.register(Box::new(self.reconcile_duration.clone()))?;
        registry.register(Box::new(self.failures.clone()))?;
        registry.register(Box::new(self.reconciliations.clone()))?;
        Ok(self)
    }

    /// Increments the failure counter for `resource`, labelled with the
    /// resource kind, the object name and `e.metric_label()`.
    ///
    /// A missing kind or name is recorded as an empty label value rather than
    /// panicking, so that metric bookkeeping can never abort a reconcile.
    pub fn reconcile_failure<T: Resource<DynamicType = ()>, E: TraceableError>(
        &self,
        resource: &T,
        e: &E,
    ) {
        // Build the object reference once instead of once per label.
        let obj = resource.object_ref(&());
        self.failures
            .with_label_values(&[
                obj.kind.as_deref().unwrap_or_default(),
                obj.name.as_deref().unwrap_or_default(),
                e.metric_label().as_ref(),
            ])
            .inc()
    }

    /// Increments the reconciliation counter for `resource`, labelled with
    /// the resource kind and the object name.
    ///
    /// A missing kind or name is recorded as an empty label value rather than
    /// panicking.
    pub fn reconciliation<T: Resource<DynamicType = ()>>(&self, resource: &T) {
        // Build the object reference once instead of once per label.
        let obj = resource.object_ref(&());
        self.reconciliations
            .with_label_values(&[
                obj.kind.as_deref().unwrap_or_default(),
                obj.name.as_deref().unwrap_or_default(),
            ])
            .inc()
    }
}

/// Timer that records how long a reconcile took.
///
/// Relies on `Drop`: when a value of this type goes out of scope, the time
/// elapsed since `start` is observed on `metric`.
///
/// NOTE(review): both fields are private and no constructor is visible in
/// this part of the file — confirm how instances are meant to be created.
pub struct ReconcileMeasurer {
/// Instant from which the elapsed time is computed on drop.
start: Instant,
/// Histogram that receives the observation on drop.
metric: HistogramVec,
}

impl Drop for ReconcileMeasurer {
    /// Observes the time elapsed since `start`, in seconds, on the
    /// unlabelled series of `metric`.
    fn drop(&mut self) {
        // `as_secs_f64` keeps sub-millisecond precision and avoids the lossy
        // integer-to-float cast that previously needed a clippy allow.
        let duration = self.start.elapsed().as_secs_f64();
        self.metric.with_label_values(&[]).observe(duration);
    }
}
33 changes: 31 additions & 2 deletions sartd/src/kubernetes/src/controller/reconciler/address_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,51 @@ pub async fn reconciler(
ctx: Arc<ContextWith<Arc<AllocatorSet>>>,
) -> Result<Action, Error> {
let address_blocks = Api::<AddressBlock>::all(ctx.client().clone());
ctx.inner.metrics().reconciliation(ab.as_ref());

finalizer(
&address_blocks,
ADDRESS_BLOCK_FINALIZER,
ab,
|event| async {
match event {
Event::Apply(ab) => reconcile(&address_blocks, &ab, ctx.clone()).await,
Event::Cleanup(ab) => cleanup(&address_blocks, &ab, ctx.clone()).await,
Event::Apply(ab) => reconcile_with_metrics(&address_blocks, &ab, ctx.clone()).await,
Event::Cleanup(ab) => cleanup_with_metrics(&address_blocks, &ab, ctx.clone()).await,
}
},
)
.await
.map_err(|e| Error::Finalizer(Box::new(e)))
}

/// Runs `reconcile` for `ab` and, if it fails, records the failure in the
/// context's metrics before handing the unchanged result back to the caller.
async fn reconcile_with_metrics(
    api: &Api<AddressBlock>,
    ab: &AddressBlock,
    ctx: Arc<ContextWith<Arc<AllocatorSet>>>,
) -> Result<Action, Error> {
    let outcome = reconcile(api, ab, ctx.clone()).await;
    // Only the error path touches the metrics; success passes straight through.
    if let Err(err) = &outcome {
        ctx.inner.metrics().reconcile_failure(ab, err);
    }
    outcome
}

/// Runs `cleanup` for `ab` and, if it fails, records the failure in the
/// context's metrics before handing the unchanged result back to the caller.
async fn cleanup_with_metrics(
    api: &Api<AddressBlock>,
    ab: &AddressBlock,
    ctx: Arc<ContextWith<Arc<AllocatorSet>>>,
) -> Result<Action, Error> {
    let outcome = cleanup(api, ab, ctx.clone()).await;
    // Only the error path touches the metrics; success passes straight through.
    if let Err(err) = &outcome {
        ctx.inner.metrics().reconcile_failure(ab, err);
    }
    outcome
}

#[tracing::instrument(skip_all, fields(trace_id))]
async fn reconcile(
_api: &Api<AddressBlock>,
Expand Down
3 changes: 1 addition & 2 deletions sartd/src/kubernetes/src/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub mod reconciler {
Client, Resource, ResourceExt,
};
use prometheus::Registry;
use sartd_trace::metrics::Metrics;
use serde::Serialize;

use crate::{
Expand All @@ -43,7 +42,7 @@ pub mod reconciler {
RouterIdSelector, SpeakerConfig, CLUSTER_BGP_FINALIZER,
},
node_bgp::{NodeBGP, NodeBGPSpec},
},
}, metrics::Metrics,
};

pub type ApiServerHandle = tower_test::mock::Handle<Request<Body>, Response<Body>>;
Expand Down
1 change: 1 addition & 0 deletions sartd/src/kubernetes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub mod controller;
pub mod crd;
pub mod error;
pub mod fixture;
pub mod metrics;
pub mod util;
90 changes: 90 additions & 0 deletions sartd/src/kubernetes/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use kube::Resource;
use prometheus::Registry;
use prometheus::{histogram_opts, opts, HistogramVec, IntCounter, IntCounterVec};
use sartd_trace::error::TraceableError;
use tokio::time::Instant;

/// Prometheus collectors describing controller reconcile activity.
///
/// The `Default` implementation in this file builds each collector with its
/// metric name and label set.
#[derive(Debug, Clone)]
pub struct Metrics {
/// Counter vector created as `sart_controller_reconciliation_total`
/// with the labels `resource` and `instance`.
pub reconciliations: IntCounterVec,
/// Counter vector created as `sart_controller_reconciliation_errors_total`
/// with the labels `resource`, `instance` and `error`.
pub failures: IntCounterVec,
/// Histogram vector created as `sart_controller_reconcile_duration_seconds`
/// with no labels.
pub reconcile_duration: HistogramVec,
}

impl Default for Metrics {
    /// Builds the three controller collectors with their fixed names, help
    /// texts and label sets.
    ///
    /// Nothing is registered here; the collectors are only constructed.
    ///
    /// # Panics
    ///
    /// Panics if `prometheus` rejects one of the collector definitions.
    fn default() -> Self {
        Self {
            // Counts reconcile invocations per resource kind and object name.
            reconciliations: IntCounterVec::new(
                opts!(
                    "sart_controller_reconciliation_total",
                    "Total count of reconciliations",
                ),
                &["resource", "instance"],
            )
            .unwrap(),
            // Counts reconcile failures, additionally split by error label.
            failures: IntCounterVec::new(
                opts!(
                    "sart_controller_reconciliation_errors_total",
                    "reconciliation errors",
                ),
                &["resource", "instance", "error"],
            )
            .unwrap(),
            // Unlabelled histogram of reconcile durations, in seconds.
            reconcile_duration: HistogramVec::new(
                histogram_opts!(
                    "sart_controller_reconcile_duration_seconds",
                    "The duration of reconcile to complete in seconds"
                )
                .buckets(vec![0.01, 0.1, 0.25, 0.5, 1., 5., 15., 60.]),
                &[],
            )
            .unwrap(),
        }
    }
}

impl Metrics {
    /// Registers every collector held by this value with `registry` and
    /// returns `self` so the call can be chained after `Metrics::default()`.
    ///
    /// The collectors are cloned into the registry; the clones share their
    /// underlying state with the fields kept in `self`, so later increments
    /// through `self` are visible when the registry is gathered.
    ///
    /// # Errors
    ///
    /// Returns the `prometheus::Error` reported by the registry, for example
    /// when a collector with the same name has already been registered.
    pub fn register(self, registry: &Registry) -> Result<Self, prometheus::Error> {
        registry.register(Box::new(self.reconcile_duration.clone()))?;
        registry.register(Box::new(self.failures.clone()))?;
        registry.register(Box::new(self.reconciliations.clone()))?;
        Ok(self)
    }

    /// Increments the failure counter for `resource`, labelled with the
    /// resource kind, the object name and `e.metric_label()`.
    ///
    /// A missing kind or name is recorded as an empty label value rather than
    /// panicking, so that metric bookkeeping can never abort a reconcile.
    pub fn reconcile_failure<T: Resource<DynamicType = ()>, E: TraceableError>(
        &self,
        resource: &T,
        e: &E,
    ) {
        // Build the object reference once instead of once per label.
        let obj = resource.object_ref(&());
        self.failures
            .with_label_values(&[
                obj.kind.as_deref().unwrap_or_default(),
                obj.name.as_deref().unwrap_or_default(),
                e.metric_label().as_ref(),
            ])
            .inc()
    }

    /// Increments the reconciliation counter for `resource`, labelled with
    /// the resource kind and the object name.
    ///
    /// A missing kind or name is recorded as an empty label value rather than
    /// panicking.
    pub fn reconciliation<T: Resource<DynamicType = ()>>(&self, resource: &T) {
        // Build the object reference once instead of once per label.
        let obj = resource.object_ref(&());
        self.reconciliations
            .with_label_values(&[
                obj.kind.as_deref().unwrap_or_default(),
                obj.name.as_deref().unwrap_or_default(),
            ])
            .inc()
    }
}

/// Timer that records how long a reconcile took.
///
/// Relies on `Drop`: when a value of this type goes out of scope, the time
/// elapsed since `start` is observed on `metric`.
///
/// NOTE(review): both fields are private and no constructor is visible in
/// this part of the file — confirm how instances are meant to be created.
pub struct ReconcileMeasurer {
/// Instant from which the elapsed time is computed on drop.
start: Instant,
/// Histogram that receives the observation on drop.
metric: HistogramVec,
}

impl Drop for ReconcileMeasurer {
    /// Observes the time elapsed since `start`, in seconds, on the
    /// unlabelled series of `metric`.
    fn drop(&mut self) {
        // `as_secs_f64` keeps sub-millisecond precision and avoids the lossy
        // integer-to-float cast that previously needed a clippy allow.
        let duration = self.start.elapsed().as_secs_f64();
        self.metric.with_label_values(&[]).observe(duration);
    }
}

0 comments on commit 12c5432

Please sign in to comment.