Skip to content

Commit

Permalink
only allow one connection per peer
Browse files Browse the repository at this point in the history
  • Loading branch information
sistemd committed Jan 18, 2024
1 parent 9280c41 commit 104bf74
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 16 deletions.
22 changes: 18 additions & 4 deletions crates/p2p/src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ impl MainLoop {
// Connection management
// ===========================
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
peer_id,
endpoint,
num_established,
..
} => {
// Extract the IP address of the peer from his multiaddr.
let peer_ip = get_ip(endpoint.get_remote_address());
Expand All @@ -194,10 +197,21 @@ impl MainLoop {
return;
};

// If this is an incoming connection, we have to prevent the peer from
// reconnecting too quickly.
if endpoint.is_listener() {
// Different timeouts apply to direct peers and peers connecting over a relay.
// This is an incoming connection.

// Only allow one connection per peer.
if num_established.get() > 1 {
tracing::warn!(%peer_id, "Peer has more than one connection, closing");
if let Err(e) = self.disconnect(peer_id).await {
tracing::error!(%e, "Failed to disconnect peer");
}
return;
}

// Prevent the peer from reconnecting too quickly.

// Different connection timeouts apply to direct peers and peers connecting over a relay.
let is_relay = endpoint
.get_remote_address()
.iter()
Expand Down
91 changes: 82 additions & 9 deletions crates/p2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ struct TestPeer {

impl TestPeer {
#[must_use]
pub fn new(periodic_cfg: PeriodicTaskConfig) -> Self {
let keypair = Keypair::generate_ed25519();
pub fn new(periodic_cfg: PeriodicTaskConfig, keypair: Keypair) -> Self {
let peer_id = keypair.public().to_peer_id();
let peers: Arc<RwLock<Peers>> = Default::default();
let (client, event_receiver, main_loop) = crate::new(
Expand Down Expand Up @@ -95,7 +94,7 @@ impl TestPeer {

impl Default for TestPeer {
fn default() -> Self {
Self::new(Default::default())
Self::new(Default::default(), Keypair::generate_ed25519())
}
}

Expand Down Expand Up @@ -237,9 +236,9 @@ async fn periodic_bootstrap() {
direct_connection_timeout: Duration::from_millis(500),
relay_connection_timeout: Duration::from_millis(500),
};
let mut boot = TestPeer::new(periodic_cfg);
let mut peer1 = TestPeer::new(periodic_cfg);
let mut peer2 = TestPeer::new(periodic_cfg);
let mut boot = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());
let mut peer1 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());
let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());

let mut boot_addr = boot.start_listening().await.unwrap();
boot_addr.push(Protocol::P2p(boot.peer_id));
Expand Down Expand Up @@ -296,14 +295,14 @@ async fn reconnect_too_quickly() {
bootstrap: BootstrapConfig {
period: Duration::from_millis(500),
// Bootstrapping can cause redials, so set the offset to a high value.
start_offset: Duration::from_secs(3),
start_offset: Duration::from_secs(10),
},
direct_connection_timeout: CONNECTION_TIMEOUT,
relay_connection_timeout: Duration::from_millis(500),
};

let mut peer1 = TestPeer::new(periodic_cfg);
let mut peer2 = TestPeer::new(periodic_cfg);
let mut peer1 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());
let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());

let addr2 = peer2.start_listening().await.unwrap();
tracing::info!(%peer2.peer_id, %addr2);
Expand Down Expand Up @@ -394,6 +393,80 @@ async fn reconnect_too_quickly() {
.await;
}

/// Test that if a peer attempts to reconnect too quickly, the connection is closed.
#[test_log::test(tokio::test)]
async fn duplicate_connection() {
const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50);
let periodic_cfg = PeriodicTaskConfig {
bootstrap: BootstrapConfig {
period: Duration::from_millis(500),
// Bootstrapping can cause redials, so set the offset to a high value.
start_offset: Duration::from_secs(10),
},
connection_timeout: CONNECTION_TIMEOUT,
};
let keypair = Keypair::generate_ed25519();
let mut peer1 = TestPeer::new(periodic_cfg, keypair.clone());
let mut peer1_copy = TestPeer::new(periodic_cfg, keypair);
let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());

let addr2 = peer2.start_listening().await.unwrap();
tracing::info!(%peer2.peer_id, %addr2);

// Open the connection.
peer1
.client
.dial(peer2.peer_id, addr2.clone())
.await
.unwrap();

wait_event(&mut peer1.event_receiver, |event| match event {
Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer2.peer_id => {
Some(())
}
_ => None,
})
.await;

wait_event(&mut peer2.event_receiver, |event| match event {
Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer1.peer_id => {
Some(())
}
_ => None,
})
.await;

// Ensure that the connection timeout has passed, so this is not the reason why the connection
// is getting closed.
tokio::time::sleep(CONNECTION_TIMEOUT).await;

// Try to open another connection using the same peer ID and IP address (in this case,
// localhost).
peer1_copy
.client
.dial(peer2.peer_id, addr2.clone())
.await
.unwrap();

wait_event(&mut peer1_copy.event_receiver, |event| match event {
Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer2.peer_id => {
Some(())
}
_ => None,
})
.await;

wait_event(&mut peer1_copy.event_receiver, |event| match event {
Event::Test(TestEvent::ConnectionClosed { remote, .. }) if remote == peer2.peer_id => {
Some(())
}
_ => None,
})
.await;

assert!(peer1_copy.connected().await.is_empty());
}

#[rstest]
#[case::server_to_client(server_to_client().await)]
#[case::client_to_server(client_to_server().await)]
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/jsonrpc/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ mod tests {
#[case::object_with_spaces("{ }")]
#[case::object_with_newlines("{ \n }")]
fn empty(#[case] s: &str) {
let raw_value = RawValue::from_string(dbg!(s).to_owned()).unwrap();
let raw_value = RawValue::from_string(s.to_owned()).unwrap();
let uut = RawParams(Some(&raw_value));

assert!(uut.is_empty());
Expand Down
3 changes: 1 addition & 2 deletions crates/rpc/src/jsonrpc/websocket/logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,7 @@ mod tests {
let timeout_result = timeout(Duration::from_millis(100), self.receiver.next()).await;

match timeout_result {
Ok(Some(reception_result)) => {
let _ = dbg!(reception_result);
Ok(Some(_)) => {
panic!("Unexpected message received")
}
Ok(None) => {
Expand Down

0 comments on commit 104bf74

Please sign in to comment.