Skip to content

Commit

Permalink
Add infrastructure for Subspace networking metrics. (#2284)
Browse files Browse the repository at this point in the history
* networking: Add subspace metrics infrastructure.

* networking: Refactor metrics infrastructure.

* Refactor networking metrics.
  • Loading branch information
shamil-gadelshin authored Dec 14, 2023
1 parent 591a50e commit 5be3c33
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lru::LruCache;
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::fs;
use std::net::SocketAddr;
use std::num::{NonZeroU8, NonZeroUsize};
Expand Down Expand Up @@ -348,9 +349,11 @@ where

let (piece_cache, piece_cache_worker) = PieceCache::new(node_client.clone(), peer_id);

// Metrics
let mut prometheus_metrics_registry = Registry::default();
let metrics_endpoints_are_specified = !metrics_endpoints.is_empty();

let (node, mut node_runner, metrics_registry) = {
let (node, mut node_runner) = {
if dsn.bootstrap_nodes.is_empty() {
dsn.bootstrap_nodes = farmer_app_info.dsn_bootstrap_nodes.clone();
}
Expand All @@ -363,14 +366,14 @@ where
Arc::downgrade(&readers_and_pieces),
node_client.clone(),
piece_cache.clone(),
metrics_endpoints_are_specified,
metrics_endpoints_are_specified.then_some(&mut prometheus_metrics_registry),
)?
};

let _prometheus_worker = if metrics_endpoints_are_specified {
let prometheus_task = start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metrics_registry),
RegistryAdapter::Libp2p(prometheus_metrics_registry),
)?;

let join_handle = tokio::spawn(prometheus_task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::{NodeClient, NodeRpcClient, KNOWN_PEERS_CACHE_SIZE};
use subspace_networking::libp2p::identity::Keypair;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::metrics::Metrics;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::strip_peer_id;
Expand Down Expand Up @@ -50,8 +49,8 @@ pub(super) fn configure_dsn(
weak_readers_and_pieces: Weak<Mutex<Option<ReadersAndPieces>>>,
node_client: NodeRpcClient,
piece_cache: PieceCache,
initialize_metrics: bool,
) -> Result<(Node, NodeRunner<PieceCache>, Registry), anyhow::Error> {
prometheus_metrics_registry: Option<&mut Registry>,
) -> Result<(Node, NodeRunner<PieceCache>), anyhow::Error> {
let networking_parameters_registry = KnownPeersManager::new(KnownPeersManagerConfig {
path: Some(base_path.join("known_addresses.bin").into_boxed_path()),
ignore_peer_list: strip_peer_id(bootstrap_nodes.clone())
Expand All @@ -63,15 +62,12 @@ pub(super) fn configure_dsn(
})
.map(Box::new)?;

// Metrics
let mut metrics_registry = Registry::default();
let metrics = initialize_metrics.then(|| Metrics::new(&mut metrics_registry));

let default_config = Config::new(
protocol_prefix,
keypair,
piece_cache.clone(),
Some(PeerInfoProvider::new_farmer()),
prometheus_metrics_registry,
);
let config = Config {
reserved_peers,
Expand Down Expand Up @@ -203,7 +199,6 @@ pub(super) fn configure_dsn(
bootstrap_addresses: bootstrap_nodes,
kademlia_mode: KademliaMode::Dynamic,
external_addresses,
metrics,
disable_bootstrap_on_start,
..default_config
};
Expand All @@ -223,7 +218,7 @@ pub(super) fn configure_dsn(
.detach();

// Consider returning the `HandlerId`s instead of calling `detach()` on each one, so callers can use them.
(node, node_runner, metrics_registry)
(node, node_runner)
})
.map_err(Into::into)
}
8 changes: 7 additions & 1 deletion crates/subspace-networking/examples/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,13 @@ pub async fn configure_dsn(
) -> Node {
let keypair = Keypair::generate_ed25519();

let default_config = Config::new(protocol_prefix, keypair, (), Some(PeerInfoProvider::Client));
let default_config = Config::new(
protocol_prefix,
keypair,
(),
Some(PeerInfoProvider::Client),
None,
);

let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() {
let config_1 = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
metrics: Some(metrics),
libp2p_metrics: Some(metrics),
..Config::default()
};
let (node_1, mut node_runner_1) = subspace_networking::construct(config_1).unwrap();
Expand Down
8 changes: 7 additions & 1 deletion crates/subspace-networking/examples/random-walker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,13 @@ async fn configure_dsn(
) -> Node {
let keypair = Keypair::generate_ed25519();

let default_config = Config::new(protocol_prefix, keypair, (), Some(PeerInfoProvider::Client));
let default_config = Config::new(
protocol_prefix,
keypair,
(),
Some(PeerInfoProvider::Client),
None,
);

let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
Expand Down
34 changes: 19 additions & 15 deletions crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use clap::Parser;
use futures::{select, FutureExt};
use libp2p::identity::ed25519::Keypair;
use libp2p::kad::Mode;
use libp2p::metrics::Metrics;
use libp2p::{identity, Multiaddr, PeerId};
use prometheus_client::registry::Registry;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -153,19 +152,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
let keypair = identity::Keypair::from(decoded_keypair);

// Metrics
let mut metric_registry = Registry::default();
let metrics_endpoints_are_specified = !metrics_endpoints.is_empty();
let metrics =
metrics_endpoints_are_specified.then(|| Metrics::new(&mut metric_registry));

let prometheus_task = metrics_endpoints_are_specified
.then(|| {
start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metric_registry),
)
})
.transpose()?;
let mut metrics_registry = Registry::default();
let dsn_metrics_registry =
metrics_endpoints_are_specified.then_some(&mut metrics_registry);

let known_peers_registry_config = KnownPeersManagerConfig {
enable_known_peers_source: false,
Expand Down Expand Up @@ -199,10 +189,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
bootstrap_addresses: bootstrap_nodes,
kademlia_mode: KademliaMode::Static(Mode::Server),
external_addresses,
metrics,
networking_parameters_registry: known_peers_registry.boxed(),

..Config::new(protocol_version.to_string(), keypair, (), None)
..Config::new(
protocol_version.to_string(),
keypair,
(),
None,
dsn_metrics_registry,
)
};
let (node, mut node_runner) =
subspace_networking::construct(config).expect("Networking stack creation failed.");
Expand All @@ -220,6 +215,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
.detach();

info!("Subspace Bootstrap Node started");

let prometheus_task = metrics_endpoints_are_specified
.then(|| {
start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metrics_registry),
)
})
.transpose()?;
if let Some(prometheus_task) = prometheus_task {
select! {
_ = node_runner.run().fuse() => {},
Expand Down
25 changes: 21 additions & 4 deletions crates/subspace-networking/src/constructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::protocols::request_response::request_response_factory::RequestHandler
use crate::protocols::reserved_peers::Config as ReservedPeersConfig;
use crate::shared::Shared;
use crate::utils::rate_limiter::RateLimiter;
use crate::utils::strip_peer_id;
use crate::utils::{strip_peer_id, SubspaceMetrics};
use crate::{PeerInfo, PeerInfoConfig};
use backoff::{ExponentialBackoff, SystemClock};
use futures::channel::mpsc;
Expand All @@ -35,6 +35,7 @@ use libp2p::multiaddr::Protocol;
use libp2p::yamux::Config as YamuxConfig;
use libp2p::{identity, Multiaddr, PeerId, StreamProtocol, SwarmBuilder, TransportError};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::borrow::Cow;
use std::iter::Empty;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -238,8 +239,10 @@ pub struct Config<LocalRecordProvider> {
pub temporary_bans_cache_size: NonZeroUsize,
/// Backoff policy for temporary banning of unreachable peers.
pub temporary_ban_backoff: ExponentialBackoff,
/// Optional external prometheus metrics. None will disable metrics gathering.
pub metrics: Option<Metrics>,
/// Optional libp2p Prometheus metrics. `None` disables metrics gathering.
pub libp2p_metrics: Option<Metrics>,
/// Optional Subspace Prometheus metrics. `None` disables metrics gathering.
pub metrics: Option<SubspaceMetrics>,
/// Defines protocol version for the network peers. Affects network partition.
pub protocol_version: String,
/// Specifies a source for peer information. None disables the protocol.
Expand Down Expand Up @@ -292,6 +295,7 @@ impl Default for Config<()> {
keypair,
(),
Some(PeerInfoProvider::new_client()),
None,
)
}
}
Expand All @@ -306,7 +310,17 @@ where
keypair: identity::Keypair,
local_records_provider: LocalRecordProvider,
peer_info_provider: Option<PeerInfoProvider>,
prometheus_registry: Option<&mut Registry>,
) -> Self {
let (libp2p_metrics, metrics) = prometheus_registry
.map(|registry| {
(
Some(Metrics::new(registry)),
Some(SubspaceMetrics::new(registry)),
)
})
.unwrap_or((None, None));

let mut kademlia = KademliaConfig::default();
kademlia
.set_query_timeout(KADEMLIA_QUERY_TIMEOUT)
Expand Down Expand Up @@ -379,7 +393,8 @@ where
max_pending_outgoing_connections: SWARM_MAX_PENDING_OUTGOING_CONNECTIONS,
temporary_bans_cache_size: TEMPORARY_BANS_CACHE_SIZE,
temporary_ban_backoff,
metrics: None,
libp2p_metrics,
metrics,
protocol_version,
peer_info_provider,
// Don't need to keep additional connections by default
Expand Down Expand Up @@ -450,6 +465,7 @@ where
max_pending_outgoing_connections,
temporary_bans_cache_size,
temporary_ban_backoff,
libp2p_metrics,
metrics,
protocol_version,
peer_info_provider,
Expand Down Expand Up @@ -639,6 +655,7 @@ where
networking_parameters_registry,
reserved_peers: strip_peer_id(reserved_peers).into_iter().collect(),
temporary_bans,
libp2p_metrics,
metrics,
protocol_version,
general_connection_decision_handler,
Expand Down
23 changes: 18 additions & 5 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::protocols::request_response::request_response_factory::{
};
use crate::shared::{Command, CreatedSubscription, NewPeerInfo, PeerDiscovered, Shared};
use crate::utils::rate_limiter::RateLimiterPermit;
use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress};
use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress, SubspaceMetrics};
use async_mutex::Mutex as AsyncMutex;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
Expand Down Expand Up @@ -123,8 +123,10 @@ where
reserved_peers: HashMap<PeerId, Multiaddr>,
/// Temporarily banned peers.
temporary_bans: Arc<Mutex<TemporaryBans>>,
/// Prometheus metrics.
metrics: Option<Metrics>,
/// Libp2p Prometheus metrics.
libp2p_metrics: Option<Metrics>,
/// Subspace Prometheus metrics.
metrics: Option<SubspaceMetrics>,
/// Mapping from specific peer to ip addresses
peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
/// Defines protocol version for the network peers. Affects network partition.
Expand Down Expand Up @@ -178,7 +180,8 @@ where
pub(crate) networking_parameters_registry: Box<dyn KnownPeersRegistry>,
pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
pub(crate) metrics: Option<Metrics>,
pub(crate) libp2p_metrics: Option<Metrics>,
pub(crate) metrics: Option<SubspaceMetrics>,
pub(crate) protocol_version: String,
pub(crate) general_connection_decision_handler: Option<ConnectedPeersHandler>,
pub(crate) special_connection_decision_handler: Option<ConnectedPeersHandler>,
Expand All @@ -201,6 +204,7 @@ where
mut networking_parameters_registry,
reserved_peers,
temporary_bans,
libp2p_metrics,
metrics,
protocol_version,
general_connection_decision_handler,
Expand Down Expand Up @@ -239,6 +243,7 @@ where
networking_parameters_registry,
reserved_peers,
temporary_bans,
libp2p_metrics,
metrics,
peer_ip_addresses: HashMap::new(),
protocol_version,
Expand Down Expand Up @@ -548,6 +553,10 @@ where
if num_established.get() == 1 {
shared.handlers.connected_peer.call_simple(&peer_id);
}

if let Some(metrics) = self.metrics.as_mut() {
metrics.inc_established_connections()
}
}
SwarmEvent::ConnectionClosed {
peer_id,
Expand Down Expand Up @@ -592,6 +601,10 @@ where
if num_established == 0 {
shared.handlers.disconnected_peer.call_simple(&peer_id);
}

if let Some(metrics) = self.metrics.as_mut() {
metrics.dec_established_connections()
};
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = &peer_id {
Expand Down Expand Up @@ -1556,7 +1569,7 @@ where
}

fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
if let Some(ref mut metrics) = self.metrics {
if let Some(ref mut metrics) = self.libp2p_metrics {
match swarm_event {
SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
metrics.record(ping_event);
Expand Down
Loading

0 comments on commit 5be3c33

Please sign in to comment.