Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Close QUIC connection before dropping the entry (#26269)
Browse files Browse the repository at this point in the history
  • Loading branch information
pgarg66 authored Jun 28, 2022
1 parent fa77cc5 commit e8ed7c1
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use {
futures_util::stream::StreamExt,
percentage::Percentage,
quinn::{
Connecting, Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection, VarInt,
Connecting, Connection, Endpoint, EndpointConfig, Incoming, IncomingUniStreams,
NewConnection, VarInt,
},
solana_perf::packet::PacketBatch,
solana_sdk::{
Expand Down Expand Up @@ -177,6 +178,7 @@ async fn setup_connection(
if stake != 0 || max_unstaked_connections > 0 {
if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
&remote_addr,
Some(connection),
timing::timestamp(),
max_connections_per_ip,
) {
Expand Down Expand Up @@ -375,14 +377,21 @@ struct ConnectionEntry {
exit: Arc<AtomicBool>,
last_update: Arc<AtomicU64>,
port: u16,
connection: Option<Connection>,
}

impl ConnectionEntry {
fn new(exit: Arc<AtomicBool>, last_update: Arc<AtomicU64>, port: u16) -> Self {
fn new(
exit: Arc<AtomicBool>,
last_update: Arc<AtomicU64>,
port: u16,
connection: Option<Connection>,
) -> Self {
Self {
exit,
last_update,
port,
connection,
}
}

Expand All @@ -393,6 +402,9 @@ impl ConnectionEntry {

impl Drop for ConnectionEntry {
fn drop(&mut self) {
if let Some(conn) = self.connection.take() {
conn.close(0u32.into(), &[0u8]);
}
self.exit.store(true, Ordering::Relaxed);
}
}
Expand Down Expand Up @@ -438,6 +450,7 @@ impl ConnectionTable {
fn try_add_connection(
&mut self,
addr: &SocketAddr,
connection: Option<Connection>,
last_update: u64,
max_connections_per_ip: usize,
) -> Option<(Arc<AtomicU64>, Arc<AtomicBool>)> {
Expand All @@ -454,6 +467,7 @@ impl ConnectionTable {
exit.clone(),
last_update.clone(),
addr.port(),
connection,
));
self.total_size += 1;
Some((last_update, exit))
Expand Down Expand Up @@ -843,12 +857,12 @@ pub mod test {
.collect();
for (i, socket) in sockets.iter().enumerate() {
table
.try_add_connection(socket, i as u64, max_connections_per_ip)
.try_add_connection(socket, None, i as u64, max_connections_per_ip)
.unwrap();
}
num_entries += 1;
table
.try_add_connection(&sockets[0], 5, max_connections_per_ip)
.try_add_connection(&sockets[0], None, 5, max_connections_per_ip)
.unwrap();

let new_size = 3;
Expand Down Expand Up @@ -880,11 +894,11 @@ pub mod test {
.collect();
for (i, socket) in sockets.iter().enumerate() {
table
.try_add_connection(socket, (i * 2) as u64, max_connections_per_ip)
.try_add_connection(socket, None, (i * 2) as u64, max_connections_per_ip)
.unwrap();

table
.try_add_connection(socket, (i * 2 + 1) as u64, max_connections_per_ip)
.try_add_connection(socket, None, (i * 2 + 1) as u64, max_connections_per_ip)
.unwrap();
}

Expand All @@ -893,6 +907,7 @@ pub mod test {
table
.try_add_connection(
&single_connection_addr,
None,
(num_ips * 2) as u64,
max_connections_per_ip,
)
Expand Down

0 comments on commit e8ed7c1

Please sign in to comment.