Skip to content

Commit

Permalink
quic: Fix memory leak
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv committed Oct 30, 2024
1 parent 2bec26a commit 97bc506
Showing 1 changed file with 46 additions and 14 deletions.
60 changes: 46 additions & 14 deletions src/transport/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use multiaddr::{Multiaddr, Protocol};
use quinn::{ClientConfig, Connecting, Connection, Endpoint, IdleTimeout};

use std::{
collections::{HashMap, HashSet},
collections::HashMap,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -120,9 +120,9 @@ pub(crate) struct QuicTransport {
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
opened_raw: HashMap<ConnectionId, (NegotiatedConnection, Multiaddr)>,

/// Canceled raw connections.
canceled: HashSet<ConnectionId>,

/// Cancel raw connections futures.
///
/// This is cancelling `Self::pending_raw_connections`.
cancel_futures: HashMap<ConnectionId, AbortHandle>,
}

Expand Down Expand Up @@ -235,7 +235,6 @@ impl TransportBuilder for QuicTransport {
context,
config,
listener,
canceled: HashSet::new(),
opened_raw: HashMap::new(),
pending_open: HashMap::new(),
pending_dials: HashMap::new(),
Expand Down Expand Up @@ -477,8 +476,11 @@ impl Transport for QuicTransport {

/// Cancel opening connections.
fn cancel(&mut self, connection_id: ConnectionId) {
self.canceled.insert(connection_id);
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
// Cancel the future if it exists.
// State clean-up happens inside the `poll_next`.
if let Some(handle) = self.cancel_futures.get(&connection_id) {
handle.abort();
}
}
}

Expand Down Expand Up @@ -510,27 +512,57 @@ impl Stream for QuicTransport {
connection_id,
address,
stream,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?address,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
self.opened_raw.insert(connection_id, (stream, address.clone()));

return Poll::Ready(Some(TransportEvent::ConnectionOpened {
connection_id,
address,
}));
},
}
}

RawConnectionResult::Failed {
connection_id,
errors,
} =>
if !self.canceled.remove(&connection_id) {
} => {
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
?errors,
"raw connection without a cancel handle",
);
continue;
};

if !handle.is_aborted() {
return Poll::Ready(Some(TransportEvent::OpenFailure {
connection_id,
errors,
}));
},
}
}

RawConnectionResult::Canceled { connection_id } => {
self.canceled.remove(&connection_id);
if self.cancel_futures.remove(&connection_id).is_none() {
tracing::warn!(
target: LOG_TARGET,
?connection_id,
"raw cancelled connection without a cancel handle",
);
}
}
}
}
Expand Down

0 comments on commit 97bc506

Please sign in to comment.