diff --git a/Cargo.lock b/Cargo.lock index 86c82338b0f..80739158e97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2900,6 +2900,7 @@ dependencies = [ "anyhow", "async-trait", "fuel-core-chain-config", + "fuel-core-metrics", "fuel-core-services", "fuel-core-storage", "fuel-core-types", @@ -2938,6 +2939,7 @@ dependencies = [ "ethers-contract", "ethers-core", "ethers-providers", + "fuel-core-metrics", "fuel-core-relayer", "fuel-core-services", "fuel-core-storage", @@ -2986,10 +2988,12 @@ version = "0.20.1" dependencies = [ "anyhow", "async-trait", + "fuel-core-metrics", "fuel-core-services", "fuel-core-trace", "fuel-core-types", "futures", + "lazy_static", "mockall", "test-case", "tokio", diff --git a/crates/metrics/src/p2p_metrics.rs b/crates/metrics/src/p2p_metrics.rs index b93375073ff..3c8dd754e65 100644 --- a/crates/metrics/src/p2p_metrics.rs +++ b/crates/metrics/src/p2p_metrics.rs @@ -10,18 +10,20 @@ pub struct P2PMetrics { // For descriptions of each Counter, see the `new` function where each Counter/Histogram is initialized pub peer_metrics: Registry, pub unique_peers: Counter, + pub run_tracker: Counter, } impl P2PMetrics { fn new() -> Self { let peer_metrics = Registry::default(); - let unique_peers = Counter::default(); + let run_tracker = Counter::default(); let mut metrics = P2PMetrics { gossip_sub_registry: OnceBox::new(), peer_metrics, unique_peers, + run_tracker, }; metrics.peer_metrics.register( @@ -30,6 +32,12 @@ impl P2PMetrics { Box::new(metrics.unique_peers.clone()), ); + metrics.peer_metrics.register( + "p2p_run_method_duration", + "Measure time for p2p service", + Box::new(metrics.run_tracker.clone()), + ); + metrics } } diff --git a/crates/metrics/src/service.rs b/crates/metrics/src/service.rs index 57fa456818b..e1c3e219d72 100644 --- a/crates/metrics/src/service.rs +++ b/crates/metrics/src/service.rs @@ -1,3 +1,5 @@ +use std::time::Instant; + use crate::{ graphql_metrics::GRAPHQL_METRICS, p2p_metrics::P2P_METRICS, @@ -11,7 +13,11 @@ use axum::{ }, }; use libp2p_prom_client::encoding::text::encode as libp2p_encode; -use prometheus_client::encoding::text::encode; +use prometheus_client::{ + encoding::text::encode, + metrics::counter::Counter, + registry::Registry, +}; pub fn encode_metrics_response() -> impl IntoResponse { // encode libp2p metrics using older prometheus @@ -32,10 +38,22 @@ pub fn encode_metrics_response() -> impl IntoResponse { return error_body() } + if encode(&mut encoded, &POA_METRICS.registry).is_err() { + return error_body() + } + if encode(&mut encoded, &GRAPHQL_METRICS.registry).is_err() { return error_body() } + if encode(&mut encoded, &RELAYER_METRICS.registry).is_err() { + return error_body() + } + + if encode(&mut encoded, &IMPORT_METRICS.registry).is_err() { + return error_body() + } + Response::builder() .status(200) .body(Body::from(encoded)) @@ -48,3 +66,48 @@ fn error_body() -> Response { .body(Body::from("")) .unwrap() } + +pub struct ServiceMetrics { + pub registry: Registry, + pub run_tracker: Counter, +} + +impl ServiceMetrics { + pub fn new(name: &str) -> Self { + let registry = Registry::default(); + + let run_tracker = Counter::default(); + + let mut metrics = ServiceMetrics { + registry, + run_tracker, + }; + + metrics.registry.register( + name, + "Measure time for service's run() method", + metrics.run_tracker.clone(), + ); + + metrics + } + + pub fn instant() -> Instant { + Instant::now() + } + + pub fn observe(&self, start: Instant) { + self.run_tracker.inc_by(start.elapsed().as_secs()); + } +} + +lazy_static::lazy_static! { + pub static ref POA_METRICS: ServiceMetrics = + ServiceMetrics::new("sync_run_method_duration"); + + pub static ref RELAYER_METRICS: ServiceMetrics = + ServiceMetrics::new("relayer_run_method_duration"); + + pub static ref IMPORT_METRICS: ServiceMetrics = + ServiceMetrics::new("import_run_method_duration"); +} diff --git a/crates/metrics/src/txpool_metrics.rs b/crates/metrics/src/txpool_metrics.rs index b06ebd2dd38..c279562752c 100644 --- a/crates/metrics/src/txpool_metrics.rs +++ b/crates/metrics/src/txpool_metrics.rs @@ -1,6 +1,9 @@ use lazy_static::lazy_static; use prometheus_client::{ - metrics::histogram::Histogram, + metrics::{ + counter::Counter, + histogram::Histogram, + }, registry::Registry, }; use std::default::Default; @@ -10,6 +13,7 @@ pub struct TxPoolMetrics { pub registry: Registry, pub gas_price_histogram: Histogram, pub tx_size_histogram: Histogram, + pub run_tracker: Counter, } impl Default for TxPoolMetrics { @@ -24,10 +28,13 @@ impl Default for TxPoolMetrics { let tx_size_histogram = Histogram::new(tx_sizes.into_iter()); + let run_tracker = Counter::default(); + let mut metrics = TxPoolMetrics { registry, gas_price_histogram, tx_size_histogram, + run_tracker, }; metrics.registry.register( @@ -42,6 +49,12 @@ impl Default for TxPoolMetrics { metrics.tx_size_histogram.clone(), ); + metrics.registry.register( + "txpool_run_method_duration", + "Measure time for txpool service", + metrics.run_tracker.clone(), + ); + metrics } } diff --git a/crates/services/consensus_module/poa/Cargo.toml b/crates/services/consensus_module/poa/Cargo.toml index 76467e4a0de..03ccb278baa 100644 --- a/crates/services/consensus_module/poa/Cargo.toml +++ b/crates/services/consensus_module/poa/Cargo.toml @@ -13,6 +13,7 @@ version = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } fuel-core-chain-config = { workspace = true } +fuel-core-metrics = { workspace = true } fuel-core-services = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index fc42fc8d49e..a2cea998237 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -20,6 +20,10 @@ use anyhow::{ anyhow, Context, }; +use fuel_core_metrics::service::{ + ServiceMetrics, + POA_METRICS, +}; use fuel_core_services::{ stream::BoxStream, RunnableService, @@ -58,6 +62,7 @@ use fuel_core_types::{ }, tai64::Tai64, }; + use std::{ ops::Deref, time::Duration, @@ -73,6 +78,7 @@ use tokio_stream::StreamExt; use tracing::error; pub type Service = ServiceRunner>; + #[derive(Clone)] pub struct SharedState { request_sender: mpsc::Sender, @@ -423,6 +429,8 @@ where I: BlockImporter, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + let start = ServiceMetrics::instant(); + let should_continue; // make sure we're synced first while *self.sync_task_handle.shared.borrow() == SyncState::NotSynced { @@ -481,6 +489,8 @@ where should_continue = true; } } + + POA_METRICS.observe(start); Ok(should_continue) } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 8349917d6ed..52e35e5b382 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -23,6 +23,7 @@ use crate::{ }, }; use anyhow::anyhow; +use fuel_core_metrics::p2p_metrics::P2P_METRICS; use fuel_core_services::{ stream::BoxStream, RunnableService, @@ -61,6 +62,7 @@ use libp2p::{ use std::{ fmt::Debug, sync::Arc, + time::Instant, }; use tokio::sync::{ broadcast, @@ -180,6 +182,8 @@ where D: P2pDb + 'static, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + let start = Instant::now(); + let should_continue; tokio::select! { biased; @@ -309,6 +313,7 @@ where } } + P2P_METRICS.run_tracker.inc_by(start.elapsed().as_secs()); Ok(should_continue) } diff --git a/crates/services/relayer/Cargo.toml b/crates/services/relayer/Cargo.toml index a30506eb57f..3277e406168 100644 --- a/crates/services/relayer/Cargo.toml +++ b/crates/services/relayer/Cargo.toml @@ -21,6 +21,7 @@ ethers-providers = { version = "1.0.2", default-features = false, features = [ "ws", "rustls", ] } +fuel-core-metrics = { workspace = true } fuel-core-services = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } diff --git a/crates/services/relayer/src/service.rs b/crates/services/relayer/src/service.rs index 9eac59ff231..2194905b484 100644 --- a/crates/services/relayer/src/service.rs +++ b/crates/services/relayer/src/service.rs @@ -21,6 +21,10 @@ use ethers_providers::{ Provider, ProviderError, }; +use fuel_core_metrics::service::{ + ServiceMetrics, + RELAYER_METRICS, +}; use fuel_core_services::{ RunnableService, RunnableTask, @@ -38,6 +42,7 @@ use fuel_core_types::{ fuel_types::Nonce, }; use futures::StreamExt; + use std::{ borrow::Cow, convert::TryInto, @@ -221,6 +226,8 @@ where D: RelayerDb + 'static, { async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result { + let start = ServiceMetrics::instant(); + let now = tokio::time::Instant::now(); let should_continue = true; @@ -236,6 +243,7 @@ where .await; } + RELAYER_METRICS.observe(start); result.map(|_| should_continue) } diff --git a/crates/services/sync/Cargo.toml b/crates/services/sync/Cargo.toml index a6cd607fb50..35b895bc2aa 100644 --- a/crates/services/sync/Cargo.toml +++ b/crates/services/sync/Cargo.toml @@ -12,9 +12,11 @@ version = { workspace = true } [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +fuel-core-metrics = { workspace = true } fuel-core-services = { workspace = true } fuel-core-types = { workspace = true } futures = { workspace = true } +lazy_static = { workspace = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } diff --git a/crates/services/sync/src/import.rs b/crates/services/sync/src/import.rs index 0f7b2d4001e..567ea649a75 100644 --- a/crates/services/sync/src/import.rs +++ b/crates/services/sync/src/import.rs @@ -7,6 +7,10 @@ use std::{ sync::Arc, }; +use fuel_core_metrics::service::{ + ServiceMetrics, + IMPORT_METRICS, +}; use fuel_core_services::{ SharedMutex, StateWatcher, @@ -29,6 +33,7 @@ use futures::{ }, Stream, }; + use std::future::Future; use tokio::sync::Notify; use tracing::Instrument; @@ -118,8 +123,11 @@ where &self, shutdown: &mut StateWatcher, ) -> anyhow::Result { + let start = ServiceMetrics::instant(); + self.import_inner(shutdown).await?; + IMPORT_METRICS.observe(start); Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await) } diff --git a/crates/services/sync/src/sync.rs b/crates/services/sync/src/sync.rs index c7f39918c59..ee9ccab7c20 100644 --- a/crates/services/sync/src/sync.rs +++ b/crates/services/sync/src/sync.rs @@ -3,6 +3,7 @@ use std::sync::Arc; +use fuel_core_metrics::service::ServiceMetrics; use fuel_core_services::{ stream::{ BoxStream, @@ -12,6 +13,7 @@ use fuel_core_services::{ }; use fuel_core_types::fuel_types::BlockHeight; use futures::stream::StreamExt; +use lazy_static::lazy_static; use tokio::sync::Notify; use crate::state::State; @@ -19,6 +21,11 @@ use crate::state::State; #[cfg(test)] mod tests; +lazy_static! { + static ref SYNC_METRICS: ServiceMetrics = + ServiceMetrics::new("sync_run_method_duration"); +} + pub(crate) enum IncomingHeight { Observed(BlockHeight), Committed(BlockHeight), @@ -53,6 +60,8 @@ impl SyncHeights { /// Sync the state from the height stream. /// This stream never blocks or errors. pub(crate) async fn sync(&mut self) -> Option<()> { + let start = ServiceMetrics::instant(); + let height = self.height_stream.next().await?; let state_change = match height { IncomingHeight::Committed(height) => { @@ -65,6 +74,9 @@ impl SyncHeights { if state_change { self.notify.notify_one(); } + + SYNC_METRICS.observe(start); + Some(()) } diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index d2bc5a5376d..0dc981ab46c 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -10,6 +10,7 @@ use crate::{ TxInfo, TxPool, }; +use fuel_core_metrics::txpool_metrics::TXPOOL_METRICS; use fuel_core_services::{ stream::BoxStream, RunnableService, @@ -169,6 +170,8 @@ where async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; + let start = std::time::Instant::now(); + tokio::select! { biased; @@ -236,6 +239,9 @@ where } } } + + TXPOOL_METRICS.run_tracker.inc_by(start.elapsed().as_secs()); + Ok(should_continue) }