Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add infrastructure for Subspace networking metrics. #2284

Merged
merged 4 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lru::LruCache;
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::fs;
use std::net::SocketAddr;
use std::num::{NonZeroU8, NonZeroUsize};
Expand Down Expand Up @@ -348,9 +349,11 @@ where

let (piece_cache, piece_cache_worker) = PieceCache::new(node_client.clone(), peer_id);

// Metrics
let mut prometheus_metrics_registry = Registry::default();
let metrics_endpoints_are_specified = !metrics_endpoints.is_empty();

let (node, mut node_runner, metrics_registry) = {
let (node, mut node_runner) = {
if dsn.bootstrap_nodes.is_empty() {
dsn.bootstrap_nodes = farmer_app_info.dsn_bootstrap_nodes.clone();
}
Expand All @@ -363,14 +366,14 @@ where
Arc::downgrade(&readers_and_pieces),
node_client.clone(),
piece_cache.clone(),
metrics_endpoints_are_specified,
metrics_endpoints_are_specified.then_some(&mut prometheus_metrics_registry),
)?
};

let _prometheus_worker = if metrics_endpoints_are_specified {
let prometheus_task = start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metrics_registry),
RegistryAdapter::Libp2p(prometheus_metrics_registry),
)?;

let join_handle = tokio::spawn(prometheus_task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::{NodeClient, NodeRpcClient, KNOWN_PEERS_CACHE_SIZE};
use subspace_networking::libp2p::identity::Keypair;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::metrics::Metrics;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::strip_peer_id;
Expand Down Expand Up @@ -50,8 +49,8 @@ pub(super) fn configure_dsn(
weak_readers_and_pieces: Weak<Mutex<Option<ReadersAndPieces>>>,
node_client: NodeRpcClient,
piece_cache: PieceCache,
initialize_metrics: bool,
) -> Result<(Node, NodeRunner<PieceCache>, Registry), anyhow::Error> {
prometheus_metrics_registry: Option<&mut Registry>,
) -> Result<(Node, NodeRunner<PieceCache>), anyhow::Error> {
let networking_parameters_registry = KnownPeersManager::new(KnownPeersManagerConfig {
path: Some(base_path.join("known_addresses.bin").into_boxed_path()),
ignore_peer_list: strip_peer_id(bootstrap_nodes.clone())
Expand All @@ -63,15 +62,12 @@ pub(super) fn configure_dsn(
})
.map(Box::new)?;

// Metrics
let mut metrics_registry = Registry::default();
let metrics = initialize_metrics.then(|| Metrics::new(&mut metrics_registry));

let default_config = Config::new(
protocol_prefix,
keypair,
piece_cache.clone(),
Some(PeerInfoProvider::new_farmer()),
prometheus_metrics_registry,
);
let config = Config {
reserved_peers,
Expand Down Expand Up @@ -203,7 +199,6 @@ pub(super) fn configure_dsn(
bootstrap_addresses: bootstrap_nodes,
kademlia_mode: KademliaMode::Dynamic,
external_addresses,
metrics,
disable_bootstrap_on_start,
..default_config
};
Expand All @@ -223,7 +218,7 @@ pub(super) fn configure_dsn(
.detach();

// Consider returning HandlerId instead of each `detach()` calls for other usages.
(node, node_runner, metrics_registry)
(node, node_runner)
})
.map_err(Into::into)
}
8 changes: 7 additions & 1 deletion crates/subspace-networking/examples/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,13 @@ pub async fn configure_dsn(
) -> Node {
let keypair = Keypair::generate_ed25519();

let default_config = Config::new(protocol_prefix, keypair, (), Some(PeerInfoProvider::Client));
let default_config = Config::new(
protocol_prefix,
keypair,
(),
Some(PeerInfoProvider::Client),
None,
);

let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() {
let config_1 = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
metrics: Some(metrics),
libp2p_metrics: Some(metrics),
..Config::default()
};
let (node_1, mut node_runner_1) = subspace_networking::construct(config_1).unwrap();
Expand Down
8 changes: 7 additions & 1 deletion crates/subspace-networking/examples/random-walker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,13 @@ async fn configure_dsn(
) -> Node {
let keypair = Keypair::generate_ed25519();

let default_config = Config::new(protocol_prefix, keypair, (), Some(PeerInfoProvider::Client));
let default_config = Config::new(
protocol_prefix,
keypair,
(),
Some(PeerInfoProvider::Client),
None,
);

let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use clap::Parser;
use futures::{select, FutureExt};
use libp2p::identity::ed25519::Keypair;
use libp2p::kad::Mode;
use libp2p::metrics::Metrics;
use libp2p::{identity, Multiaddr, PeerId};
use prometheus_client::registry::Registry;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -153,19 +152,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
let keypair = identity::Keypair::from(decoded_keypair);

// Metrics
let mut metric_registry = Registry::default();
let metrics_endpoints_are_specified = !metrics_endpoints.is_empty();
let metrics =
metrics_endpoints_are_specified.then(|| Metrics::new(&mut metric_registry));

let prometheus_task = metrics_endpoints_are_specified
.then(|| {
start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metric_registry),
)
})
.transpose()?;
let mut metrics_registry = Registry::default();
let dsn_metrics_registry =
metrics_endpoints_are_specified.then_some(&mut metrics_registry);

let known_peers_registry_config = KnownPeersManagerConfig {
enable_known_peers_source: false,
Expand Down Expand Up @@ -199,10 +189,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
bootstrap_addresses: bootstrap_nodes,
kademlia_mode: KademliaMode::Static(Mode::Server),
external_addresses,
metrics,
networking_parameters_registry: known_peers_registry.boxed(),

..Config::new(protocol_version.to_string(), keypair, (), None)
..Config::new(
protocol_version.to_string(),
keypair,
(),
None,
dsn_metrics_registry,
)
};
let (node, mut node_runner) =
subspace_networking::construct(config).expect("Networking stack creation failed.");
Expand All @@ -220,6 +215,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
.detach();

info!("Subspace Bootstrap Node started");

let prometheus_task = metrics_endpoints_are_specified
.then(|| {
start_prometheus_metrics_server(
metrics_endpoints,
RegistryAdapter::Libp2p(metrics_registry),
)
})
.transpose()?;
if let Some(prometheus_task) = prometheus_task {
select! {
_ = node_runner.run().fuse() => {},
Expand Down
25 changes: 21 additions & 4 deletions crates/subspace-networking/src/constructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::protocols::request_response::request_response_factory::RequestHandler
use crate::protocols::reserved_peers::Config as ReservedPeersConfig;
use crate::shared::Shared;
use crate::utils::rate_limiter::RateLimiter;
use crate::utils::strip_peer_id;
use crate::utils::{strip_peer_id, SubspaceMetrics};
use crate::{PeerInfo, PeerInfoConfig};
use backoff::{ExponentialBackoff, SystemClock};
use futures::channel::mpsc;
Expand All @@ -35,6 +35,7 @@ use libp2p::multiaddr::Protocol;
use libp2p::yamux::Config as YamuxConfig;
use libp2p::{identity, Multiaddr, PeerId, StreamProtocol, SwarmBuilder, TransportError};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::borrow::Cow;
use std::iter::Empty;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -238,8 +239,10 @@ pub struct Config<LocalRecordProvider> {
pub temporary_bans_cache_size: NonZeroUsize,
/// Backoff policy for temporary banning of unreachable peers.
pub temporary_ban_backoff: ExponentialBackoff,
/// Optional external prometheus metrics. None will disable metrics gathering.
pub metrics: Option<Metrics>,
/// Optional libp2p prometheus metrics. None will disable metrics gathering.
pub libp2p_metrics: Option<Metrics>,
/// Internal prometheus metrics. None will disable metrics gathering.
pub metrics: Option<SubspaceMetrics>,
/// Defines protocol version for the network peers. Affects network partition.
pub protocol_version: String,
/// Specifies a source for peer information. None disables the protocol.
Expand Down Expand Up @@ -292,6 +295,7 @@ impl Default for Config<()> {
keypair,
(),
Some(PeerInfoProvider::new_client()),
None,
)
}
}
Expand All @@ -306,7 +310,17 @@ where
keypair: identity::Keypair,
local_records_provider: LocalRecordProvider,
peer_info_provider: Option<PeerInfoProvider>,
prometheus_registry: Option<&mut Registry>,
) -> Self {
let (libp2p_metrics, metrics) = prometheus_registry
.map(|registry| {
(
Some(Metrics::new(registry)),
Some(SubspaceMetrics::new(registry)),
)
})
.unwrap_or((None, None));

let mut kademlia = KademliaConfig::default();
kademlia
.set_query_timeout(KADEMLIA_QUERY_TIMEOUT)
Expand Down Expand Up @@ -379,7 +393,8 @@ where
max_pending_outgoing_connections: SWARM_MAX_PENDING_OUTGOING_CONNECTIONS,
temporary_bans_cache_size: TEMPORARY_BANS_CACHE_SIZE,
temporary_ban_backoff,
metrics: None,
libp2p_metrics,
metrics,
protocol_version,
peer_info_provider,
// Don't need to keep additional connections by default
Expand Down Expand Up @@ -450,6 +465,7 @@ where
max_pending_outgoing_connections,
temporary_bans_cache_size,
temporary_ban_backoff,
libp2p_metrics,
metrics,
protocol_version,
peer_info_provider,
Expand Down Expand Up @@ -639,6 +655,7 @@ where
networking_parameters_registry,
reserved_peers: strip_peer_id(reserved_peers).into_iter().collect(),
temporary_bans,
libp2p_metrics,
metrics,
protocol_version,
general_connection_decision_handler,
Expand Down
23 changes: 18 additions & 5 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::protocols::request_response::request_response_factory::{
};
use crate::shared::{Command, CreatedSubscription, NewPeerInfo, PeerDiscovered, Shared};
use crate::utils::rate_limiter::RateLimiterPermit;
use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress};
use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress, SubspaceMetrics};
use async_mutex::Mutex as AsyncMutex;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
Expand Down Expand Up @@ -123,8 +123,10 @@ where
reserved_peers: HashMap<PeerId, Multiaddr>,
/// Temporarily banned peers.
temporary_bans: Arc<Mutex<TemporaryBans>>,
/// Prometheus metrics.
metrics: Option<Metrics>,
/// Libp2p Prometheus metrics.
libp2p_metrics: Option<Metrics>,
/// Subspace Prometheus metrics.
metrics: Option<SubspaceMetrics>,
/// Mapping from specific peer to ip addresses
peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
/// Defines protocol version for the network peers. Affects network partition.
Expand Down Expand Up @@ -178,7 +180,8 @@ where
pub(crate) networking_parameters_registry: Box<dyn KnownPeersRegistry>,
pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
pub(crate) metrics: Option<Metrics>,
pub(crate) libp2p_metrics: Option<Metrics>,
pub(crate) metrics: Option<SubspaceMetrics>,
pub(crate) protocol_version: String,
pub(crate) general_connection_decision_handler: Option<ConnectedPeersHandler>,
pub(crate) special_connection_decision_handler: Option<ConnectedPeersHandler>,
Expand All @@ -201,6 +204,7 @@ where
mut networking_parameters_registry,
reserved_peers,
temporary_bans,
libp2p_metrics,
metrics,
protocol_version,
general_connection_decision_handler,
Expand Down Expand Up @@ -239,6 +243,7 @@ where
networking_parameters_registry,
reserved_peers,
temporary_bans,
libp2p_metrics,
metrics,
peer_ip_addresses: HashMap::new(),
protocol_version,
Expand Down Expand Up @@ -548,6 +553,10 @@ where
if num_established.get() == 1 {
shared.handlers.connected_peer.call_simple(&peer_id);
}

if let Some(metrics) = self.metrics.as_mut() {
metrics.inc_established_connections()
}
}
SwarmEvent::ConnectionClosed {
peer_id,
Expand Down Expand Up @@ -592,6 +601,10 @@ where
if num_established == 0 {
shared.handlers.disconnected_peer.call_simple(&peer_id);
}

if let Some(metrics) = self.metrics.as_mut() {
metrics.dec_established_connections()
};
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = &peer_id {
Expand Down Expand Up @@ -1556,7 +1569,7 @@ where
}

fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
if let Some(ref mut metrics) = self.metrics {
if let Some(ref mut metrics) = self.libp2p_metrics {
match swarm_event {
SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
metrics.record(ping_event);
Expand Down
Loading
Loading