diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 0cec9a7d..a7248811 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -70,10 +70,10 @@ pub(crate) mod handle; const LOG_TARGET: &str = "litep2p::transport-manager"; /// Score for a working address. -const SCORE_DIAL_SUCCESS: i32 = 100i32; +const SCORE_CONNECT_SUCCESS: i32 = 100i32; /// Score for a non-working address. -const SCORE_DIAL_FAILURE: i32 = -100i32; +const SCORE_CONNECT_FAILURE: i32 = -100i32; /// TODO: enum ConnectionEstablishedResult { @@ -698,7 +698,7 @@ impl TransportManager { PeerState::Dialing { ref mut record } => { debug_assert_eq!(record.connection_id(), &Some(connection_id)); - record.update_score(SCORE_DIAL_FAILURE); + record.update_score(SCORE_CONNECT_FAILURE); context.addresses.insert(record.clone()); context.state = PeerState::Disconnected { dial_record: None }; @@ -711,7 +711,7 @@ impl TransportManager { record, dial_record: Some(mut dial_record), } => { - dial_record.update_score(SCORE_DIAL_FAILURE); + dial_record.update_score(SCORE_CONNECT_FAILURE); context.addresses.insert(dial_record); context.state = PeerState::Connected { @@ -730,7 +730,7 @@ impl TransportManager { "dial failed for a disconnected peer", ); - dial_record.update_score(SCORE_DIAL_FAILURE); + dial_record.update_score(SCORE_CONNECT_FAILURE); context.addresses.insert(dial_record); Ok(()) @@ -914,7 +914,10 @@ impl TransportManager { let mut peers = self.peers.write(); match peers.get_mut(&peer) { Some(context) => match context.state { - PeerState::Connected { .. } => match context.secondary_connection { + PeerState::Connected { + ref mut dial_record, + .. + } => match context.secondary_connection { Some(_) => { tracing::debug!( target: LOG_TARGET, @@ -932,29 +935,57 @@ impl TransportManager { context.addresses.insert(AddressRecord::new( &peer, endpoint.address().clone(), - SCORE_DIAL_SUCCESS, + SCORE_CONNECT_SUCCESS, None, )) } return Ok(ConnectionEstablishedResult::Reject); } - None => { - tracing::debug!( + None => match dial_record.take() { + Some(record) + if record.connection_id() == &Some(endpoint.connection_id()) => + { + tracing::debug!( + target: LOG_TARGET, + ?peer, + connection_id = ?endpoint.connection_id(), + address = ?endpoint.address(), + "dialed connection opened as secondary connection", + ); + + context.secondary_connection = Some(AddressRecord::new( + &peer, + endpoint.address().clone(), + SCORE_CONNECT_SUCCESS, + Some(endpoint.connection_id()), + )); + } + None => { + tracing::debug!( + target: LOG_TARGET, + ?peer, + connection_id = ?endpoint.connection_id(), + address = ?endpoint.address(), + "secondary connection", + ); + + context.secondary_connection = Some(AddressRecord::new( + &peer, + endpoint.address().clone(), + SCORE_CONNECT_SUCCESS, + Some(endpoint.connection_id()), + )); + } + Some(record) => tracing::warn!( target: LOG_TARGET, ?peer, connection_id = ?endpoint.connection_id(), address = ?endpoint.address(), - "secondary connection", - ); - - context.secondary_connection = Some(AddressRecord::new( - &peer, - endpoint.address().clone(), - SCORE_DIAL_SUCCESS, - Some(endpoint.connection_id()), - )); - } + dial_record = ?record, + "unknown connection opened as secondary connection, discarding", + ), + }, }, PeerState::Dialing { ref record, .. } => { match record.connection_id() == &Some(endpoint.connection_id()) { @@ -986,7 +1017,7 @@ impl TransportManager { record: AddressRecord::new( &peer, endpoint.address().clone(), - SCORE_DIAL_SUCCESS, + SCORE_CONNECT_SUCCESS, Some(endpoint.connection_id()), ), dial_record: Some(record.clone()), @@ -1036,14 +1067,14 @@ impl TransportManager { let record = match records.remove(endpoint.address()) { Some(mut record) => { - record.update_score(SCORE_DIAL_SUCCESS); + record.update_score(SCORE_CONNECT_SUCCESS); record.set_connection_id(endpoint.connection_id()); record } None => AddressRecord::new( &peer, endpoint.address().clone(), - SCORE_DIAL_SUCCESS, + SCORE_CONNECT_SUCCESS, Some(endpoint.connection_id()), ), }; @@ -1076,7 +1107,7 @@ impl TransportManager { AddressRecord::new( &peer, endpoint.address().clone(), - SCORE_DIAL_SUCCESS, + SCORE_CONNECT_SUCCESS, Some(endpoint.connection_id()), ), Some(dial_record), @@ -1086,7 +1117,7 @@ impl TransportManager { AddressRecord::new( &peer, endpoint.address().clone(), - SCORE_DIAL_SUCCESS, + SCORE_CONNECT_SUCCESS, Some(endpoint.connection_id()), ), None, @@ -1107,7 +1138,7 @@ impl TransportManager { record: AddressRecord::new( &peer, endpoint.address().clone(), - SCORE_DIAL_SUCCESS, + SCORE_CONNECT_SUCCESS, Some(endpoint.connection_id()), ), dial_record: None, @@ -1186,7 +1217,7 @@ impl TransportManager { // all other address records back to `AddressStore`. and ask // transport to negotiate the let mut dial_record = records.remove(&address).expect("address to exist"); - dial_record.update_score(SCORE_DIAL_SUCCESS); + dial_record.update_score(SCORE_CONNECT_SUCCESS); // negotiate the connection match self @@ -1298,7 +1329,7 @@ impl TransportManager { if transports.is_empty() { for (_, mut record) in records { - record.update_score(SCORE_DIAL_FAILURE); + record.update_score(SCORE_CONNECT_FAILURE); context.addresses.insert(record); } @@ -2259,7 +2290,7 @@ mod tests { } => { let seconary_connection = context.secondary_connection.as_ref().unwrap(); assert_eq!(seconary_connection.address(), &address2); - assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS); + assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS); } state => panic!("invalid state: {state:?}"), } @@ -2282,7 +2313,7 @@ mod tests { } => { let seconary_connection = peer.secondary_connection.as_ref().unwrap(); assert_eq!(seconary_connection.address(), &address2); - assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS); + assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS); assert!(peer.addresses.contains(&address3)); } state => panic!("invalid state: {state:?}"), @@ -2365,7 +2396,7 @@ mod tests { } => { let seconary_connection = context.secondary_connection.as_ref().unwrap(); assert_eq!(seconary_connection.address(), &address2); - assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS); + assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS); } state => panic!("invalid state: {state:?}"), } @@ -2467,7 +2498,7 @@ mod tests { } => { let seconary_connection = context.secondary_connection.as_ref().unwrap(); assert_eq!(seconary_connection.address(), &address2); - assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS); + assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS); } state => panic!("invalid state: {state:?}"), } @@ -2579,7 +2610,7 @@ mod tests { } => { let seconary_connection = context.secondary_connection.as_ref().unwrap(); assert_eq!(seconary_connection.address(), &address2); - assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS); + assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS); } state => panic!("invalid state: {state:?}"), } @@ -2616,7 +2647,7 @@ mod tests { } => { let seconary_connection = context.secondary_connection.as_ref().unwrap(); assert_eq!(seconary_connection.address(), &address2); - assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS); + assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS); } state => panic!("invalid state: {state:?}"), } diff --git a/tests/connection/mod.rs b/tests/connection/mod.rs index cc63334f..33524cae 100644 --- a/tests/connection/mod.rs +++ b/tests/connection/mod.rs @@ -1352,3 +1352,151 @@ async fn unspecified_listen_address_websocket() { } } } + +#[tokio::test] +async fn simultaneous_dial_then_redial_tcp() { + simultaneous_dial_then_redial( + Transport::Tcp(TcpConfig { + reuse_port: false, + ..Default::default() + }), + Transport::Tcp(TcpConfig { + reuse_port: false, + ..Default::default() + }), + ) + .await +} + +#[tokio::test] +async fn simultaneous_dial_then_redial_websocket() { + simultaneous_dial_then_redial( + Transport::WebSocket(WebSocketConfig { + reuse_port: false, + ..Default::default() + }), + Transport::WebSocket(WebSocketConfig { + reuse_port: false, + ..Default::default() + }), + ) + .await +} + +#[tokio::test] +async fn simultaneous_dial_then_redial_quic() { + simultaneous_dial_then_redial( + Transport::Quic(Default::default()), + Transport::Quic(Default::default()), + ) + .await +} + +async fn simultaneous_dial_then_redial(transport1: Transport, transport2: Transport) { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let (ping_config1, _ping_event_stream1) = PingConfig::default(); + let config1 = ConfigBuilder::new() + .with_keypair(Keypair::generate()) + .with_libp2p_ping(ping_config1); + + let config1 = match transport1 { + Transport::Tcp(config) => config1.with_tcp(config), + Transport::Quic(config) => config1.with_quic(config), + Transport::WebSocket(config) => config1.with_websocket(config), + } + .build(); + + let (ping_config2, _ping_event_stream2) = PingConfig::default(); + let config2 = ConfigBuilder::new() + .with_keypair(Keypair::generate()) + .with_libp2p_ping(ping_config2); + + let config2 = match transport2 { + Transport::Tcp(config) => config2.with_tcp(config), + Transport::Quic(config) => config2.with_quic(config), + Transport::WebSocket(config) => config2.with_websocket(config), + } + .build(); + + let mut litep2p1 = Litep2p::new(config1).unwrap(); + let mut litep2p2 = Litep2p::new(config2).unwrap(); + let peer1 = *litep2p1.local_peer_id(); + let peer2 = *litep2p2.local_peer_id(); + + litep2p1.add_known_address(peer2, litep2p2.listen_addresses().cloned()); + litep2p2.add_known_address(peer1, litep2p1.listen_addresses().cloned()); + + let (_, _) = tokio::join!(litep2p1.dial(&peer2), litep2p2.dial(&peer1)); + + let mut peer1_open = false; + let mut peer2_open = false; + + while !peer1_open || !peer2_open { + tokio::select! { + event = litep2p1.next_event() => match event.unwrap() { + Litep2pEvent::ConnectionEstablished { .. } => { + peer1_open = true; + } + _ => {}, + }, + event = litep2p2.next_event() => match event.unwrap() { + Litep2pEvent::ConnectionEstablished { .. } => { + peer2_open = true; + } + _ => {}, + }, + } + } + + let mut peer1_close = false; + let mut peer2_close = false; + + while !peer1_close || !peer2_close { + tokio::select! { + event = litep2p1.next_event() => match event.unwrap() { + Litep2pEvent::ConnectionClosed { .. } => { + peer1_close = true; + } + _ => {}, + }, + event = litep2p2.next_event() => match event.unwrap() { + Litep2pEvent::ConnectionClosed { .. } => { + peer2_close = true; + } + _ => {}, + }, + } + } + + let (_, _) = tokio::join!(litep2p1.dial(&peer2), litep2p2.dial(&peer1)); + + let future = async move { + let mut peer1_open = false; + let mut peer2_open = false; + + while !peer1_open || !peer2_open { + tokio::select! { + event = litep2p1.next_event() => match event.unwrap() { + Litep2pEvent::ConnectionEstablished { .. } => { + peer1_open = true; + } + _ => {}, + }, + event = litep2p2.next_event() => match event.unwrap() { + Litep2pEvent::ConnectionEstablished { .. } => { + peer2_open = true; + } + _ => {}, + }, + } + } + }; + + match tokio::time::timeout(std::time::Duration::from_secs(10), future).await { + Err(_) => panic!("failed to open notification stream"), + _ => {} + } +}