Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metrics to services #1267

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion crates/metrics/src/p2p_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ pub struct P2PMetrics {
// For descriptions of each Counter, see the `new` function where each Counter/Histogram is initialized
pub peer_metrics: Registry,
pub unique_peers: Counter,
pub run_tracker: Counter,
}

impl P2PMetrics {
fn new() -> Self {
let peer_metrics = Registry::default();

let unique_peers = Counter::default();
let run_tracker = Counter::default();

let mut metrics = P2PMetrics {
gossip_sub_registry: OnceBox::new(),
peer_metrics,
unique_peers,
run_tracker,
};

metrics.peer_metrics.register(
Expand All @@ -30,6 +32,12 @@ impl P2PMetrics {
Box::new(metrics.unique_peers.clone()),
);

metrics.peer_metrics.register(
"p2p_run_method_duration",
"Measure time for p2p service",
Box::new(metrics.run_tracker.clone()),
);

metrics
}
}
Expand Down
65 changes: 64 additions & 1 deletion crates/metrics/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Instant;

use crate::{
graphql_metrics::GRAPHQL_METRICS,
p2p_metrics::P2P_METRICS,
Expand All @@ -11,7 +13,11 @@ use axum::{
},
};
use libp2p_prom_client::encoding::text::encode as libp2p_encode;
use prometheus_client::encoding::text::encode;
use prometheus_client::{
encoding::text::encode,
metrics::counter::Counter,
registry::Registry,
};

pub fn encode_metrics_response() -> impl IntoResponse {
// encode libp2p metrics using older prometheus
Expand All @@ -32,10 +38,22 @@ pub fn encode_metrics_response() -> impl IntoResponse {
return error_body()
}

if encode(&mut encoded, &POA_METRICS.registry).is_err() {
return error_body()
}

if encode(&mut encoded, &GRAPHQL_METRICS.registry).is_err() {
return error_body()
}

if encode(&mut encoded, &RELAYER_METRICS.registry).is_err() {
return error_body()
}

if encode(&mut encoded, &IMPORT_METRICS.registry).is_err() {
return error_body()
}

Response::builder()
.status(200)
.body(Body::from(encoded))
Expand All @@ -48,3 +66,48 @@ fn error_body() -> Response<Body> {
.body(Body::from(""))
.unwrap()
}

pub struct ServiceMetrics {
pub registry: Registry,
pub run_tracker: Counter,
}

impl ServiceMetrics {
pub fn new(name: &str) -> Self {
let registry = Registry::default();

let run_tracker = Counter::default();

let mut metrics = ServiceMetrics {
registry,
run_tracker,
};

metrics.registry.register(
name,
"Measure time for service's run() method",
metrics.run_tracker.clone(),
);

metrics
}

pub fn instant() -> Instant {
Instant::now()
}

pub fn observe(&self, start: Instant) {
self.run_tracker.inc_by(start.elapsed().as_secs());
}
}

lazy_static::lazy_static! {
pub static ref POA_METRICS: ServiceMetrics =
ServiceMetrics::new("sync_run_method_duration");

pub static ref RELAYER_METRICS: ServiceMetrics =
ServiceMetrics::new("relayer_run_method_duration");

pub static ref IMPORT_METRICS: ServiceMetrics =
ServiceMetrics::new("import_run_method_duration");
}
15 changes: 14 additions & 1 deletion crates/metrics/src/txpool_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use lazy_static::lazy_static;
use prometheus_client::{
metrics::histogram::Histogram,
metrics::{
counter::Counter,
histogram::Histogram,
},
registry::Registry,
};
use std::default::Default;
Expand All @@ -10,6 +13,7 @@ pub struct TxPoolMetrics {
pub registry: Registry,
pub gas_price_histogram: Histogram,
pub tx_size_histogram: Histogram,
pub run_tracker: Counter,
}

impl Default for TxPoolMetrics {
Expand All @@ -24,10 +28,13 @@ impl Default for TxPoolMetrics {

let tx_size_histogram = Histogram::new(tx_sizes.into_iter());

let run_tracker = Counter::default();

let mut metrics = TxPoolMetrics {
registry,
gas_price_histogram,
tx_size_histogram,
run_tracker,
};

metrics.registry.register(
Expand All @@ -42,6 +49,12 @@ impl Default for TxPoolMetrics {
metrics.tx_size_histogram.clone(),
);

metrics.registry.register(
"txpool_run_method_duration",
"Measure time for txpool service",
metrics.run_tracker.clone(),
);

metrics
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/services/consensus_module/poa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ version = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
fuel-core-chain-config = { workspace = true }
fuel-core-metrics = { workspace = true }
fuel-core-services = { workspace = true }
fuel-core-storage = { workspace = true }
fuel-core-types = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions crates/services/consensus_module/poa/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ use anyhow::{
anyhow,
Context,
};
use fuel_core_metrics::service::{
ServiceMetrics,
POA_METRICS,
};
use fuel_core_services::{
stream::BoxStream,
RunnableService,
Expand Down Expand Up @@ -58,6 +62,7 @@ use fuel_core_types::{
},
tai64::Tai64,
};

use std::{
ops::Deref,
time::Duration,
Expand All @@ -73,6 +78,7 @@ use tokio_stream::StreamExt;
use tracing::error;

pub type Service<T, B, I> = ServiceRunner<MainTask<T, B, I>>;

#[derive(Clone)]
pub struct SharedState {
request_sender: mpsc::Sender<Request>,
Expand Down Expand Up @@ -423,6 +429,8 @@ where
I: BlockImporter<Database = D>,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let start = ServiceMetrics::instant();

let should_continue;
// make sure we're synced first
while *self.sync_task_handle.shared.borrow() == SyncState::NotSynced {
Expand Down Expand Up @@ -481,6 +489,8 @@ where
should_continue = true;
}
}

POA_METRICS.observe(start);
Ok(should_continue)
}

Expand Down
5 changes: 5 additions & 0 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
},
};
use anyhow::anyhow;
use fuel_core_metrics::p2p_metrics::P2P_METRICS;
use fuel_core_services::{
stream::BoxStream,
RunnableService,
Expand Down Expand Up @@ -61,6 +62,7 @@ use libp2p::{
use std::{
fmt::Debug,
sync::Arc,
time::Instant,
};
use tokio::sync::{
broadcast,
Expand Down Expand Up @@ -180,6 +182,8 @@ where
D: P2pDb + 'static,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let start = Instant::now();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that possible to move it to ServiceRunner and have it for each service?
image


let should_continue;
tokio::select! {
biased;
Expand Down Expand Up @@ -309,6 +313,7 @@ where
}
}

P2P_METRICS.run_tracker.inc_by(start.elapsed().as_secs());
Ok(should_continue)
}

Expand Down
1 change: 1 addition & 0 deletions crates/services/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ethers-providers = { version = "1.0.2", default-features = false, features = [
"ws",
"rustls",
] }
fuel-core-metrics = { workspace = true }
fuel-core-services = { workspace = true }
fuel-core-storage = { workspace = true }
fuel-core-types = { workspace = true }
Expand Down
8 changes: 8 additions & 0 deletions crates/services/relayer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use ethers_providers::{
Provider,
ProviderError,
};
use fuel_core_metrics::service::{
ServiceMetrics,
RELAYER_METRICS,
};
use fuel_core_services::{
RunnableService,
RunnableTask,
Expand All @@ -38,6 +42,7 @@ use fuel_core_types::{
fuel_types::Nonce,
};
use futures::StreamExt;

use std::{
borrow::Cow,
convert::TryInto,
Expand Down Expand Up @@ -221,6 +226,8 @@ where
D: RelayerDb + 'static,
{
async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result<bool> {
let start = ServiceMetrics::instant();

let now = tokio::time::Instant::now();
let should_continue = true;

Expand All @@ -236,6 +243,7 @@ where
.await;
}

RELAYER_METRICS.observe(start);
result.map(|_| should_continue)
}

Expand Down
2 changes: 2 additions & 0 deletions crates/services/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ version = { workspace = true }
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
fuel-core-metrics = { workspace = true }
fuel-core-services = { workspace = true }
fuel-core-types = { workspace = true }
futures = { workspace = true }
lazy_static = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }

Expand Down
8 changes: 8 additions & 0 deletions crates/services/sync/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use std::{
sync::Arc,
};

use fuel_core_metrics::service::{
ServiceMetrics,
IMPORT_METRICS,
};
use fuel_core_services::{
SharedMutex,
StateWatcher,
Expand All @@ -29,6 +33,7 @@ use futures::{
},
Stream,
};

use std::future::Future;
use tokio::sync::Notify;
use tracing::Instrument;
Expand Down Expand Up @@ -118,8 +123,11 @@ where
&self,
shutdown: &mut StateWatcher,
) -> anyhow::Result<bool> {
let start = ServiceMetrics::instant();

self.import_inner(shutdown).await?;

IMPORT_METRICS.observe(start);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each service may have the instance inside of ServiceRunner and you don't need to use lazy=)

Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await)
}

Expand Down
Loading