Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disallow duplicate connections between peers #1682

Merged
merged 4 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions crates/p2p/src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ impl MainLoop {
// Connection management
// ===========================
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
peer_id,
endpoint,
num_established,
..
} => {
// Extract the IP address of the peer from his multiaddr.
let peer_ip = get_ip(endpoint.get_remote_address());
Expand All @@ -194,10 +197,21 @@ impl MainLoop {
return;
};

// If this is an incoming connection, we have to prevent the peer from
// reconnecting too quickly.
if endpoint.is_listener() {
// Different timeouts apply to direct peers and peers connecting over a relay.
// This is an incoming connection.

// Only allow one connection per peer.
if num_established.get() > 1 {
tracing::debug!(%peer_id, "Peer has more than one connection, closing");
if let Err(e) = self.disconnect(peer_id).await {
tracing::debug!(%e, "Failed to disconnect peer");
}
return;
}

// Prevent the peer from reconnecting too quickly.

// Different connection timeouts apply to direct peers and peers connecting over a relay.
Mirko-von-Leipzig marked this conversation as resolved.
Show resolved Hide resolved
let is_relay = endpoint
.get_remote_address()
.iter()
Expand All @@ -213,7 +227,7 @@ impl MainLoop {
if recent_peers.contains(&peer_ip) {
tracing::debug!(%peer_id, "Peer attempted to reconnect too quickly, closing");
if let Err(e) = self.disconnect(peer_id).await {
tracing::error!(%e, "Failed to disconnect peer");
tracing::debug!(%e, "Failed to disconnect peer");
}
return;
} else {
Expand Down
93 changes: 84 additions & 9 deletions crates/p2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ struct TestPeer {

impl TestPeer {
#[must_use]
pub fn new(periodic_cfg: PeriodicTaskConfig) -> Self {
let keypair = Keypair::generate_ed25519();
pub fn new(periodic_cfg: PeriodicTaskConfig, keypair: Keypair) -> Self {
let peer_id = keypair.public().to_peer_id();
let peers: Arc<RwLock<Peers>> = Default::default();
let (client, event_receiver, main_loop) = crate::new(
Expand Down Expand Up @@ -95,7 +94,7 @@ impl TestPeer {

impl Default for TestPeer {
fn default() -> Self {
Self::new(Default::default())
Self::new(Default::default(), Keypair::generate_ed25519())
}
}

Expand Down Expand Up @@ -237,9 +236,9 @@ async fn periodic_bootstrap() {
direct_connection_timeout: Duration::from_millis(500),
relay_connection_timeout: Duration::from_millis(500),
};
let mut boot = TestPeer::new(periodic_cfg);
let mut peer1 = TestPeer::new(periodic_cfg);
let mut peer2 = TestPeer::new(periodic_cfg);
let mut boot = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());
let mut peer1 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());
let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());

let mut boot_addr = boot.start_listening().await.unwrap();
boot_addr.push(Protocol::P2p(boot.peer_id));
Expand Down Expand Up @@ -296,14 +295,14 @@ async fn reconnect_too_quickly() {
bootstrap: BootstrapConfig {
period: Duration::from_millis(500),
// Bootstrapping can cause redials, so set the offset to a high value.
start_offset: Duration::from_secs(3),
start_offset: Duration::from_secs(10),
},
direct_connection_timeout: CONNECTION_TIMEOUT,
relay_connection_timeout: Duration::from_millis(500),
};

let mut peer1 = TestPeer::new(periodic_cfg);
let mut peer2 = TestPeer::new(periodic_cfg);
let mut peer1 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());
let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());

let addr2 = peer2.start_listening().await.unwrap();
tracing::info!(%peer2.peer_id, %addr2);
Expand Down Expand Up @@ -394,6 +393,82 @@ async fn reconnect_too_quickly() {
.await;
}

/// Test that each peer accepts at most one connection from any other peer, and duplicate
/// connections are closed.
#[test_log::test(tokio::test)]
async fn duplicate_connection() {
const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50);
let periodic_cfg = PeriodicTaskConfig {
bootstrap: BootstrapConfig {
period: Duration::from_millis(500),
// Bootstrapping can cause redials, so set the offset to a high value.
start_offset: Duration::from_secs(10),
},
direct_connection_timeout: CONNECTION_TIMEOUT,
relay_connection_timeout: Duration::from_millis(500),
};
let keypair = Keypair::generate_ed25519();
let mut peer1 = TestPeer::new(periodic_cfg, keypair.clone());
let mut peer1_copy = TestPeer::new(periodic_cfg, keypair);
let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519());

let addr2 = peer2.start_listening().await.unwrap();
tracing::info!(%peer2.peer_id, %addr2);

// Open the connection.
peer1
.client
.dial(peer2.peer_id, addr2.clone())
.await
.unwrap();

wait_for_event(&mut peer1.event_receiver, |event| match event {
Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer2.peer_id => {
Some(())
}
_ => None,
})
.await;

wait_for_event(&mut peer2.event_receiver, |event| match event {
Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer1.peer_id => {
Some(())
}
_ => None,
})
.await;

// Ensure that the connection timeout has passed, so this is not the reason why the connection
// is getting closed.
tokio::time::sleep(CONNECTION_TIMEOUT).await;

// Try to open another connection using the same peer ID and IP address (in this case,
// localhost).
peer1_copy
.client
.dial(peer2.peer_id, addr2.clone())
.await
.unwrap();

wait_for_event(&mut peer1_copy.event_receiver, |event| match event {
Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer2.peer_id => {
Some(())
}
_ => None,
})
.await;

wait_for_event(&mut peer1_copy.event_receiver, |event| match event {
Event::Test(TestEvent::ConnectionClosed { remote, .. }) if remote == peer2.peer_id => {
Some(())
}
_ => None,
})
.await;

assert!(peer1_copy.connected().await.is_empty());
}

#[rstest]
#[case::server_to_client(server_to_client().await)]
#[case::client_to_server(client_to_server().await)]
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/jsonrpc/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ mod tests {
#[case::object_with_spaces("{ }")]
#[case::object_with_newlines("{ \n }")]
fn empty(#[case] s: &str) {
let raw_value = RawValue::from_string(dbg!(s).to_owned()).unwrap();
let raw_value = RawValue::from_string(s.to_owned()).unwrap();
let uut = RawParams(Some(&raw_value));

assert!(uut.is_empty());
Expand Down
3 changes: 1 addition & 2 deletions crates/rpc/src/jsonrpc/websocket/logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,7 @@ mod tests {
let timeout_result = timeout(Duration::from_millis(100), self.receiver.next()).await;

match timeout_result {
Ok(Some(reception_result)) => {
let _ = dbg!(reception_result);
Ok(Some(_)) => {
panic!("Unexpected message received")
}
Ok(None) => {
Expand Down