diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 52f55c95f6..fca4a5abf8 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -71,10 +71,11 @@ pub fn new( let (event_sender, event_receiver) = mpsc::channel(1); ( - Client::new(command_sender, local_peer_id), + Client::new(command_sender.clone(), local_peer_id), event_receiver, MainLoop::new( swarm, + command_sender, command_receiver, event_sender, peers, @@ -191,6 +192,7 @@ enum Command { new_block: NewBlock, sender: EmptyResultSender, }, + CheckProtocols(PeerId), /// For testing purposes only _Test(TestCommand), } diff --git a/crates/p2p/src/main_loop.rs b/crates/p2p/src/main_loop.rs index e77d014940..dd91d55dfd 100644 --- a/crates/p2p/src/main_loop.rs +++ b/crates/p2p/src/main_loop.rs @@ -31,6 +31,7 @@ use crate::{ pub struct MainLoop { bootstrap_cfg: BootstrapConfig, swarm: libp2p::swarm::Swarm, + command_sender: mpsc::Sender, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, peers: Arc>, @@ -77,6 +78,7 @@ struct PendingQueries { impl MainLoop { pub(crate) fn new( swarm: libp2p::swarm::Swarm, + command_sender: mpsc::Sender, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, peers: Arc>, @@ -86,6 +88,7 @@ impl MainLoop { Self { bootstrap_cfg: periodic_cfg.bootstrap, swarm, + command_sender, command_receiver, event_sender, peers, @@ -171,6 +174,14 @@ impl MainLoop { SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => { + let command_sender = self.command_sender.clone(); + tokio::spawn(async move { + // After a reasonable delay, check that the peer is communicating with us + // in a meaningful way, over the protocols that move the chain forward. + tokio::time::sleep(Duration::from_secs(5)).await; + let _ = command_sender.send(Command::CheckProtocols(peer_id)).await; + }); + self.peers.write().await.peer_connected(&peer_id); if endpoint.is_dialer() { @@ -841,6 +852,26 @@ impl MainLoop { let result = self.publish_data(topic, &data); let _ = sender.send(result); } + Command::CheckProtocols(peer_id) => { + // Ensure that the peer is using at least one of the protocols which move + // the chain forward. + let behaviour = self.swarm.behaviour_mut(); + if !behaviour + .gossipsub_mut() + .all_peers() + .any(|(id, topics)| *id == peer_id && !topics.is_empty()) + && !behaviour.headers_sync_mut().is_connected(&peer_id) + && !behaviour.bodies_sync_mut().is_connected(&peer_id) + && !behaviour.transactions_sync_mut().is_connected(&peer_id) + && !behaviour.receipts_sync_mut().is_connected(&peer_id) + && !behaviour.events_sync_mut().is_connected(&peer_id) + { + tracing::debug!(%peer_id, "Peer is not using any meaningful protocol, disconnecting"); + if let Err(e) = self.disconnect(peer_id).await { + tracing::debug!(%peer_id, %e, "Failed to disconnect peer"); + } + } + } Command::_Test(command) => self.handle_test_command(command).await, }; }