diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 58fef7e8945..99ac2b35e24 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -23,6 +23,8 @@ use std::time::Instant; use bytes::Buf; use futures::TryFutureExt; +use prometheus_client::encoding::EncodeLabel; +use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prometheus_client::metrics::histogram; @@ -81,7 +83,7 @@ use crate::*; /// ``` #[derive(Debug, Clone)] pub struct PrometheusClientLayer { - metrics: PrometheusClientMetrics, + metrics: PrometheusClientMetricDefinitions, } impl PrometheusClientLayer { @@ -89,7 +91,7 @@ impl PrometheusClientLayer { /// that do NOT call this method multiple times with a same registry. If you want initialize multiple /// [`PrometheusClientLayer`] with a single registry, you should use [`Arc::clone`] instead. pub fn new(registry: &mut Registry) -> Self { - let metrics = PrometheusClientMetrics::register(registry); + let metrics = PrometheusClientMetricDefinitions::register(registry); Self { metrics } } } @@ -100,8 +102,11 @@ impl Layer for PrometheusClientLayer { fn layer(&self, inner: A) -> Self::LayeredAccess { let meta = inner.info(); let scheme = meta.scheme(); + let root = meta.root().to_string(); + let name = meta.name().to_string(); - let metrics = Arc::new(self.metrics.clone()); + let metrics = + PrometheusClientMetrics::new(Arc::new(self.metrics.clone()), scheme, root, name); PrometheusAccessor { inner, metrics, @@ -110,12 +115,53 @@ impl Layer for PrometheusClientLayer { } } -type OperationLabels = [(&'static str, &'static str); 2]; -type ErrorLabels = [(&'static str, &'static str); 3]; +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +struct OperationLabels { + op: &'static str, + scheme: &'static str, + root: Arc, + namespace: Arc, +} + +impl EncodeLabelSet for OperationLabels { + fn encode( + &self, + mut encoder: prometheus_client::encoding::LabelSetEncoder, + ) -> std::result::Result<(), std::fmt::Error> { + ("op", self.op).encode(encoder.encode_label())?; + ("scheme", self.scheme).encode(encoder.encode_label())?; + ("namespace", self.namespace.as_str()).encode(encoder.encode_label())?; + ("root", self.root.as_str()).encode(encoder.encode_label())?; + Ok(()) + } +} -/// [`PrometheusClientMetrics`] provide the performance and IO metrics with the `prometheus-client` crate. +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +struct ErrorLabels { + op: &'static str, + scheme: &'static str, + err: &'static str, + root: Arc, + namespace: Arc, +} + +impl EncodeLabelSet for ErrorLabels { + fn encode( + &self, + mut encoder: prometheus_client::encoding::LabelSetEncoder, + ) -> std::result::Result<(), std::fmt::Error> { + ("op", self.op).encode(encoder.encode_label())?; + ("scheme", self.scheme).encode(encoder.encode_label())?; + ("error", self.err).encode(encoder.encode_label())?; + ("namespace", self.namespace.as_str()).encode(encoder.encode_label())?; + ("root", self.root.as_str()).encode(encoder.encode_label())?; + Ok(()) + } +} + +/// [`PrometheusClientMetricDefinitions`] provide the definition about RED(Rate/Error/Duration) metrics with the `prometheus-client` crate. #[derive(Debug, Clone)] -struct PrometheusClientMetrics { +struct PrometheusClientMetricDefinitions { /// Total counter of the specific operation be called. requests_total: Family, /// Total counter of the errors. @@ -128,7 +174,7 @@ struct PrometheusClientMetrics { bytes_total: Family, } -impl PrometheusClientMetrics { +impl PrometheusClientMetricDefinitions { pub fn register(registry: &mut Registry) -> Self { let requests_total = Family::default(); let errors_total = Family::default(); @@ -159,32 +205,78 @@ impl PrometheusClientMetrics { bytes_total, } } +} + +#[derive(Clone, Debug)] +struct PrometheusClientMetrics { + metrics: Arc, + scheme: Scheme, + root: Arc, + name: Arc, +} - fn increment_errors_total(&self, scheme: Scheme, op: &'static str, err: ErrorKind) { - let labels = [ - ("scheme", scheme.into_static()), - ("op", op), - ("err", err.into_static()), - ]; - self.errors_total.get_or_create(&labels).inc(); +impl PrometheusClientMetrics { + fn new( + metrics: Arc, + scheme: Scheme, + root: String, + name: String, + ) -> Self { + Self { + metrics, + scheme, + root: Arc::new(root), + name: Arc::new(name), + } + } + + fn increment_errors_total(&self, op: &'static str, err: ErrorKind) { + let labels = ErrorLabels { + op, + scheme: self.scheme.into_static(), + err: err.into_static(), + root: self.root.clone(), + namespace: self.name.clone(), + }; + self.metrics.errors_total.get_or_create(&labels).inc(); } fn increment_request_total(&self, scheme: Scheme, op: &'static str) { - let labels = [("scheme", scheme.into_static()), ("op", op)]; - self.requests_total.get_or_create(&labels).inc(); + let labels = OperationLabels { + op, + scheme: scheme.into_static(), + root: self.root.clone(), + namespace: self.name.clone(), + }; + self.metrics.requests_total.get_or_create(&labels).inc(); } fn observe_bytes_total(&self, scheme: Scheme, op: &'static str, bytes: usize) { - let labels = [("scheme", scheme.into_static()), ("op", op)]; - self.bytes_histogram + let labels = OperationLabels { + op, + scheme: scheme.into_static(), + root: self.root.clone(), + namespace: self.name.clone(), + }; + self.metrics + .bytes_histogram .get_or_create(&labels) .observe(bytes as f64); - self.bytes_total.get_or_create(&labels).inc_by(bytes as u64); + self.metrics + .bytes_total + .get_or_create(&labels) + .inc_by(bytes as u64); } fn observe_request_duration(&self, scheme: Scheme, op: &'static str, duration: Duration) { - let labels = [("scheme", scheme.into_static()), ("op", op)]; - self.request_duration_seconds + let labels = OperationLabels { + op, + scheme: scheme.into_static(), + root: self.root.clone(), + namespace: self.name.clone(), + }; + self.metrics + .request_duration_seconds .get_or_create(&labels) .observe(duration.as_secs_f64()); } @@ -193,7 +285,7 @@ impl PrometheusClientMetrics { #[derive(Clone)] pub struct PrometheusAccessor { inner: A, - metrics: Arc, + metrics: PrometheusClientMetrics, scheme: Scheme, } @@ -231,11 +323,8 @@ impl LayeredAccess for PrometheusAccessor { start_time.elapsed(), ); create_res.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::CreateDir.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::CreateDir.into_static(), e.kind()); e }) } @@ -258,11 +347,8 @@ impl LayeredAccess for PrometheusAccessor { PrometheusMetricWrapper::new(r, self.metrics.clone(), self.scheme), )), Err(err) => { - self.metrics.increment_errors_total( - self.scheme, - Operation::Read.into_static(), - err.kind(), - ); + self.metrics + .increment_errors_total(Operation::Read.into_static(), err.kind()); Err(err) } } @@ -289,11 +375,8 @@ impl LayeredAccess for PrometheusAccessor { PrometheusMetricWrapper::new(w, self.metrics.clone(), self.scheme), )), Err(err) => { - self.metrics.increment_errors_total( - self.scheme, - Operation::Write.into_static(), - err.kind(), - ); + self.metrics + .increment_errors_total(Operation::Write.into_static(), err.kind()); Err(err) } } @@ -308,11 +391,8 @@ impl LayeredAccess for PrometheusAccessor { .inner .stat(path, args) .inspect_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::Stat.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::Stat.into_static(), e.kind()); }) .await; @@ -322,11 +402,8 @@ impl LayeredAccess for PrometheusAccessor { start_time.elapsed(), ); stat_res.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::Stat.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::Stat.into_static(), e.kind()); e }) } @@ -344,11 +421,8 @@ impl LayeredAccess for PrometheusAccessor { start_time.elapsed(), ); delete_res.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::Delete.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::Delete.into_static(), e.kind()); e }) } @@ -366,11 +440,8 @@ impl LayeredAccess for PrometheusAccessor { start_time.elapsed(), ); list_res.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::List.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::List.into_static(), e.kind()); e }) } @@ -388,11 +459,8 @@ impl LayeredAccess for PrometheusAccessor { start_time.elapsed(), ); result.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::Batch.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::Batch.into_static(), e.kind()); e }) } @@ -410,11 +478,8 @@ impl LayeredAccess for PrometheusAccessor { start_time.elapsed(), ); result.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::Presign.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::Presign.into_static(), e.kind()); e }) } @@ -432,11 +497,8 @@ impl LayeredAccess for PrometheusAccessor { start_time.elapsed(), ); result.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::BlockingCreateDir.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::BlockingCreateDir.into_static(), e.kind()); e }) } @@ -453,11 +515,8 @@ impl LayeredAccess for PrometheusAccessor { }); result.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::BlockingRead.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::BlockingRead.into_static(), e.kind()); e }) } @@ -474,11 +533,8 @@ impl LayeredAccess for PrometheusAccessor { }); result.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::BlockingWrite.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::BlockingWrite.into_static(), e.kind()); e }) } @@ -496,11 +552,8 @@ impl LayeredAccess for PrometheusAccessor { ); result.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::BlockingStat.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::BlockingStat.into_static(), e.kind()); e }) } @@ -518,11 +571,8 @@ impl LayeredAccess for PrometheusAccessor { start_time.elapsed(), ); result.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::BlockingDelete.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::BlockingDelete.into_static(), e.kind()); e }) } @@ -540,11 +590,8 @@ impl LayeredAccess for PrometheusAccessor { start_time.elapsed(), ); result.map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - Operation::BlockingList.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(Operation::BlockingList.into_static(), e.kind()); e }) } @@ -553,12 +600,12 @@ impl LayeredAccess for PrometheusAccessor { pub struct PrometheusMetricWrapper { inner: R, - metrics: Arc, + metrics: PrometheusClientMetrics, scheme: Scheme, } impl PrometheusMetricWrapper { - fn new(inner: R, metrics: Arc, scheme: Scheme) -> Self { + fn new(inner: R, metrics: PrometheusClientMetrics, scheme: Scheme) -> Self { Self { inner, metrics, @@ -586,11 +633,8 @@ impl oio::Read for PrometheusMetricWrapper { Ok(bs) } Err(e) => { - self.metrics.increment_errors_total( - self.scheme, - ReadOperation::Read.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(ReadOperation::Read.into_static(), e.kind()); Err(e) } } @@ -616,11 +660,8 @@ impl oio::BlockingRead for PrometheusMetricWrapper { bs }) .map_err(|e| { - self.metrics.increment_errors_total( - self.scheme, - ReadOperation::BlockingRead.into_static(), - e.kind(), - ); + self.metrics + .increment_errors_total(ReadOperation::BlockingRead.into_static(), e.kind()); e }) } @@ -647,11 +688,8 @@ impl oio::Write for PrometheusMetricWrapper { ); }) .map_err(|err| { - self.metrics.increment_errors_total( - self.scheme, - WriteOperation::Write.into_static(), - err.kind(), - ); + self.metrics + .increment_errors_total(WriteOperation::Write.into_static(), err.kind()); err }) } @@ -670,11 +708,8 @@ impl oio::Write for PrometheusMetricWrapper { ); }) .map_err(|err| { - self.metrics.increment_errors_total( - self.scheme, - WriteOperation::Abort.into_static(), - err.kind(), - ); + self.metrics + .increment_errors_total(WriteOperation::Abort.into_static(), err.kind()); err }) } @@ -693,11 +728,8 @@ impl oio::Write for PrometheusMetricWrapper { ); }) .map_err(|err| { - self.metrics.increment_errors_total( - self.scheme, - WriteOperation::Close.into_static(), - err.kind(), - ); + self.metrics + .increment_errors_total(WriteOperation::Close.into_static(), err.kind()); err }) } @@ -724,7 +756,6 @@ impl oio::BlockingWrite for PrometheusMetricWrapper { }) .map_err(|err| { self.metrics.increment_errors_total( - self.scheme, WriteOperation::BlockingWrite.into_static(), err.kind(), ); @@ -746,7 +777,6 @@ impl oio::BlockingWrite for PrometheusMetricWrapper { }) .map_err(|err| { self.metrics.increment_errors_total( - self.scheme, WriteOperation::BlockingClose.into_static(), err.kind(), );