Skip to content

Commit

Permalink
transport: Fix erroneous handling of secondary connections (#149)
Browse files Browse the repository at this point in the history
Previously `TransportManager` didn't correctly handle the case where
local node's dialing connection was opened as a secondary connection,
leaving `dial_record` as `Some` which didn't get cleaned up after the
connections were closed.

This resulted in subsequent dials to fail because `dial_record` was
`Some`, which erroneously indicated that the remote was already being
dialed, even though the dial had concluded and the connection was closed
  • Loading branch information
altonen authored Jun 13, 2024
1 parent 70819df commit cb21f17
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 33 deletions.
97 changes: 64 additions & 33 deletions src/transport/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ pub(crate) mod handle;
const LOG_TARGET: &str = "litep2p::transport-manager";

/// Score for a working address.
const SCORE_DIAL_SUCCESS: i32 = 100i32;
const SCORE_CONNECT_SUCCESS: i32 = 100i32;

/// Score for a non-working address.
const SCORE_DIAL_FAILURE: i32 = -100i32;
const SCORE_CONNECT_FAILURE: i32 = -100i32;

/// TODO:
enum ConnectionEstablishedResult {
Expand Down Expand Up @@ -698,7 +698,7 @@ impl TransportManager {
PeerState::Dialing { ref mut record } => {
debug_assert_eq!(record.connection_id(), &Some(connection_id));

record.update_score(SCORE_DIAL_FAILURE);
record.update_score(SCORE_CONNECT_FAILURE);
context.addresses.insert(record.clone());

context.state = PeerState::Disconnected { dial_record: None };
Expand All @@ -711,7 +711,7 @@ impl TransportManager {
record,
dial_record: Some(mut dial_record),
} => {
dial_record.update_score(SCORE_DIAL_FAILURE);
dial_record.update_score(SCORE_CONNECT_FAILURE);
context.addresses.insert(dial_record);

context.state = PeerState::Connected {
Expand All @@ -730,7 +730,7 @@ impl TransportManager {
"dial failed for a disconnected peer",
);

dial_record.update_score(SCORE_DIAL_FAILURE);
dial_record.update_score(SCORE_CONNECT_FAILURE);
context.addresses.insert(dial_record);

Ok(())
Expand Down Expand Up @@ -914,7 +914,10 @@ impl TransportManager {
let mut peers = self.peers.write();
match peers.get_mut(&peer) {
Some(context) => match context.state {
PeerState::Connected { .. } => match context.secondary_connection {
PeerState::Connected {
ref mut dial_record,
..
} => match context.secondary_connection {
Some(_) => {
tracing::debug!(
target: LOG_TARGET,
Expand All @@ -932,29 +935,57 @@ impl TransportManager {
context.addresses.insert(AddressRecord::new(
&peer,
endpoint.address().clone(),
SCORE_DIAL_SUCCESS,
SCORE_CONNECT_SUCCESS,
None,
))
}

return Ok(ConnectionEstablishedResult::Reject);
}
None => {
tracing::debug!(
None => match dial_record.take() {
Some(record)
if record.connection_id() == &Some(endpoint.connection_id()) =>
{
tracing::debug!(
target: LOG_TARGET,
?peer,
connection_id = ?endpoint.connection_id(),
address = ?endpoint.address(),
"dialed connection opened as secondary connection",
);

context.secondary_connection = Some(AddressRecord::new(
&peer,
endpoint.address().clone(),
SCORE_CONNECT_SUCCESS,
Some(endpoint.connection_id()),
));
}
None => {
tracing::debug!(
target: LOG_TARGET,
?peer,
connection_id = ?endpoint.connection_id(),
address = ?endpoint.address(),
"secondary connection",
);

context.secondary_connection = Some(AddressRecord::new(
&peer,
endpoint.address().clone(),
SCORE_CONNECT_SUCCESS,
Some(endpoint.connection_id()),
));
}
Some(record) => tracing::warn!(
target: LOG_TARGET,
?peer,
connection_id = ?endpoint.connection_id(),
address = ?endpoint.address(),
"secondary connection",
);

context.secondary_connection = Some(AddressRecord::new(
&peer,
endpoint.address().clone(),
SCORE_DIAL_SUCCESS,
Some(endpoint.connection_id()),
));
}
dial_record = ?record,
"unknown connection opened as secondary connection, discarding",
),
},
},
PeerState::Dialing { ref record, .. } => {
match record.connection_id() == &Some(endpoint.connection_id()) {
Expand Down Expand Up @@ -986,7 +1017,7 @@ impl TransportManager {
record: AddressRecord::new(
&peer,
endpoint.address().clone(),
SCORE_DIAL_SUCCESS,
SCORE_CONNECT_SUCCESS,
Some(endpoint.connection_id()),
),
dial_record: Some(record.clone()),
Expand Down Expand Up @@ -1036,14 +1067,14 @@ impl TransportManager {

let record = match records.remove(endpoint.address()) {
Some(mut record) => {
record.update_score(SCORE_DIAL_SUCCESS);
record.update_score(SCORE_CONNECT_SUCCESS);
record.set_connection_id(endpoint.connection_id());
record
}
None => AddressRecord::new(
&peer,
endpoint.address().clone(),
SCORE_DIAL_SUCCESS,
SCORE_CONNECT_SUCCESS,
Some(endpoint.connection_id()),
),
};
Expand Down Expand Up @@ -1076,7 +1107,7 @@ impl TransportManager {
AddressRecord::new(
&peer,
endpoint.address().clone(),
SCORE_DIAL_SUCCESS,
SCORE_CONNECT_SUCCESS,
Some(endpoint.connection_id()),
),
Some(dial_record),
Expand All @@ -1086,7 +1117,7 @@ impl TransportManager {
AddressRecord::new(
&peer,
endpoint.address().clone(),
SCORE_DIAL_SUCCESS,
SCORE_CONNECT_SUCCESS,
Some(endpoint.connection_id()),
),
None,
Expand All @@ -1107,7 +1138,7 @@ impl TransportManager {
record: AddressRecord::new(
&peer,
endpoint.address().clone(),
SCORE_DIAL_SUCCESS,
SCORE_CONNECT_SUCCESS,
Some(endpoint.connection_id()),
),
dial_record: None,
Expand Down Expand Up @@ -1186,7 +1217,7 @@ impl TransportManager {
// all other address records back to `AddressStore`. and ask
// transport to negotiate the
let mut dial_record = records.remove(&address).expect("address to exist");
dial_record.update_score(SCORE_DIAL_SUCCESS);
dial_record.update_score(SCORE_CONNECT_SUCCESS);

// negotiate the connection
match self
Expand Down Expand Up @@ -1298,7 +1329,7 @@ impl TransportManager {

if transports.is_empty() {
for (_, mut record) in records {
record.update_score(SCORE_DIAL_FAILURE);
record.update_score(SCORE_CONNECT_FAILURE);
context.addresses.insert(record);
}

Expand Down Expand Up @@ -2259,7 +2290,7 @@ mod tests {
} => {
let seconary_connection = context.secondary_connection.as_ref().unwrap();
assert_eq!(seconary_connection.address(), &address2);
assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS);
assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
}
state => panic!("invalid state: {state:?}"),
}
Expand All @@ -2282,7 +2313,7 @@ mod tests {
} => {
let seconary_connection = peer.secondary_connection.as_ref().unwrap();
assert_eq!(seconary_connection.address(), &address2);
assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS);
assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
assert!(peer.addresses.contains(&address3));
}
state => panic!("invalid state: {state:?}"),
Expand Down Expand Up @@ -2365,7 +2396,7 @@ mod tests {
} => {
let seconary_connection = context.secondary_connection.as_ref().unwrap();
assert_eq!(seconary_connection.address(), &address2);
assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS);
assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
}
state => panic!("invalid state: {state:?}"),
}
Expand Down Expand Up @@ -2467,7 +2498,7 @@ mod tests {
} => {
let seconary_connection = context.secondary_connection.as_ref().unwrap();
assert_eq!(seconary_connection.address(), &address2);
assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS);
assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
}
state => panic!("invalid state: {state:?}"),
}
Expand Down Expand Up @@ -2579,7 +2610,7 @@ mod tests {
} => {
let seconary_connection = context.secondary_connection.as_ref().unwrap();
assert_eq!(seconary_connection.address(), &address2);
assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS);
assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
}
state => panic!("invalid state: {state:?}"),
}
Expand Down Expand Up @@ -2616,7 +2647,7 @@ mod tests {
} => {
let seconary_connection = context.secondary_connection.as_ref().unwrap();
assert_eq!(seconary_connection.address(), &address2);
assert_eq!(seconary_connection.score(), SCORE_DIAL_SUCCESS);
assert_eq!(seconary_connection.score(), SCORE_CONNECT_SUCCESS);
}
state => panic!("invalid state: {state:?}"),
}
Expand Down
148 changes: 148 additions & 0 deletions tests/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1352,3 +1352,151 @@ async fn unspecified_listen_address_websocket() {
}
}
}

#[tokio::test]
async fn simultaneous_dial_then_redial_tcp() {
simultaneous_dial_then_redial(
Transport::Tcp(TcpConfig {
reuse_port: false,
..Default::default()
}),
Transport::Tcp(TcpConfig {
reuse_port: false,
..Default::default()
}),
)
.await
}

#[tokio::test]
async fn simultaneous_dial_then_redial_websocket() {
simultaneous_dial_then_redial(
Transport::WebSocket(WebSocketConfig {
reuse_port: false,
..Default::default()
}),
Transport::WebSocket(WebSocketConfig {
reuse_port: false,
..Default::default()
}),
)
.await
}

#[tokio::test]
async fn simultaneous_dial_then_redial_quic() {
simultaneous_dial_then_redial(
Transport::Quic(Default::default()),
Transport::Quic(Default::default()),
)
.await
}

async fn simultaneous_dial_then_redial(transport1: Transport, transport2: Transport) {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();

let (ping_config1, _ping_event_stream1) = PingConfig::default();
let config1 = ConfigBuilder::new()
.with_keypair(Keypair::generate())
.with_libp2p_ping(ping_config1);

let config1 = match transport1 {
Transport::Tcp(config) => config1.with_tcp(config),
Transport::Quic(config) => config1.with_quic(config),
Transport::WebSocket(config) => config1.with_websocket(config),
}
.build();

let (ping_config2, _ping_event_stream2) = PingConfig::default();
let config2 = ConfigBuilder::new()
.with_keypair(Keypair::generate())
.with_libp2p_ping(ping_config2);

let config2 = match transport2 {
Transport::Tcp(config) => config2.with_tcp(config),
Transport::Quic(config) => config2.with_quic(config),
Transport::WebSocket(config) => config2.with_websocket(config),
}
.build();

let mut litep2p1 = Litep2p::new(config1).unwrap();
let mut litep2p2 = Litep2p::new(config2).unwrap();
let peer1 = *litep2p1.local_peer_id();
let peer2 = *litep2p2.local_peer_id();

litep2p1.add_known_address(peer2, litep2p2.listen_addresses().cloned());
litep2p2.add_known_address(peer1, litep2p1.listen_addresses().cloned());

let (_, _) = tokio::join!(litep2p1.dial(&peer2), litep2p2.dial(&peer1));

let mut peer1_open = false;
let mut peer2_open = false;

while !peer1_open || !peer2_open {
tokio::select! {
event = litep2p1.next_event() => match event.unwrap() {
Litep2pEvent::ConnectionEstablished { .. } => {
peer1_open = true;
}
_ => {},
},
event = litep2p2.next_event() => match event.unwrap() {
Litep2pEvent::ConnectionEstablished { .. } => {
peer2_open = true;
}
_ => {},
},
}
}

let mut peer1_close = false;
let mut peer2_close = false;

while !peer1_close || !peer2_close {
tokio::select! {
event = litep2p1.next_event() => match event.unwrap() {
Litep2pEvent::ConnectionClosed { .. } => {
peer1_close = true;
}
_ => {},
},
event = litep2p2.next_event() => match event.unwrap() {
Litep2pEvent::ConnectionClosed { .. } => {
peer2_close = true;
}
_ => {},
},
}
}

let (_, _) = tokio::join!(litep2p1.dial(&peer2), litep2p2.dial(&peer1));

let future = async move {
let mut peer1_open = false;
let mut peer2_open = false;

while !peer1_open || !peer2_open {
tokio::select! {
event = litep2p1.next_event() => match event.unwrap() {
Litep2pEvent::ConnectionEstablished { .. } => {
peer1_open = true;
}
_ => {},
},
event = litep2p2.next_event() => match event.unwrap() {
Litep2pEvent::ConnectionEstablished { .. } => {
peer2_open = true;
}
_ => {},
},
}
}
};

match tokio::time::timeout(std::time::Duration::from_secs(10), future).await {
Err(_) => panic!("failed to open notification stream"),
_ => {}
}
}

0 comments on commit cb21f17

Please sign in to comment.