Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix(p2p): accept listener connection during bootstrap (#484)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
Signed-off-by: Monir Hadji <[email protected]>
Co-authored-by: Monir Hadji <[email protected]>
  • Loading branch information
Freyskeyd and hadjiszs authored Mar 25, 2024
1 parent 5b6ddb8 commit b8cd730
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 63 deletions.
1 change: 1 addition & 0 deletions crates/topos-p2p/src/behaviour/grpc/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum Event {
OutboundSuccess {
peer_id: PeerId,
request_id: RequestId,
#[allow(unused)]
channel: Channel,
},

Expand Down
43 changes: 29 additions & 14 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
};
use futures::Stream;
use libp2p::{
core::upgrade,
core::{transport::MemoryTransport, upgrade},
dns,
identity::Keypair,
kad::store::MemoryStore,
Expand All @@ -30,6 +30,7 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;

pub fn builder<'a>() -> NetworkBuilder<'a> {
NetworkBuilder::default()
Expand All @@ -48,9 +49,16 @@ pub struct NetworkBuilder<'a> {
local_port: Option<u8>,
config: NetworkConfig,
grpc_context: GrpcContext,
memory_transport: bool,
}

impl<'a> NetworkBuilder<'a> {
#[cfg(test)]
pub(crate) fn memory(mut self) -> Self {
self.memory_transport = true;

self
}
pub fn grpc_context(mut self, grpc_context: GrpcContext) -> Self {
self.grpc_context = grpc_context;

Expand Down Expand Up @@ -131,6 +139,7 @@ impl<'a> NetworkBuilder<'a> {

let grpc = grpc::Behaviour::new(self.grpc_context);

debug!("Known peers: {:?}", self.known_peers);
let behaviour = Behaviour {
gossipsub,
peer_info: PeerInfoBehaviour::new(PEER_INFO_PROTOCOL, &peer_key),
Expand All @@ -148,23 +157,29 @@ impl<'a> NetworkBuilder<'a> {
grpc,
};

let transport = {
let multiplex_config = libp2p::yamux::Config::default();

let transport = if self.memory_transport {
MemoryTransport::new()
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed()
} else {
let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true));
let dns_tcp = dns::tokio::Transport::system(tcp).unwrap();

let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true));
dns_tcp.or_transport(tcp)
dns_tcp
.or_transport(tcp)
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed()
};

let multiplex_config = libp2p::yamux::Config::default();

let transport = transport
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed();

let swarm = Swarm::new(
transport,
behaviour,
Expand Down Expand Up @@ -216,8 +231,8 @@ impl<'a> NetworkBuilder<'a> {
pending_record_requests: HashMap::new(),
shutdown,
health_state: crate::runtime::HealthState {
bootpeer_connection_retries: 3,
successfully_connected_to_bootpeer: if self.known_peers.is_empty() {
bootnode_connection_retries: 3,
successfully_connected_to_bootnode: if self.known_peers.is_empty() {
// Node seems to be a boot node
Some(ConnectionId::new_unchecked(0))
} else {
Expand Down
61 changes: 43 additions & 18 deletions crates/topos-p2p/src/runtime/handle_event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use libp2p::{multiaddr::Protocol, swarm::SwarmEvent};
use libp2p::{core::Endpoint, multiaddr::Protocol, swarm::SwarmEvent};
use tracing::{debug, error, info, warn};

use crate::{error::P2PError, event::ComposedEvent, Event, Runtime};
Expand Down Expand Up @@ -62,13 +62,13 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
error,
} if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
&& self.health_state.dialed_bootpeer.contains(&connection_id) =>
&& self.health_state.dialed_bootnode.contains(&connection_id) =>
{
warn!("Unable to connect to bootpeer {peer_id}: {error:?}");
self.health_state.dialed_bootpeer.remove(&connection_id);
if self.health_state.dialed_bootpeer.is_empty() {
warn!("Unable to connect to bootnode {peer_id}: {error:?}");
self.health_state.dialed_bootnode.remove(&connection_id);
if self.health_state.dialed_bootnode.is_empty() {
// We tried to connect to all bootnode without success
error!("Unable to connect to any bootnode");
}
Expand Down Expand Up @@ -100,25 +100,49 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
num_established,
concurrent_dial_errors,
established_in,
} if self.health_state.dialed_bootpeer.contains(&connection_id) => {
info!("Successfully connected to bootpeer {peer_id}");
} if self.health_state.dialed_bootnode.contains(&connection_id) => {
info!("Successfully connected to bootnode {peer_id}");
if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
{
self.health_state.successfully_connected_to_bootpeer = Some(connection_id);
_ = self.health_state.dialed_bootpeer.remove(&connection_id);
self.health_state.successfully_connected_to_bootnode = Some(connection_id);
_ = self.health_state.dialed_bootnode.remove(&connection_id);
}
}

SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
peer_id,
endpoint,
connection_id,
..
} => {
info!(
"Connection established with peer {peer_id} as {:?}",
endpoint.to_endpoint()
);
if self
.health_state
.successfully_connected_to_bootnode
.is_none()
&& self.boot_peers.contains(&peer_id)
{
info!(
"Connection established with bootnode {peer_id} as {:?}",
endpoint.to_endpoint()
);

if endpoint.to_endpoint() == Endpoint::Listener {
if let Err(error) = self.swarm.dial(peer_id) {
error!(
"Unable to dial bootnode {peer_id} after incoming connection: \
{error}"
);
}
}
} else {
info!(
"Connection established with peer {peer_id} as {:?}",
endpoint.to_endpoint()
);
}

if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size {
if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() {
Expand Down Expand Up @@ -164,8 +188,8 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
peer_id: Some(ref peer_id),
connection_id,
} if self.boot_peers.contains(peer_id) => {
info!("Dialing bootpeer {peer_id} on connection: {connection_id}");
self.health_state.dialed_bootpeer.insert(connection_id);
info!("Dialing bootnode {peer_id} on connection: {connection_id}");
self.health_state.dialed_bootnode.insert(connection_id);
}

SwarmEvent::Dialing {
Expand All @@ -185,6 +209,7 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
SwarmEvent::ListenerError { listener_id, error } => {
error!("Unhandled ListenerError {listener_id:?} | {error}")
}

event => {
warn!("Unhandled SwarmEvent: {:?}", event);
}
Expand Down
10 changes: 5 additions & 5 deletions crates/topos-p2p/src/runtime/handle_event/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl EventHandler<Box<Event>> for Runtime {
{
if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
{
warn!(
Expand Down Expand Up @@ -85,11 +85,11 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
match self.health_state.bootpeer_connection_retries.checked_sub(1) {
match self.health_state.bootnode_connection_retries.checked_sub(1) {
None => {
error!(
"Bootstrap query finished but unable to connect to bootnode, stopping"
Expand All @@ -103,7 +103,7 @@ impl EventHandler<Box<Event>> for Runtime {
{} more times",
new
);
self.health_state.bootpeer_connection_retries = new;
self.health_state.bootnode_connection_retries = new;
}
}
}
Expand All @@ -119,7 +119,7 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_some()
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
Expand Down
44 changes: 43 additions & 1 deletion crates/topos-p2p/src/runtime/handle_event/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,52 @@
use tracing::debug;

use crate::{behaviour::grpc, Runtime};

use super::{EventHandler, EventResult};

#[async_trait::async_trait]
impl EventHandler<grpc::Event> for Runtime {
async fn handle(&mut self, _event: grpc::Event) -> EventResult {
async fn handle(&mut self, event: grpc::Event) -> EventResult {
match event {
grpc::Event::OutboundFailure {
peer_id,
request_id,
error,
} => {
debug!(
"Outbound connection failure to peer {} for request {}: {}",
peer_id, request_id, error
);
}
grpc::Event::OutboundSuccess {
peer_id,
request_id,
..
} => {
debug!(
"Outbound connection success to peer {} for request {}",
peer_id, request_id
);
}
grpc::Event::InboundNegotiatedConnection {
request_id,
connection_id,
} => {
debug!(
"Inbound connection negotiated for request {} with connection {}",
request_id, connection_id
);
}
grpc::Event::OutboundNegotiatedConnection {
peer_id,
request_id,
} => {
debug!(
"Outbound connection negotiated to peer {} for request {}",
peer_id, request_id
);
}
}
Ok(())
}
}
6 changes: 3 additions & 3 deletions crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ pub(crate) struct HealthState {
/// Indicates if the node is listening on any address
pub(crate) is_listening: bool,
/// List the bootnodes that the node has tried to connect to
pub(crate) dialed_bootpeer: HashSet<ConnectionId>,
pub(crate) dialed_bootnode: HashSet<ConnectionId>,
/// Indicates if the node has successfully connected to a bootnode
pub(crate) successfully_connected_to_bootpeer: Option<ConnectionId>,
pub(crate) successfully_connected_to_bootnode: Option<ConnectionId>,
/// Track the number of remaining retries to connect to any bootnode
pub(crate) bootpeer_connection_retries: usize,
pub(crate) bootnode_connection_retries: usize,
}

impl Runtime {
Expand Down
60 changes: 60 additions & 0 deletions crates/topos-p2p/src/tests/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::time::Duration;

use futures::{future::join_all, FutureExt};
use rstest::rstest;
use test_log::test;
use topos_test_sdk::tce::NodeConfig;
use tracing::Instrument;

#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(5))]
async fn two_bootnode_communicating() {
let bootnode = NodeConfig::memory(2);
let local = NodeConfig::memory(1);
let bootnode_known_peers = vec![(local.peer_id(), local.addr.clone())];
let local_known_peers = vec![(bootnode.peer_id(), bootnode.addr.clone())];

let mut handlers = Vec::new();

let context_local = tracing::info_span!("start_node", "peer_id" = local.peer_id().to_string());

let context_bootnode =
tracing::info_span!("start_node", "peer_id" = bootnode.peer_id().to_string());
handlers.push(
async move {
let (client, mut stream, runtime) = crate::network::builder()
.minimum_cluster_size(1)
.peer_key(local.keypair.clone())
.listen_addresses(&[local.addr.clone()])
.known_peers(&local_known_peers)
.memory()
.build()
.await
.expect("Unable to create p2p network");

runtime.bootstrap(&mut stream).await
}
.instrument(context_local)
.boxed(),
);

handlers.push(
async move {
let (client, mut stream, runtime) = crate::network::builder()
.minimum_cluster_size(1)
.peer_key(bootnode.keypair.clone())
.listen_addresses(&[bootnode.addr.clone()])
.known_peers(&bootnode_known_peers)
.memory()
.build()
.await
.expect("Unable to create p2p network");

runtime.bootstrap(&mut stream).await
}
.instrument(context_bootnode)
.boxed(),
);
assert!(join_all(handlers).await.iter().all(Result::is_ok));
}
1 change: 1 addition & 0 deletions crates/topos-p2p/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod behaviour;
mod bootstrap;
mod command;
mod support;
2 changes: 1 addition & 1 deletion crates/topos-test-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ethers.workspace = true
async-trait.workspace = true
futures.workspace = true
lazy_static = { version = "1.4.0" }
libp2p.workspace = true
libp2p = { workspace = true, features = ["macros"] }
proc_macro_sdk = { path = "./proc_macro_sdk/" }
rand.workspace = true
rstest.workspace = true
Expand Down
Loading

0 comments on commit b8cd730

Please sign in to comment.