diff --git a/crates/topos-p2p/src/behaviour/grpc/event.rs b/crates/topos-p2p/src/behaviour/grpc/event.rs index 4c9b72523..f297a8ec0 100644 --- a/crates/topos-p2p/src/behaviour/grpc/event.rs +++ b/crates/topos-p2p/src/behaviour/grpc/event.rs @@ -14,6 +14,7 @@ pub enum Event { OutboundSuccess { peer_id: PeerId, request_id: RequestId, + #[allow(unused)] channel: Channel, }, diff --git a/crates/topos-p2p/src/network.rs b/crates/topos-p2p/src/network.rs index d50f0adac..58d540aa8 100644 --- a/crates/topos-p2p/src/network.rs +++ b/crates/topos-p2p/src/network.rs @@ -14,7 +14,7 @@ use crate::{ }; use futures::Stream; use libp2p::{ - core::upgrade, + core::{transport::MemoryTransport, upgrade}, dns, identity::Keypair, kad::store::MemoryStore, @@ -30,6 +30,7 @@ use std::{ }; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; +use tracing::debug; pub fn builder<'a>() -> NetworkBuilder<'a> { NetworkBuilder::default() @@ -48,9 +49,16 @@ pub struct NetworkBuilder<'a> { local_port: Option, config: NetworkConfig, grpc_context: GrpcContext, + memory_transport: bool, } impl<'a> NetworkBuilder<'a> { + #[cfg(test)] + pub(crate) fn memory(mut self) -> Self { + self.memory_transport = true; + + self + } pub fn grpc_context(mut self, grpc_context: GrpcContext) -> Self { self.grpc_context = grpc_context; @@ -131,6 +139,7 @@ impl<'a> NetworkBuilder<'a> { let grpc = grpc::Behaviour::new(self.grpc_context); + debug!("Known peers: {:?}", self.known_peers); let behaviour = Behaviour { gossipsub, peer_info: PeerInfoBehaviour::new(PEER_INFO_PROTOCOL, &peer_key), @@ -148,23 +157,29 @@ impl<'a> NetworkBuilder<'a> { grpc, }; - let transport = { + let multiplex_config = libp2p::yamux::Config::default(); + + let transport = if self.memory_transport { + MemoryTransport::new() + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&peer_key)?) + .multiplex(multiplex_config) + .timeout(TWO_HOURS) + .boxed() + } else { let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true)); let dns_tcp = dns::tokio::Transport::system(tcp).unwrap(); let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true)); - dns_tcp.or_transport(tcp) + dns_tcp + .or_transport(tcp) + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&peer_key)?) + .multiplex(multiplex_config) + .timeout(TWO_HOURS) + .boxed() }; - let multiplex_config = libp2p::yamux::Config::default(); - - let transport = transport - .upgrade(upgrade::Version::V1) - .authenticate(noise::Config::new(&peer_key)?) - .multiplex(multiplex_config) - .timeout(TWO_HOURS) - .boxed(); - let swarm = Swarm::new( transport, behaviour, @@ -216,8 +231,8 @@ impl<'a> NetworkBuilder<'a> { pending_record_requests: HashMap::new(), shutdown, health_state: crate::runtime::HealthState { - bootpeer_connection_retries: 3, - successfully_connected_to_bootpeer: if self.known_peers.is_empty() { + bootnode_connection_retries: 3, + successfully_connected_to_bootnode: if self.known_peers.is_empty() { // Node seems to be a boot node Some(ConnectionId::new_unchecked(0)) } else { diff --git a/crates/topos-p2p/src/runtime/handle_event.rs b/crates/topos-p2p/src/runtime/handle_event.rs index 596859cd1..4f62303aa 100644 --- a/crates/topos-p2p/src/runtime/handle_event.rs +++ b/crates/topos-p2p/src/runtime/handle_event.rs @@ -1,4 +1,4 @@ -use libp2p::{multiaddr::Protocol, swarm::SwarmEvent}; +use libp2p::{core::Endpoint, multiaddr::Protocol, swarm::SwarmEvent}; use tracing::{debug, error, info, warn}; use crate::{error::P2PError, event::ComposedEvent, Event, Runtime}; @@ -62,13 +62,13 @@ impl EventHandler> for Runtime { error, } if self .health_state - .successfully_connected_to_bootpeer + .successfully_connected_to_bootnode .is_none() - && self.health_state.dialed_bootpeer.contains(&connection_id) => + && self.health_state.dialed_bootnode.contains(&connection_id) => { - warn!("Unable to connect to bootpeer {peer_id}: {error:?}"); - self.health_state.dialed_bootpeer.remove(&connection_id); - if self.health_state.dialed_bootpeer.is_empty() { + warn!("Unable to connect to bootnode {peer_id}: {error:?}"); + self.health_state.dialed_bootnode.remove(&connection_id); + if self.health_state.dialed_bootnode.is_empty() { // We tried to connect to all bootnode without success error!("Unable to connect to any bootnode"); } @@ -100,25 +100,49 @@ impl EventHandler> for Runtime { num_established, concurrent_dial_errors, established_in, - } if self.health_state.dialed_bootpeer.contains(&connection_id) => { - info!("Successfully connected to bootpeer {peer_id}"); + } if self.health_state.dialed_bootnode.contains(&connection_id) => { + info!("Successfully connected to bootnode {peer_id}"); if self .health_state - .successfully_connected_to_bootpeer + .successfully_connected_to_bootnode .is_none() { - self.health_state.successfully_connected_to_bootpeer = Some(connection_id); - _ = self.health_state.dialed_bootpeer.remove(&connection_id); + self.health_state.successfully_connected_to_bootnode = Some(connection_id); + _ = self.health_state.dialed_bootnode.remove(&connection_id); } } SwarmEvent::ConnectionEstablished { - peer_id, endpoint, .. + peer_id, + endpoint, + connection_id, + .. } => { - info!( - "Connection established with peer {peer_id} as {:?}", - endpoint.to_endpoint() - ); + if self + .health_state + .successfully_connected_to_bootnode + .is_none() + && self.boot_peers.contains(&peer_id) + { + info!( + "Connection established with bootnode {peer_id} as {:?}", + endpoint.to_endpoint() + ); + + if endpoint.to_endpoint() == Endpoint::Listener { + if let Err(error) = self.swarm.dial(peer_id) { + error!( + "Unable to dial bootnode {peer_id} after incoming connection: \ + {error}" + ); + } + } + } else { + info!( + "Connection established with peer {peer_id} as {:?}", + endpoint.to_endpoint() + ); + } if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size { if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() { @@ -164,8 +188,8 @@ impl EventHandler> for Runtime { peer_id: Some(ref peer_id), connection_id, } if self.boot_peers.contains(peer_id) => { - info!("Dialing bootpeer {peer_id} on connection: {connection_id}"); - self.health_state.dialed_bootpeer.insert(connection_id); + info!("Dialing bootnode {peer_id} on connection: {connection_id}"); + self.health_state.dialed_bootnode.insert(connection_id); } SwarmEvent::Dialing { @@ -185,6 +209,7 @@ impl EventHandler> for Runtime { SwarmEvent::ListenerError { listener_id, error } => { error!("Unhandled ListenerError {listener_id:?} | {error}") } + event => { warn!("Unhandled SwarmEvent: {:?}", event); } diff --git a/crates/topos-p2p/src/runtime/handle_event/discovery.rs b/crates/topos-p2p/src/runtime/handle_event/discovery.rs index c82863046..7d3e4a545 100644 --- a/crates/topos-p2p/src/runtime/handle_event/discovery.rs +++ b/crates/topos-p2p/src/runtime/handle_event/discovery.rs @@ -45,7 +45,7 @@ impl EventHandler> for Runtime { { if self .health_state - .successfully_connected_to_bootpeer + .successfully_connected_to_bootnode .is_none() { warn!( @@ -85,11 +85,11 @@ impl EventHandler> for Runtime { } if num_remaining == 0 && self .health_state - .successfully_connected_to_bootpeer + .successfully_connected_to_bootnode .is_none() && self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy => { - match self.health_state.bootpeer_connection_retries.checked_sub(1) { + match self.health_state.bootnode_connection_retries.checked_sub(1) { None => { error!( "Bootstrap query finished but unable to connect to bootnode, stopping" @@ -103,7 +103,7 @@ impl EventHandler> for Runtime { {} more times", new ); - self.health_state.bootpeer_connection_retries = new; + self.health_state.bootnode_connection_retries = new; } } } @@ -119,7 +119,7 @@ impl EventHandler> for Runtime { } if num_remaining == 0 && self .health_state - .successfully_connected_to_bootpeer + .successfully_connected_to_bootnode .is_some() && self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy => { diff --git a/crates/topos-p2p/src/runtime/handle_event/grpc.rs b/crates/topos-p2p/src/runtime/handle_event/grpc.rs index a17c83a6a..bb275a14f 100644 --- a/crates/topos-p2p/src/runtime/handle_event/grpc.rs +++ b/crates/topos-p2p/src/runtime/handle_event/grpc.rs @@ -1,10 +1,52 @@ +use tracing::debug; + use crate::{behaviour::grpc, Runtime}; use super::{EventHandler, EventResult}; #[async_trait::async_trait] impl EventHandler for Runtime { - async fn handle(&mut self, _event: grpc::Event) -> EventResult { + async fn handle(&mut self, event: grpc::Event) -> EventResult { + match event { + grpc::Event::OutboundFailure { + peer_id, + request_id, + error, + } => { + debug!( + "Outbound connection failure to peer {} for request {}: {}", + peer_id, request_id, error + ); + } + grpc::Event::OutboundSuccess { + peer_id, + request_id, + .. + } => { + debug!( + "Outbound connection success to peer {} for request {}", + peer_id, request_id + ); + } + grpc::Event::InboundNegotiatedConnection { + request_id, + connection_id, + } => { + debug!( + "Inbound connection negotiated for request {} with connection {}", + request_id, connection_id + ); + } + grpc::Event::OutboundNegotiatedConnection { + peer_id, + request_id, + } => { + debug!( + "Outbound connection negotiated to peer {} for request {}", + peer_id, request_id + ); + } + } Ok(()) } } diff --git a/crates/topos-p2p/src/runtime/mod.rs b/crates/topos-p2p/src/runtime/mod.rs index 8e25dfff5..2d612b49c 100644 --- a/crates/topos-p2p/src/runtime/mod.rs +++ b/crates/topos-p2p/src/runtime/mod.rs @@ -62,11 +62,11 @@ pub(crate) struct HealthState { /// Indicates if the node is listening on any address pub(crate) is_listening: bool, /// List the bootnodes that the node has tried to connect to - pub(crate) dialed_bootpeer: HashSet, + pub(crate) dialed_bootnode: HashSet, /// Indicates if the node has successfully connected to a bootnode - pub(crate) successfully_connected_to_bootpeer: Option, + pub(crate) successfully_connected_to_bootnode: Option, /// Track the number of remaining retries to connect to any bootnode - pub(crate) bootpeer_connection_retries: usize, + pub(crate) bootnode_connection_retries: usize, } impl Runtime { diff --git a/crates/topos-p2p/src/tests/bootstrap.rs b/crates/topos-p2p/src/tests/bootstrap.rs new file mode 100644 index 000000000..87eff36ad --- /dev/null +++ b/crates/topos-p2p/src/tests/bootstrap.rs @@ -0,0 +1,60 @@ +use std::time::Duration; + +use futures::{future::join_all, FutureExt}; +use rstest::rstest; +use test_log::test; +use topos_test_sdk::tce::NodeConfig; +use tracing::Instrument; + +#[rstest] +#[test(tokio::test)] +#[timeout(Duration::from_secs(5))] +async fn two_bootnode_communicating() { + let bootnode = NodeConfig::memory(2); + let local = NodeConfig::memory(1); + let bootnode_known_peers = vec![(local.peer_id(), local.addr.clone())]; + let local_known_peers = vec![(bootnode.peer_id(), bootnode.addr.clone())]; + + let mut handlers = Vec::new(); + + let context_local = tracing::info_span!("start_node", "peer_id" = local.peer_id().to_string()); + + let context_bootnode = + tracing::info_span!("start_node", "peer_id" = bootnode.peer_id().to_string()); + handlers.push( + async move { + let (client, mut stream, runtime) = crate::network::builder() + .minimum_cluster_size(1) + .peer_key(local.keypair.clone()) + .listen_addresses(&[local.addr.clone()]) + .known_peers(&local_known_peers) + .memory() + .build() + .await + .expect("Unable to create p2p network"); + + runtime.bootstrap(&mut stream).await + } + .instrument(context_local) + .boxed(), + ); + + handlers.push( + async move { + let (client, mut stream, runtime) = crate::network::builder() + .minimum_cluster_size(1) + .peer_key(bootnode.keypair.clone()) + .listen_addresses(&[bootnode.addr.clone()]) + .known_peers(&bootnode_known_peers) + .memory() + .build() + .await + .expect("Unable to create p2p network"); + + runtime.bootstrap(&mut stream).await + } + .instrument(context_bootnode) + .boxed(), + ); + assert!(join_all(handlers).await.iter().all(Result::is_ok)); +} diff --git a/crates/topos-p2p/src/tests/mod.rs b/crates/topos-p2p/src/tests/mod.rs index d3acbd392..bff12350f 100644 --- a/crates/topos-p2p/src/tests/mod.rs +++ b/crates/topos-p2p/src/tests/mod.rs @@ -1,3 +1,4 @@ mod behaviour; +mod bootstrap; mod command; mod support; diff --git a/crates/topos-test-sdk/Cargo.toml b/crates/topos-test-sdk/Cargo.toml index f452cd268..54877a97e 100644 --- a/crates/topos-test-sdk/Cargo.toml +++ b/crates/topos-test-sdk/Cargo.toml @@ -24,7 +24,7 @@ ethers.workspace = true async-trait.workspace = true futures.workspace = true lazy_static = { version = "1.4.0" } -libp2p.workspace = true +libp2p = { workspace = true, features = ["macros"] } proc_macro_sdk = { path = "./proc_macro_sdk/" } rand.workspace = true rstest.workspace = true diff --git a/crates/topos-test-sdk/src/p2p/mod.rs b/crates/topos-test-sdk/src/p2p/mod.rs index 119329bc2..ff0df939e 100644 --- a/crates/topos-test-sdk/src/p2p/mod.rs +++ b/crates/topos-test-sdk/src/p2p/mod.rs @@ -1,24 +1,28 @@ use libp2p::{ + build_multiaddr, identity::{self, Keypair}, Multiaddr, }; +use rand::{thread_rng, Rng}; use crate::networking::get_available_port; -pub type Port = u16; - -pub fn local_peer(peer_index: u8) -> (Keypair, Port, Multiaddr) { +pub fn local_peer(peer_index: u8, memory_transport: bool) -> (Keypair, Multiaddr) { let peer_id: Keypair = keypair_from_seed(peer_index); - let port = get_available_port(); - let local_listen_addr: Multiaddr = format!( - "/ip4/127.0.0.1/tcp/{}/p2p/{}", - port, - peer_id.public().to_peer_id() - ) - .parse() - .unwrap(); + let local_listen_addr = if memory_transport { + build_multiaddr![Memory(thread_rng().gen::())] + } else { + let port = get_available_port(); + format!( + "/ip4/127.0.0.1/tcp/{}/p2p/{}", + port, + peer_id.public().to_peer_id() + ) + .parse() + .unwrap() + }; - (peer_id, port, local_listen_addr) + (peer_id, local_listen_addr) } pub fn keypair_from_seed(seed: u8) -> Keypair { diff --git a/crates/topos-test-sdk/src/tce/mod.rs b/crates/topos-test-sdk/src/tce/mod.rs index 432fd140a..2faaed193 100644 --- a/crates/topos-test-sdk/src/tce/mod.rs +++ b/crates/topos-test-sdk/src/tce/mod.rs @@ -96,7 +96,6 @@ impl TceContext { #[derive(Debug, Clone)] pub struct NodeConfig { pub seed: u8, - pub port: u16, pub keypair: Keypair, pub addr: Multiaddr, pub minimum_cluster_size: usize, @@ -117,12 +116,23 @@ impl NodeConfig { } } + pub fn memory(seed: u8) -> Self { + let (keypair, addr) = local_peer(seed, true); + + Self { + seed, + keypair, + addr, + minimum_cluster_size: 0, + dummy: false, + } + } + pub fn from_seed(seed: u8) -> Self { - let (keypair, port, addr) = local_peer(seed); + let (keypair, addr) = local_peer(seed, false); Self { seed, - port, keypair, addr, minimum_cluster_size: 0, @@ -148,7 +158,6 @@ impl NodeConfig { > { bootstrap_network( self.seed, - self.port, self.addr.clone(), peers, self.minimum_cluster_size, @@ -165,7 +174,6 @@ impl NodeConfig { ) -> Result<(NetworkClient, impl Stream, Runtime), P2PError> { create_network_worker( self.seed, - self.port, vec![self.addr.clone()], peers, self.minimum_cluster_size, @@ -230,7 +238,6 @@ pub async fn start_node( let (network_client, network_stream, runtime_join_handle) = bootstrap_network( config.seed, - config.port, config.addr.clone(), peers, config.minimum_cluster_size, diff --git a/crates/topos-test-sdk/src/tce/p2p.rs b/crates/topos-test-sdk/src/tce/p2p.rs index 888a2dc2b..d8209b454 100644 --- a/crates/topos-test-sdk/src/tce/p2p.rs +++ b/crates/topos-test-sdk/src/tce/p2p.rs @@ -12,7 +12,6 @@ use super::NodeConfig; pub async fn create_network_worker( seed: u8, - _port: u16, addr: Vec, peers: &[NodeConfig], minimum_cluster_size: usize, @@ -63,7 +62,6 @@ pub async fn create_network_worker( pub async fn bootstrap_network( seed: u8, - port: u16, addr: Multiaddr, peers: &[NodeConfig], minimum_cluster_size: usize, @@ -78,7 +76,7 @@ pub async fn bootstrap_network( Box, > { let (network_client, mut network_stream, runtime) = - create_network_worker(seed, port, vec![addr], peers, minimum_cluster_size, router) + create_network_worker(seed, vec![addr], peers, minimum_cluster_size, router) .in_current_span() .await?;