Skip to content

Commit

Permalink
Refactor QUIC new connection handler function (solana-labs#26855)
Browse files Browse the repository at this point in the history
* Refactor QUIC new connection handler function

* cleanup setup_connection

* more cleanup
  • Loading branch information
pgarg66 authored and apfitzge committed Aug 9, 2022
1 parent 88d49f8 commit ac70d3f
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 114 deletions.
298 changes: 184 additions & 114 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ fn prune_unstaked_connection_table(
fn get_connection_stake(
connection: &Connection,
staked_nodes: Arc<RwLock<StakedNodes>>,
) -> Option<(Pubkey, u64)> {
) -> Option<(Pubkey, u64, u64)> {
connection
.peer_identity()
.and_then(|der_cert_any| der_cert_any.downcast::<Vec<rustls::Certificate>>().ok())
Expand All @@ -151,10 +151,11 @@ fn get_connection_stake(
debug!("Peer public key is {:?}", pubkey);

let staked_nodes = staked_nodes.read().unwrap();
let total_stake = staked_nodes.total_stake;
staked_nodes
.pubkey_stake_map
.get(&pubkey)
.map(|stake| (pubkey, *stake))
.map(|stake| (pubkey, *stake, total_stake))
})
})
}
Expand Down Expand Up @@ -185,6 +186,125 @@ pub fn compute_max_allowed_uni_streams(
}
}

enum ConnectionHandlerError {
ConnectionAddError,
MaxStreamError,
}

struct NewConnectionHandlerParams {
packet_sender: Sender<PacketBatch>,
remote_pubkey: Option<Pubkey>,
stake: u64,
total_stake: u64,
max_connections_per_peer: usize,
stats: Arc<StreamStats>,
}

impl NewConnectionHandlerParams {
fn new_unstaked(
packet_sender: Sender<PacketBatch>,
max_connections_per_peer: usize,
stats: Arc<StreamStats>,
) -> NewConnectionHandlerParams {
NewConnectionHandlerParams {
packet_sender,
remote_pubkey: None,
stake: 0,
total_stake: 0,
max_connections_per_peer,
stats,
}
}
}

fn handle_and_cache_new_connection(
new_connection: NewConnection,
mut connection_table_l: MutexGuard<ConnectionTable>,
connection_table: Arc<Mutex<ConnectionTable>>,
params: &NewConnectionHandlerParams,
) -> Result<(), ConnectionHandlerError> {
let NewConnection {
connection,
uni_streams,
..
} = new_connection;

if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
connection_table_l.peer_type,
params.stake,
params.total_stake,
) as u64)
{
connection.set_max_concurrent_uni_streams(max_uni_streams);
debug!(
"Peer type: {:?}, stake {}, total stake {}, max streams {}",
connection_table_l.peer_type,
params.stake,
params.total_stake,
max_uni_streams.into_inner()
);

let remote_addr = connection.remote_address();

if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
remote_addr.port(),
Some(connection),
params.stake,
timing::timestamp(),
params.max_connections_per_peer,
) {
drop(connection_table_l);
tokio::spawn(handle_connection(
uni_streams,
params.packet_sender.clone(),
remote_addr,
params.remote_pubkey,
last_update,
connection_table,
stream_exit,
params.stats.clone(),
params.stake,
));
Ok(())
} else {
params
.stats
.connection_add_failed
.fetch_add(1, Ordering::Relaxed);
Err(ConnectionHandlerError::ConnectionAddError)
}
} else {
params
.stats
.connection_add_failed_invalid_stream_count
.fetch_add(1, Ordering::Relaxed);
Err(ConnectionHandlerError::MaxStreamError)
}
}

fn prune_unstaked_connections_and_add_new_connection(
new_connection: NewConnection,
mut connection_table_l: MutexGuard<ConnectionTable>,
connection_table: Arc<Mutex<ConnectionTable>>,
max_connections: usize,
params: &NewConnectionHandlerParams,
) -> Result<(), ConnectionHandlerError> {
let stats = params.stats.clone();
if max_connections > 0 {
prune_unstaked_connection_table(&mut connection_table_l, max_connections, stats);
handle_and_cache_new_connection(
new_connection,
connection_table_l,
connection_table,
params,
)
} else {
new_connection.connection.close(0u32.into(), &[0u8]);
Err(ConnectionHandlerError::ConnectionAddError)
}
}

async fn setup_connection(
connecting: Connecting,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
Expand All @@ -205,126 +325,76 @@ async fn setup_connection(
if let Ok(new_connection) = connecting_result {
stats.total_connections.fetch_add(1, Ordering::Relaxed);
stats.total_new_connections.fetch_add(1, Ordering::Relaxed);
let NewConnection {
connection,
uni_streams,
..
} = new_connection;

let remote_addr = connection.remote_address();
let mut remote_pubkey = None;

let table_and_stake = {
let (some_pubkey, stake) = get_connection_stake(&connection, staked_nodes.clone())
.map_or((None, 0), |(pubkey, stake)| (Some(pubkey), stake));
if stake > 0 {
remote_pubkey = some_pubkey;
let mut connection_table_l = staked_connection_table.lock().unwrap();
if connection_table_l.total_size >= max_staked_connections {
let num_pruned = connection_table_l.prune_random(stake);
if num_pruned == 0 {
if max_unstaked_connections > 0 {
// If we couldn't prune a connection in the staked connection table, let's
// put this connection in the unstaked connection table. If needed, prune a
// connection from the unstaked connection table.
connection_table_l = unstaked_connection_table.lock().unwrap();
prune_unstaked_connection_table(
&mut connection_table_l,
max_unstaked_connections,
stats.clone(),
);
Some((connection_table_l, stake))
} else {
stats
.connection_add_failed_on_pruning
.fetch_add(1, Ordering::Relaxed);
None
}
} else {
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
Some((connection_table_l, stake))
}
} else {
Some((connection_table_l, stake))
}
} else if max_unstaked_connections > 0 {
let mut connection_table_l = unstaked_connection_table.lock().unwrap();
prune_unstaked_connection_table(
&mut connection_table_l,
max_unstaked_connections,
stats.clone(),
);
Some((connection_table_l, 0))
} else {
None
}
};

if let Some((mut connection_table_l, stake)) = table_and_stake {
let table_type = connection_table_l.peer_type;
let total_stake = staked_nodes.read().map_or(0, |stakes| stakes.total_stake);
drop(staked_nodes);

let max_uni_streams =
VarInt::from_u64(
compute_max_allowed_uni_streams(table_type, stake, total_stake) as u64,
);

debug!(
"Peer type: {:?}, stake {}, total stake {}, max streams {}",
table_type,
stake,
total_stake,
max_uni_streams.unwrap().into_inner()
);

if let Ok(max_uni_streams) = max_uni_streams {
connection.set_max_concurrent_uni_streams(max_uni_streams);
if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
ConnectionTableKey::new(remote_addr.ip(), remote_pubkey),
remote_addr.port(),
Some(connection),
let params = get_connection_stake(&new_connection.connection, staked_nodes.clone())
.map_or(
NewConnectionHandlerParams::new_unstaked(
packet_sender.clone(),
max_connections_per_peer,
stats.clone(),
),
|(pubkey, stake, total_stake)| NewConnectionHandlerParams {
packet_sender,
remote_pubkey: Some(pubkey),
stake,
timing::timestamp(),
total_stake,
max_connections_per_peer,
stats: stats.clone(),
},
);

if params.stake > 0 {
let mut connection_table_l = staked_connection_table.lock().unwrap();
if connection_table_l.total_size >= max_staked_connections {
let num_pruned = connection_table_l.prune_random(params.stake);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
}

if connection_table_l.total_size < max_staked_connections {
if let Ok(()) = handle_and_cache_new_connection(
new_connection,
connection_table_l,
staked_connection_table.clone(),
&params,
) {
drop(connection_table_l);
let stats = stats.clone();
let connection_table = match table_type {
ConnectionPeerType::Unstaked => {
stats
.connection_added_from_unstaked_peer
.fetch_add(1, Ordering::Relaxed);
unstaked_connection_table.clone()
}
ConnectionPeerType::Staked => {
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
staked_connection_table.clone()
}
};
tokio::spawn(handle_connection(
uni_streams,
packet_sender,
remote_addr,
remote_pubkey,
last_update,
connection_table,
stream_exit,
stats,
stake,
));
} else {
stats.connection_add_failed.fetch_add(1, Ordering::Relaxed);
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
}
} else {
stats
.connection_add_failed_invalid_stream_count
.fetch_add(1, Ordering::Relaxed);
// If we couldn't prune a connection in the staked connection table, let's
// put this connection in the unstaked connection table. If needed, prune a
// connection from the unstaked connection table.
if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
new_connection,
unstaked_connection_table.lock().unwrap(),
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
) {
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
} else {
stats
.connection_add_failed_on_pruning
.fetch_add(1, Ordering::Relaxed);
stats
.connection_add_failed_staked_node
.fetch_add(1, Ordering::Relaxed);
}
}
} else if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
new_connection,
unstaked_connection_table.lock().unwrap(),
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
) {
stats
.connection_added_from_unstaked_peer
.fetch_add(1, Ordering::Relaxed);
} else {
connection.close(0u32.into(), &[0u8]);
stats
.connection_add_failed_unstaked_node
.fetch_add(1, Ordering::Relaxed);
Expand Down
7 changes: 7 additions & 0 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ pub struct StreamStats {
pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
pub(crate) connection_add_failed: AtomicUsize,
pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
pub(crate) connection_add_failed_staked_node: AtomicUsize,
pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
pub(crate) connection_add_failed_on_pruning: AtomicUsize,
pub(crate) connection_setup_timeout: AtomicUsize,
Expand Down Expand Up @@ -193,6 +194,12 @@ impl StreamStats {
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_add_failed_staked_node",
self.connection_add_failed_staked_node
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_add_failed_unstaked_node",
self.connection_add_failed_unstaked_node
Expand Down

0 comments on commit ac70d3f

Please sign in to comment.