Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Add more Prometheus metrics (#5571)
Browse files Browse the repository at this point in the history
* Add more Prometheus metrics

* Update client/network/src/service.rs

Co-Authored-By: Max Inden <[email protected]>

* Update client/network/src/service.rs

Co-Authored-By: Max Inden <[email protected]>

Co-authored-by: Gavin Wood <[email protected]>
Co-authored-by: Max Inden <[email protected]>
  • Loading branch information
3 people authored Apr 8, 2020
1 parent 78cb165 commit 967852f
Showing 1 changed file with 127 additions and 25 deletions.
152 changes: 127 additions & 25 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ use crate::{
};
use futures::prelude::*;
use libp2p::{PeerId, Multiaddr};
use libp2p::core::{Executor, connection::PendingConnectionError};
use libp2p::core::{ConnectedPoint, Executor, connection::{ConnectionError, PendingConnectionError}};
use libp2p::kad::record;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent};
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent, protocols_handler::NodeHandlerWrapperError};
use log::{error, info, trace, warn};
use parking_lot::Mutex;
use prometheus_endpoint::{
register, Counter, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64,
register, Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64,
};
use sc_peerset::PeersetHandle;
use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link};
Expand Down Expand Up @@ -821,13 +821,18 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {

struct Metrics {
// This list is ordered alphabetically
connections: Gauge<U64>,
connections: GaugeVec<U64>,
connections_closed_total: CounterVec<U64>,
import_queue_blocks_submitted: Counter<U64>,
import_queue_finality_proofs_submitted: Counter<U64>,
import_queue_justifications_submitted: Counter<U64>,
incoming_connections_errors_total: Counter<U64>,
incoming_connections_total: Counter<U64>,
is_major_syncing: Gauge<U64>,
issued_light_requests: Counter<U64>,
kbuckets_num_nodes: Gauge<U64>,
listeners_local_addresses: Gauge<U64>,
listeners_errors_total: Counter<U64>,
network_per_sec_bytes: GaugeVec<U64>,
notifications_queues_size: HistogramVec,
notifications_sizes: HistogramVec,
Expand All @@ -836,15 +841,28 @@ struct Metrics {
peers_count: Gauge<U64>,
peerset_num_discovered: Gauge<U64>,
peerset_num_requested: Gauge<U64>,
pending_connections: Gauge<U64>,
pending_connections_errors_total: CounterVec<U64>,
random_kademalia_queries_total: Counter<U64>,
}

impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
// This list is ordered alphabetically
connections: register(Gauge::new(
"sub_libp2p_connections", "Number of libp2p connections"
connections: register(GaugeVec::new(
Opts::new(
"sub_libp2p_connections",
"Number of active libp2p connections"
),
&["direction"]
)?, registry)?,
connections_closed_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_connections_closed_total",
"Total number of connections closed, by reason"
),
&["reason"]
)?, registry)?,
import_queue_blocks_submitted: register(Counter::new(
"import_queue_blocks_submitted",
Expand All @@ -858,6 +876,14 @@ impl Metrics {
"import_queue_justifications_submitted",
"Number of justifications submitted to the import queue.",
)?, registry)?,
incoming_connections_errors_total: register(Counter::new(
"sub_libp2p_incoming_connections_handshake_errors_total",
"Total number of incoming connections that have failed during the initial handshake"
)?, registry)?,
incoming_connections_total: register(Counter::new(
"sub_libp2p_incoming_connections_total",
"Total number of incoming connections on the listening sockets"
)?, registry)?,
is_major_syncing: register(Gauge::new(
"sub_libp2p_is_major_syncing", "Whether the node is performing a major sync or not.",
)?, registry)?,
Expand All @@ -868,6 +894,13 @@ impl Metrics {
kbuckets_num_nodes: register(Gauge::new(
"sub_libp2p_kbuckets_num_nodes", "Number of nodes in the Kademlia k-buckets"
)?, registry)?,
listeners_local_addresses: register(Gauge::new(
"sub_libp2p_listeners_local_addresses", "Number of local addresses we're listening on"
)?, registry)?,
listeners_errors_total: register(Counter::new(
"sub_libp2p_listeners_errors_total",
"Total number of non-fatal errors reported by a listener"
)?, registry)?,
network_per_sec_bytes: register(GaugeVec::new(
Opts::new(
"sub_libp2p_network_per_sec_bytes",
Expand Down Expand Up @@ -916,6 +949,17 @@ impl Metrics {
peerset_num_requested: register(Gauge::new(
"sub_libp2p_peerset_num_requested", "Number of nodes that the peerset manager wants us to be connected to",
)?, registry)?,
pending_connections: register(Gauge::new(
"sub_libp2p_pending_connections",
"Number of connections in the process of being established",
)?, registry)?,
pending_connections_errors_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_pending_connections_errors_total",
"Total number of node connection failures"
),
&["reason"]
)?, registry)?,
random_kademalia_queries_total: register(Counter::new(
"sub_libp2p_random_kademalia_queries_total", "Number of random Kademlia queries started",
)?, registry)?,
Expand Down Expand Up @@ -1047,22 +1091,50 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
metrics.update_with_network_event(&ev);
}
},
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, .. }) => {
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. }) => {
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
if let Some(metrics) = this.metrics.as_ref() {
metrics.connections.inc();
match endpoint {
ConnectedPoint::Dialer { .. } =>
metrics.connections.with_label_values(&["out"]).inc(),
ConnectedPoint::Listener { .. } =>
metrics.connections.with_label_values(&["in"]).inc(),
}
}
},
Poll::Ready(SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, .. }) => {
trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause);
if let Some(metrics) = this.metrics.as_ref() {
match endpoint {
ConnectedPoint::Dialer { .. } =>
metrics.connections.with_label_values(&["out"]).dec(),
ConnectedPoint::Listener { .. } =>
metrics.connections.with_label_values(&["in"]).dec(),
}
match cause {
ConnectionError::IO(_) =>
metrics.connections_closed_total.with_label_values(&["transport-error"]).inc(),
ConnectionError::ConnectionLimit(_) =>
metrics.connections_closed_total.with_label_values(&["limit-reached"]).inc(),
ConnectionError::Handler(NodeHandlerWrapperError::Handler(_)) =>
metrics.connections_closed_total.with_label_values(&["protocol-error"]).inc(),
ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout) =>
metrics.connections_closed_total.with_label_values(&["keep-alive-timeout"]).inc(),
}
}
},
Poll::Ready(SwarmEvent::NewListenAddr(addr)) => {
trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", addr);
if let Some(metrics) = this.metrics.as_ref() {
metrics.listeners_local_addresses.inc();
}
},
Poll::Ready(SwarmEvent::ConnectionClosed { peer_id, .. }) => {
trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?})", peer_id);
Poll::Ready(SwarmEvent::ExpiredListenAddr(addr)) => {
trace!(target: "sub-libp2p", "Libp2p => ExpiredListenAddr({})", addr);
if let Some(metrics) = this.metrics.as_ref() {
metrics.connections.dec();
metrics.listeners_local_addresses.dec();
}
},
Poll::Ready(SwarmEvent::NewListenAddr(addr)) =>
trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", addr),
Poll::Ready(SwarmEvent::ExpiredListenAddr(addr)) =>
trace!(target: "sub-libp2p", "Libp2p => ExpiredListenAddr({})", addr),
Poll::Ready(SwarmEvent::UnreachableAddr { peer_id, address, error, .. }) => {
trace!(
target: "sub-libp2p", "Libp2p => Failed to reach {:?} through {:?}: {}",
Expand All @@ -1080,25 +1152,54 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
);
}
}

if let Some(metrics) = this.metrics.as_ref() {
match error {
PendingConnectionError::InvalidPeerId =>
metrics.pending_connections_errors_total.with_label_values(&["invalid-peer-id"]).inc(),
PendingConnectionError::Transport(_) | PendingConnectionError::IO(_) =>
metrics.pending_connections_errors_total.with_label_values(&["transport-error"]).inc(),
}
}
}
Poll::Ready(SwarmEvent::Dialing(peer_id)) =>
trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id),
Poll::Ready(SwarmEvent::IncomingConnection { local_addr, send_back_addr }) =>
Poll::Ready(SwarmEvent::IncomingConnection { local_addr, send_back_addr }) => {
trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))",
local_addr, send_back_addr),
Poll::Ready(SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error }) =>
local_addr, send_back_addr);
if let Some(metrics) = this.metrics.as_ref() {
metrics.incoming_connections_total.inc();
}
},
Poll::Ready(SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => {
trace!(target: "sub-libp2p", "Libp2p => IncomingConnectionError({},{}): {}",
local_addr, send_back_addr, error),
Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) =>
local_addr, send_back_addr, error);
if let Some(metrics) = this.metrics.as_ref() {
metrics.incoming_connections_errors_total.inc();
}
},
Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => {
trace!(target: "sub-libp2p", "Libp2p => BannedPeer({}). Connected via {:?}.",
peer_id, endpoint),
peer_id, endpoint);
if let Some(metrics) = this.metrics.as_ref() {
metrics.incoming_connections_errors_total.inc();
}
},
Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, error }) =>
trace!(target: "sub-libp2p", "Libp2p => UnknownPeerUnreachableAddr({}): {}",
address, error),
Poll::Ready(SwarmEvent::ListenerClosed { reason, addresses: _ }) =>
warn!(target: "sub-libp2p", "Libp2p => ListenerClosed: {:?}", reason),
Poll::Ready(SwarmEvent::ListenerError { error }) =>
trace!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error),
Poll::Ready(SwarmEvent::ListenerClosed { reason, addresses }) => {
warn!(target: "sub-libp2p", "Libp2p => ListenerClosed: {:?}", reason);
if let Some(metrics) = this.metrics.as_ref() {
metrics.listeners_local_addresses.sub(addresses.len() as u64);
}
},
Poll::Ready(SwarmEvent::ListenerError { error }) => {
trace!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
if let Some(metrics) = this.metrics.as_ref() {
metrics.listeners_errors_total.inc();
}
},
};
}

Expand Down Expand Up @@ -1127,6 +1228,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
metrics.peers_count.set(num_connected_peers as u64);
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
metrics.pending_connections.set(Swarm::network_info(&this.network_service).num_connections_pending as u64);
}

Poll::Pending
Expand Down

0 comments on commit 967852f

Please sign in to comment.