From 5be3c3305a411594f7812eeaa49de982e4db34b2 Mon Sep 17 00:00:00 2001 From: shamil-gadelshin Date: Thu, 14 Dec 2023 19:38:55 +0700 Subject: [PATCH] Add infrastructure for Subspace networking metrics. (#2284) * networking: Add subspace metrics infrastructure. * networking: Refactor metrics infrastructure. * Refactor networking metrics. --- .../src/bin/subspace-farmer/commands/farm.rs | 9 +++-- .../bin/subspace-farmer/commands/farm/dsn.rs | 13 +++---- .../subspace-networking/examples/benchmark.rs | 8 ++++- .../subspace-networking/examples/metrics.rs | 2 +- .../examples/random-walker.rs | 8 ++++- .../src/bin/subspace-bootstrap-node/main.rs | 34 ++++++++++-------- crates/subspace-networking/src/constructor.rs | 25 ++++++++++--- crates/subspace-networking/src/node_runner.rs | 23 +++++++++--- crates/subspace-networking/src/utils.rs | 35 +++++++++++++++++++ crates/subspace-service/src/dsn.rs | 15 +++++--- 10 files changed, 128 insertions(+), 44 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 08dab6b1c8..ce23388eaf 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -11,6 +11,7 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use lru::LruCache; use parking_lot::Mutex; +use prometheus_client::registry::Registry; use std::fs; use std::net::SocketAddr; use std::num::{NonZeroU8, NonZeroUsize}; @@ -348,9 +349,11 @@ where let (piece_cache, piece_cache_worker) = PieceCache::new(node_client.clone(), peer_id); + // Metrics + let mut prometheus_metrics_registry = Registry::default(); let metrics_endpoints_are_specified = !metrics_endpoints.is_empty(); - let (node, mut node_runner, metrics_registry) = { + let (node, mut node_runner) = { if dsn.bootstrap_nodes.is_empty() { dsn.bootstrap_nodes = farmer_app_info.dsn_bootstrap_nodes.clone(); } @@ -363,14 +366,14 @@ where Arc::downgrade(&readers_and_pieces), node_client.clone(), piece_cache.clone(), - metrics_endpoints_are_specified, + metrics_endpoints_are_specified.then_some(&mut prometheus_metrics_registry), )? }; let _prometheus_worker = if metrics_endpoints_are_specified { let prometheus_task = start_prometheus_metrics_server( metrics_endpoints, - RegistryAdapter::Libp2p(metrics_registry), + RegistryAdapter::Libp2p(prometheus_metrics_registry), )?; let join_handle = tokio::spawn(prometheus_task); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs index a5042b2cf1..99b0fe6770 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs @@ -10,7 +10,6 @@ use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::{NodeClient, NodeRpcClient, KNOWN_PEERS_CACHE_SIZE}; use subspace_networking::libp2p::identity::Keypair; use subspace_networking::libp2p::kad::RecordKey; -use subspace_networking::libp2p::metrics::Metrics; use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::strip_peer_id; @@ -50,8 +49,8 @@ pub(super) fn configure_dsn( weak_readers_and_pieces: Weak>>, node_client: NodeRpcClient, piece_cache: PieceCache, - initialize_metrics: bool, -) -> Result<(Node, NodeRunner, Registry), anyhow::Error> { + prometheus_metrics_registry: Option<&mut Registry>, +) -> Result<(Node, NodeRunner), anyhow::Error> { let networking_parameters_registry = KnownPeersManager::new(KnownPeersManagerConfig { path: Some(base_path.join("known_addresses.bin").into_boxed_path()), ignore_peer_list: strip_peer_id(bootstrap_nodes.clone()) @@ -63,15 +62,12 @@ pub(super) fn configure_dsn( }) .map(Box::new)?; - // Metrics - let mut metrics_registry = Registry::default(); - let metrics = initialize_metrics.then(|| Metrics::new(&mut metrics_registry)); - let default_config = Config::new( protocol_prefix, keypair, piece_cache.clone(), Some(PeerInfoProvider::new_farmer()), + prometheus_metrics_registry, ); let config = Config { reserved_peers, @@ -203,7 +199,6 @@ pub(super) fn configure_dsn( bootstrap_addresses: bootstrap_nodes, kademlia_mode: KademliaMode::Dynamic, external_addresses, - metrics, disable_bootstrap_on_start, ..default_config }; @@ -223,7 +218,7 @@ pub(super) fn configure_dsn( .detach(); // Consider returning HandlerId instead of each `detach()` calls for other usages. - (node, node_runner, metrics_registry) + (node, node_runner) }) .map_err(Into::into) } diff --git a/crates/subspace-networking/examples/benchmark.rs b/crates/subspace-networking/examples/benchmark.rs index 0b10585344..300888742b 100644 --- a/crates/subspace-networking/examples/benchmark.rs +++ b/crates/subspace-networking/examples/benchmark.rs @@ -281,7 +281,13 @@ pub async fn configure_dsn( ) -> Node { let keypair = Keypair::generate_ed25519(); - let default_config = Config::new(protocol_prefix, keypair, (), Some(PeerInfoProvider::Client)); + let default_config = Config::new( + protocol_prefix, + keypair, + (), + Some(PeerInfoProvider::Client), + None, + ); let config = Config { listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], diff --git a/crates/subspace-networking/examples/metrics.rs b/crates/subspace-networking/examples/metrics.rs index 406e81649c..e3aba08605 100644 --- a/crates/subspace-networking/examples/metrics.rs +++ b/crates/subspace-networking/examples/metrics.rs @@ -22,7 +22,7 @@ async fn main() { let config_1 = Config { listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, - metrics: Some(metrics), + libp2p_metrics: Some(metrics), ..Config::default() }; let (node_1, mut node_runner_1) = subspace_networking::construct(config_1).unwrap(); diff --git a/crates/subspace-networking/examples/random-walker.rs b/crates/subspace-networking/examples/random-walker.rs index b56c45cbac..0ce0bbad8b 100644 --- a/crates/subspace-networking/examples/random-walker.rs +++ b/crates/subspace-networking/examples/random-walker.rs @@ -360,7 +360,13 @@ async fn configure_dsn( ) -> Node { let keypair = Keypair::generate_ed25519(); - let default_config = Config::new(protocol_prefix, keypair, (), Some(PeerInfoProvider::Client)); + let default_config = Config::new( + protocol_prefix, + keypair, + (), + Some(PeerInfoProvider::Client), + None, + ); let config = Config { listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index 649db9ca41..6eb874e209 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -6,7 +6,6 @@ use clap::Parser; use futures::{select, FutureExt}; use libp2p::identity::ed25519::Keypair; use libp2p::kad::Mode; -use libp2p::metrics::Metrics; use libp2p::{identity, Multiaddr, PeerId}; use prometheus_client::registry::Registry; use serde::{Deserialize, Serialize}; @@ -153,19 +152,10 @@ async fn main() -> Result<(), Box> { let keypair = identity::Keypair::from(decoded_keypair); // Metrics - let mut metric_registry = Registry::default(); let metrics_endpoints_are_specified = !metrics_endpoints.is_empty(); - let metrics = - metrics_endpoints_are_specified.then(|| Metrics::new(&mut metric_registry)); - - let prometheus_task = metrics_endpoints_are_specified - .then(|| { - start_prometheus_metrics_server( - metrics_endpoints, - RegistryAdapter::Libp2p(metric_registry), - ) - }) - .transpose()?; + let mut metrics_registry = Registry::default(); + let dsn_metrics_registry = + metrics_endpoints_are_specified.then_some(&mut metrics_registry); let known_peers_registry_config = KnownPeersManagerConfig { enable_known_peers_source: false, @@ -199,10 +189,15 @@ async fn main() -> Result<(), Box> { bootstrap_addresses: bootstrap_nodes, kademlia_mode: KademliaMode::Static(Mode::Server), external_addresses, - metrics, networking_parameters_registry: known_peers_registry.boxed(), - ..Config::new(protocol_version.to_string(), keypair, (), None) + ..Config::new( + protocol_version.to_string(), + keypair, + (), + None, + dsn_metrics_registry, + ) }; let (node, mut node_runner) = subspace_networking::construct(config).expect("Networking stack creation failed."); @@ -220,6 +215,15 @@ async fn main() -> Result<(), Box> { .detach(); info!("Subspace Bootstrap Node started"); + + let prometheus_task = metrics_endpoints_are_specified + .then(|| { + start_prometheus_metrics_server( + metrics_endpoints, + RegistryAdapter::Libp2p(metrics_registry), + ) + }) + .transpose()?; if let Some(prometheus_task) = prometheus_task { select! { _ = node_runner.run().fuse() => {}, diff --git a/crates/subspace-networking/src/constructor.rs b/crates/subspace-networking/src/constructor.rs index a2737302a2..dc951b89ae 100644 --- a/crates/subspace-networking/src/constructor.rs +++ b/crates/subspace-networking/src/constructor.rs @@ -14,7 +14,7 @@ use crate::protocols::request_response::request_response_factory::RequestHandler use crate::protocols::reserved_peers::Config as ReservedPeersConfig; use crate::shared::Shared; use crate::utils::rate_limiter::RateLimiter; -use crate::utils::strip_peer_id; +use crate::utils::{strip_peer_id, SubspaceMetrics}; use crate::{PeerInfo, PeerInfoConfig}; use backoff::{ExponentialBackoff, SystemClock}; use futures::channel::mpsc; @@ -35,6 +35,7 @@ use libp2p::multiaddr::Protocol; use libp2p::yamux::Config as YamuxConfig; use libp2p::{identity, Multiaddr, PeerId, StreamProtocol, SwarmBuilder, TransportError}; use parking_lot::Mutex; +use prometheus_client::registry::Registry; use std::borrow::Cow; use std::iter::Empty; use std::num::NonZeroUsize; @@ -238,8 +239,10 @@ pub struct Config { pub temporary_bans_cache_size: NonZeroUsize, /// Backoff policy for temporary banning of unreachable peers. pub temporary_ban_backoff: ExponentialBackoff, - /// Optional external prometheus metrics. None will disable metrics gathering. - pub metrics: Option, + /// Optional libp2p prometheus metrics. None will disable metrics gathering. + pub libp2p_metrics: Option, + /// Internal prometheus metrics. None will disable metrics gathering. + pub metrics: Option, /// Defines protocol version for the network peers. Affects network partition. pub protocol_version: String, /// Specifies a source for peer information. None disables the protocol. @@ -292,6 +295,7 @@ impl Default for Config<()> { keypair, (), Some(PeerInfoProvider::new_client()), + None, ) } } @@ -306,7 +310,17 @@ where keypair: identity::Keypair, local_records_provider: LocalRecordProvider, peer_info_provider: Option, + prometheus_registry: Option<&mut Registry>, ) -> Self { + let (libp2p_metrics, metrics) = prometheus_registry + .map(|registry| { + ( + Some(Metrics::new(registry)), + Some(SubspaceMetrics::new(registry)), + ) + }) + .unwrap_or((None, None)); + let mut kademlia = KademliaConfig::default(); kademlia .set_query_timeout(KADEMLIA_QUERY_TIMEOUT) @@ -379,7 +393,8 @@ where max_pending_outgoing_connections: SWARM_MAX_PENDING_OUTGOING_CONNECTIONS, temporary_bans_cache_size: TEMPORARY_BANS_CACHE_SIZE, temporary_ban_backoff, - metrics: None, + libp2p_metrics, + metrics, protocol_version, peer_info_provider, // Don't need to keep additional connections by default @@ -450,6 +465,7 @@ where max_pending_outgoing_connections, temporary_bans_cache_size, temporary_ban_backoff, + libp2p_metrics, metrics, protocol_version, peer_info_provider, @@ -639,6 +655,7 @@ where networking_parameters_registry, reserved_peers: strip_peer_id(reserved_peers).into_iter().collect(), temporary_bans, + libp2p_metrics, metrics, protocol_version, general_connection_decision_handler, diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 1c7db74551..0c7dbd67f7 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -12,7 +12,7 @@ use crate::protocols::request_response::request_response_factory::{ }; use crate::shared::{Command, CreatedSubscription, NewPeerInfo, PeerDiscovered, Shared}; use crate::utils::rate_limiter::RateLimiterPermit; -use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress}; +use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress, SubspaceMetrics}; use async_mutex::Mutex as AsyncMutex; use bytes::Bytes; use event_listener_primitives::HandlerId; @@ -123,8 +123,10 @@ where reserved_peers: HashMap, /// Temporarily banned peers. temporary_bans: Arc>, - /// Prometheus metrics. - metrics: Option, + /// Libp2p Prometheus metrics. + libp2p_metrics: Option, + /// Subspace Prometheus metrics. + metrics: Option, /// Mapping from specific peer to ip addresses peer_ip_addresses: HashMap>, /// Defines protocol version for the network peers. Affects network partition. @@ -178,7 +180,8 @@ where pub(crate) networking_parameters_registry: Box, pub(crate) reserved_peers: HashMap, pub(crate) temporary_bans: Arc>, - pub(crate) metrics: Option, + pub(crate) libp2p_metrics: Option, + pub(crate) metrics: Option, pub(crate) protocol_version: String, pub(crate) general_connection_decision_handler: Option, pub(crate) special_connection_decision_handler: Option, @@ -201,6 +204,7 @@ where mut networking_parameters_registry, reserved_peers, temporary_bans, + libp2p_metrics, metrics, protocol_version, general_connection_decision_handler, @@ -239,6 +243,7 @@ where networking_parameters_registry, reserved_peers, temporary_bans, + libp2p_metrics, metrics, peer_ip_addresses: HashMap::new(), protocol_version, @@ -548,6 +553,10 @@ where if num_established.get() == 1 { shared.handlers.connected_peer.call_simple(&peer_id); } + + if let Some(metrics) = self.metrics.as_mut() { + metrics.inc_established_connections() + } } SwarmEvent::ConnectionClosed { peer_id, @@ -592,6 +601,10 @@ where if num_established == 0 { shared.handlers.disconnected_peer.call_simple(&peer_id); } + + if let Some(metrics) = self.metrics.as_mut() { + metrics.dec_established_connections() + }; } SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { if let Some(peer_id) = &peer_id { @@ -1556,7 +1569,7 @@ where } fn register_event_metrics(&mut self, swarm_event: &SwarmEvent) { - if let Some(ref mut metrics) = self.metrics { + if let Some(ref mut metrics) = self.libp2p_metrics { match swarm_event { SwarmEvent::Behaviour(Event::Ping(ping_event)) => { metrics.record(ping_event); diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index b039a9c0c8..31ecae37a0 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -11,6 +11,8 @@ use event_listener_primitives::Bag; use futures::future::{Fuse, FusedFuture, FutureExt}; use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr, PeerId}; +use prometheus_client::metrics::gauge::Gauge; +use prometheus_client::registry::Registry; use std::future::Future; use std::marker::PhantomData; use std::num::NonZeroUsize; @@ -21,6 +23,39 @@ use tokio::runtime::Handle; use tokio::task; use tracing::warn; +const NETWORKING_REGISTRY_PREFIX: &str = "subspace"; + +/// Metrics for Subspace networking +pub struct SubspaceMetrics { + established_connections: Gauge, +} + +impl SubspaceMetrics { + /// Constructor + pub fn new(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix(NETWORKING_REGISTRY_PREFIX); + + let gauge = Gauge::default(); + sub_registry.register( + "established_connections", + "The current number of established connections", + gauge.clone(), + ); + + Self { + established_connections: gauge, + } + } + + pub(crate) fn inc_established_connections(&mut self) { + self.established_connections.inc(); + } + + pub(crate) fn dec_established_connections(&mut self) { + self.established_connections.dec(); + } +} + /// Joins async join handle on drop pub(crate) struct AsyncJoinOnDrop(Option>>); diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index 8c159e3146..e185d27108 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -5,7 +5,6 @@ use std::num::NonZeroUsize; use std::path::PathBuf; use std::sync::Arc; use subspace_networking::libp2p::kad::Mode; -use subspace_networking::libp2p::metrics::Metrics; use subspace_networking::libp2p::{identity, Multiaddr}; use subspace_networking::utils::strip_peer_id; use subspace_networking::{ @@ -80,8 +79,8 @@ pub(crate) fn create_dsn_instance( ) -> Result<(Node, NodeRunner<()>, Option), DsnConfigurationError> { trace!("Subspace networking starting."); - let mut metric_registry = Registry::default(); - let metrics = enable_metrics.then(|| Metrics::new(&mut metric_registry)); + let mut metrics_registry = Registry::default(); + let dsn_metrics_registry = enable_metrics.then_some(&mut metrics_registry); let networking_parameters_registry = { // TODO: Make `base_path` point to `network` once we can clean up below migration code @@ -122,6 +121,7 @@ pub(crate) fn create_dsn_instance( keypair, (), Some(PeerInfoProvider::new_node()), + dsn_metrics_registry, ); let networking_config = subspace_networking::Config { @@ -147,13 +147,18 @@ pub(crate) fn create_dsn_instance( bootstrap_addresses: dsn_config.bootstrap_nodes, external_addresses: dsn_config.external_addresses, kademlia_mode: KademliaMode::Static(Mode::Client), - metrics, disable_bootstrap_on_start: dsn_config.disable_bootstrap_on_start, ..default_networking_config }; subspace_networking::construct(networking_config) - .map(|(node, node_runner)| (node, node_runner, enable_metrics.then_some(metric_registry))) + .map(|(node, node_runner)| { + ( + node, + node_runner, + enable_metrics.then_some(metrics_registry), + ) + }) .map_err(Into::into) }