Skip to content

Commit

Permalink
Fix transaction chunking on QUIC batch send (solana-labs#26642)
Browse files Browse the repository at this point in the history
* Fix chunking of transaction at batch transmit via QUIC

* clippy fixes
  • Loading branch information
pgarg66 authored and lijunwangs committed Sep 13, 2022
1 parent 657a1a5 commit 5e16953
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 18 deletions.
47 changes: 44 additions & 3 deletions client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,21 @@ impl QuicClient {
Ok(())
}

fn compute_chunk_length(num_buffers_to_chunk: usize, num_chunks: usize) -> usize {
// The function is equivalent to checked div_ceil()
// Also, if num_chunks == 0 || num_buffers_per_chunk == 0, return 1
num_buffers_to_chunk
.checked_div(num_chunks)
.map_or(1, |value| {
if num_buffers_to_chunk.checked_rem(num_chunks).unwrap_or(0) != 0 {
value.saturating_add(1)
} else {
value
}
})
.max(1)
}

pub async fn send_batch<T>(
&self,
buffers: &[T],
Expand Down Expand Up @@ -470,9 +485,9 @@ impl QuicClient {
// by just getting a reference to the NewConnection once
let connection_ref: &NewConnection = &connection;

let chunks = buffers[1..buffers.len()]
.iter()
.chunks(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);
let chunk_len =
Self::compute_chunk_length(buffers.len() - 1, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);
let chunks = buffers[1..buffers.len()].iter().chunks(chunk_len);

let futures: Vec<_> = chunks
.into_iter()
Expand Down Expand Up @@ -574,3 +589,29 @@ impl TpuConnection for QuicTpuConnection {
Ok(())
}
}

#[cfg(test)]
mod tests {
use crate::nonblocking::quic_client::QuicClient;

#[test]
fn test_transaction_batch_chunking() {
assert_eq!(QuicClient::compute_chunk_length(0, 0), 1);
assert_eq!(QuicClient::compute_chunk_length(10, 0), 1);
assert_eq!(QuicClient::compute_chunk_length(0, 10), 1);
assert_eq!(QuicClient::compute_chunk_length(usize::MAX, usize::MAX), 1);
assert_eq!(QuicClient::compute_chunk_length(10, usize::MAX), 1);
assert!(QuicClient::compute_chunk_length(usize::MAX, 10) == (usize::MAX / 10) + 1);
assert_eq!(QuicClient::compute_chunk_length(10, 1), 10);
assert_eq!(QuicClient::compute_chunk_length(10, 2), 5);
assert_eq!(QuicClient::compute_chunk_length(10, 3), 4);
assert_eq!(QuicClient::compute_chunk_length(10, 4), 3);
assert_eq!(QuicClient::compute_chunk_length(10, 5), 2);
assert_eq!(QuicClient::compute_chunk_length(10, 6), 2);
assert_eq!(QuicClient::compute_chunk_length(10, 7), 2);
assert_eq!(QuicClient::compute_chunk_length(10, 8), 2);
assert_eq!(QuicClient::compute_chunk_length(10, 9), 2);
assert_eq!(QuicClient::compute_chunk_length(10, 10), 1);
assert_eq!(QuicClient::compute_chunk_length(10, 11), 1);
}
}
1 change: 1 addition & 0 deletions sdk/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub const QUIC_PORT_OFFSET: u16 = 6;
// that seems to maximize TPS on GCE (higher values don't seem to
// give significant improvement or seem to impact stability)
pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128;
pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128;

pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000;
pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;
Expand Down
120 changes: 105 additions & 15 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use {
solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE},
pubkey::Pubkey,
quic::{QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
quic::{
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
QUIC_MIN_STAKED_CONCURRENT_STREAMS,
},
signature::Keypair,
timing,
},
Expand Down Expand Up @@ -156,6 +159,34 @@ fn get_connection_stake(
})
}

fn compute_max_allowed_uni_streams(
peer_type: ConnectionPeerType,
peer_stake: u64,
staked_nodes: Arc<RwLock<StakedNodes>>,
) -> usize {
if peer_stake == 0 {
// Treat stake = 0 as unstaked
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
} else {
match peer_type {
ConnectionPeerType::Staked => {
let staked_nodes = staked_nodes.read().unwrap();

// No checked math for f64 type. So let's explicitly check for 0 here
if staked_nodes.total_stake == 0 {
QUIC_MIN_STAKED_CONCURRENT_STREAMS
} else {
(((peer_stake as f64 / staked_nodes.total_stake as f64)
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS as f64)
as usize)
.max(QUIC_MIN_STAKED_CONCURRENT_STREAMS)
}
}
_ => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
}
}
}

async fn setup_connection(
connecting: Connecting,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
Expand Down Expand Up @@ -233,19 +264,19 @@ async fn setup_connection(

if let Some((mut connection_table_l, stake)) = table_and_stake {
let table_type = connection_table_l.peer_type;
let max_uni_streams = match table_type {
ConnectionPeerType::Unstaked => {
VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64)
}
ConnectionPeerType::Staked => {
let staked_nodes = staked_nodes.read().unwrap();
VarInt::from_u64(
((stake as f64 / staked_nodes.total_stake as f64)
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS)
as u64,
)
}
};
let max_uni_streams = VarInt::from_u64(compute_max_allowed_uni_streams(
table_type,
stake,
staked_nodes.clone(),
) as u64);

debug!(
"Peer type: {:?}, stake {}, total stake {}, max streams {}",
table_type,
stake,
staked_nodes.read().unwrap().total_stake,
max_uni_streams.unwrap().into_inner()
);

if let Ok(max_uni_streams) = max_uni_streams {
connection.set_max_concurrent_uni_streams(max_uni_streams);
Expand Down Expand Up @@ -526,7 +557,7 @@ impl Drop for ConnectionEntry {
}
}

#[derive(Copy, Clone)]
#[derive(Copy, Clone, Debug)]
enum ConnectionPeerType {
Unstaked,
Staked,
Expand Down Expand Up @@ -684,6 +715,7 @@ pub mod test {
use {
super::*,
crate::{
nonblocking::quic::compute_max_allowed_uni_streams,
quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
tls_certificates::new_self_signed_tls_certificate_chain,
},
Expand Down Expand Up @@ -1371,4 +1403,62 @@ pub mod test {
}
assert_eq!(table.total_size, 0);
}

#[test]
fn test_max_allowed_uni_streams() {
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, staked_nodes.clone()),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10, staked_nodes.clone()),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, staked_nodes.clone()),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, staked_nodes.clone()),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
staked_nodes.write().unwrap().total_stake = 10000;
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1000, staked_nodes.clone()),
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS / (10_f64)) as usize
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 100, staked_nodes.clone()),
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS / (100_f64)) as usize
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, staked_nodes.clone()),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1, staked_nodes.clone()),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, staked_nodes.clone()),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(
ConnectionPeerType::Unstaked,
1000,
staked_nodes.clone()
),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1, staked_nodes.clone()),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, staked_nodes),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
}
}

0 comments on commit 5e16953

Please sign in to comment.