diff --git a/crates/p2p/src/main_loop.rs b/crates/p2p/src/main_loop.rs index c9d20d267..0792b2aba 100644 --- a/crates/p2p/src/main_loop.rs +++ b/crates/p2p/src/main_loop.rs @@ -179,7 +179,10 @@ impl MainLoop { // Connection management // =========================== SwarmEvent::ConnectionEstablished { - peer_id, endpoint, .. + peer_id, + endpoint, + num_established, + .. } => { // Extract the IP address of the peer from his multiaddr. let peer_ip = get_ip(endpoint.get_remote_address()); @@ -194,10 +197,21 @@ impl MainLoop { return; }; - // If this is an incoming connection, we have to prevent the peer from - // reconnecting too quickly. if endpoint.is_listener() { - // Different timeouts apply to direct peers and peers connecting over a relay. + // This is an incoming connection. + + // Only allow one connection per peer. + if num_established.get() > 1 { + tracing::debug!(%peer_id, "Peer has more than one connection, closing"); + if let Err(e) = self.disconnect(peer_id).await { + tracing::debug!(%e, "Failed to disconnect peer"); + } + return; + } + + // Prevent the peer from reconnecting too quickly. + + // Different connection timeouts apply to direct peers and peers connecting over a relay. let is_relay = endpoint .get_remote_address() .iter() @@ -213,7 +227,7 @@ impl MainLoop { if recent_peers.contains(&peer_ip) { tracing::debug!(%peer_id, "Peer attempted to reconnect too quickly, closing"); if let Err(e) = self.disconnect(peer_id).await { - tracing::error!(%e, "Failed to disconnect peer"); + tracing::debug!(%e, "Failed to disconnect peer"); } return; } else { diff --git a/crates/p2p/src/tests.rs b/crates/p2p/src/tests.rs index 3964096c1..a09961414 100644 --- a/crates/p2p/src/tests.rs +++ b/crates/p2p/src/tests.rs @@ -36,8 +36,7 @@ struct TestPeer { impl TestPeer { #[must_use] - pub fn new(periodic_cfg: PeriodicTaskConfig) -> Self { - let keypair = Keypair::generate_ed25519(); + pub fn new(periodic_cfg: PeriodicTaskConfig, keypair: Keypair) -> Self { let peer_id = keypair.public().to_peer_id(); let peers: Arc> = Default::default(); let (client, event_receiver, main_loop) = crate::new( @@ -95,7 +94,7 @@ impl TestPeer { impl Default for TestPeer { fn default() -> Self { - Self::new(Default::default()) + Self::new(Default::default(), Keypair::generate_ed25519()) } } @@ -237,9 +236,9 @@ async fn periodic_bootstrap() { direct_connection_timeout: Duration::from_millis(500), relay_connection_timeout: Duration::from_millis(500), }; - let mut boot = TestPeer::new(periodic_cfg); - let mut peer1 = TestPeer::new(periodic_cfg); - let mut peer2 = TestPeer::new(periodic_cfg); + let mut boot = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); + let mut peer1 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); + let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); let mut boot_addr = boot.start_listening().await.unwrap(); boot_addr.push(Protocol::P2p(boot.peer_id)); @@ -296,14 +295,14 @@ async fn reconnect_too_quickly() { bootstrap: BootstrapConfig { period: Duration::from_millis(500), // Bootstrapping can cause redials, so set the offset to a high value. - start_offset: Duration::from_secs(3), + start_offset: Duration::from_secs(10), }, direct_connection_timeout: CONNECTION_TIMEOUT, relay_connection_timeout: Duration::from_millis(500), }; - let mut peer1 = TestPeer::new(periodic_cfg); - let mut peer2 = TestPeer::new(periodic_cfg); + let mut peer1 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); + let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); let addr2 = peer2.start_listening().await.unwrap(); tracing::info!(%peer2.peer_id, %addr2); @@ -394,6 +393,82 @@ async fn reconnect_too_quickly() { .await; } +/// Test that each peer accepts at most one connection from any other peer, and duplicate +/// connections are closed. +#[test_log::test(tokio::test)] +async fn duplicate_connection() { + const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50); + let periodic_cfg = PeriodicTaskConfig { + bootstrap: BootstrapConfig { + period: Duration::from_millis(500), + // Bootstrapping can cause redials, so set the offset to a high value. + start_offset: Duration::from_secs(10), + }, + direct_connection_timeout: CONNECTION_TIMEOUT, + relay_connection_timeout: Duration::from_millis(500), + }; + let keypair = Keypair::generate_ed25519(); + let mut peer1 = TestPeer::new(periodic_cfg, keypair.clone()); + let mut peer1_copy = TestPeer::new(periodic_cfg, keypair); + let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); + + let addr2 = peer2.start_listening().await.unwrap(); + tracing::info!(%peer2.peer_id, %addr2); + + // Open the connection. + peer1 + .client + .dial(peer2.peer_id, addr2.clone()) + .await + .unwrap(); + + wait_for_event(&mut peer1.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer2.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + wait_for_event(&mut peer2.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer1.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + // Ensure that the connection timeout has passed, so this is not the reason why the connection + // is getting closed. + tokio::time::sleep(CONNECTION_TIMEOUT).await; + + // Try to open another connection using the same peer ID and IP address (in this case, + // localhost). + peer1_copy + .client + .dial(peer2.peer_id, addr2.clone()) + .await + .unwrap(); + + wait_for_event(&mut peer1_copy.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer2.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + wait_for_event(&mut peer1_copy.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionClosed { remote, .. }) if remote == peer2.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + assert!(peer1_copy.connected().await.is_empty()); +} + #[rstest] #[case::server_to_client(server_to_client().await)] #[case::client_to_server(client_to_server().await)] diff --git a/crates/rpc/src/jsonrpc/request.rs b/crates/rpc/src/jsonrpc/request.rs index c79dd2b3a..fdef96240 100644 --- a/crates/rpc/src/jsonrpc/request.rs +++ b/crates/rpc/src/jsonrpc/request.rs @@ -222,7 +222,7 @@ mod tests { #[case::object_with_spaces("{ }")] #[case::object_with_newlines("{ \n }")] fn empty(#[case] s: &str) { - let raw_value = RawValue::from_string(dbg!(s).to_owned()).unwrap(); + let raw_value = RawValue::from_string(s.to_owned()).unwrap(); let uut = RawParams(Some(&raw_value)); assert!(uut.is_empty()); diff --git a/crates/rpc/src/jsonrpc/websocket/logic.rs b/crates/rpc/src/jsonrpc/websocket/logic.rs index d0228dbc0..5b07335a4 100644 --- a/crates/rpc/src/jsonrpc/websocket/logic.rs +++ b/crates/rpc/src/jsonrpc/websocket/logic.rs @@ -574,8 +574,7 @@ mod tests { let timeout_result = timeout(Duration::from_millis(100), self.receiver.next()).await; match timeout_result { - Ok(Some(reception_result)) => { - let _ = dbg!(reception_result); + Ok(Some(_)) => { panic!("Unexpected message received") } Ok(None) => {