Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix transaction chunking on QUIC batch send #26642

Merged
merged 2 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,21 @@ impl QuicClient {
Ok(())
}

fn compute_chunk_length(num_buffers_to_chunk: usize, num_chunks: usize) -> usize {
// The function is equivalent to checked div_ceil()
// Also, if num_chunks == 0 || num_buffers_per_chunk == 0, return 1
num_buffers_to_chunk
.checked_div(num_chunks)
.map_or(1, |value| {
if num_buffers_to_chunk.checked_rem(num_chunks).unwrap_or(0) != 0 {
value.saturating_add(1)
} else {
value
}
})
.max(1)
}

pub async fn send_batch<T>(
&self,
buffers: &[T],
Expand Down Expand Up @@ -462,9 +477,9 @@ impl QuicClient {
// by just getting a reference to the NewConnection once
let connection_ref: &NewConnection = &connection;

let chunks = buffers[1..buffers.len()]
.iter()
.chunks(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);
let chunk_len =
Self::compute_chunk_length(buffers.len() - 1, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);
let chunks = buffers[1..buffers.len()].iter().chunks(chunk_len);

let futures: Vec<_> = chunks
.into_iter()
Expand Down Expand Up @@ -566,3 +581,29 @@ impl TpuConnection for QuicTpuConnection {
Ok(())
}
}

#[cfg(test)]
mod tests {
use crate::nonblocking::quic_client::QuicClient;

#[test]
fn test_transaction_batch_chunking() {
assert_eq!(QuicClient::compute_chunk_length(0, 0), 1);
assert_eq!(QuicClient::compute_chunk_length(10, 0), 1);
assert_eq!(QuicClient::compute_chunk_length(0, 10), 1);
assert_eq!(QuicClient::compute_chunk_length(usize::MAX, usize::MAX), 1);
assert_eq!(QuicClient::compute_chunk_length(10, usize::MAX), 1);
assert!(QuicClient::compute_chunk_length(usize::MAX, 10) == (usize::MAX / 10) + 1);
assert_eq!(QuicClient::compute_chunk_length(10, 1), 10);
assert_eq!(QuicClient::compute_chunk_length(10, 2), 5);
assert_eq!(QuicClient::compute_chunk_length(10, 3), 4);
assert_eq!(QuicClient::compute_chunk_length(10, 4), 3);
assert_eq!(QuicClient::compute_chunk_length(10, 5), 2);
assert_eq!(QuicClient::compute_chunk_length(10, 6), 2);
assert_eq!(QuicClient::compute_chunk_length(10, 7), 2);
assert_eq!(QuicClient::compute_chunk_length(10, 8), 2);
assert_eq!(QuicClient::compute_chunk_length(10, 9), 2);
assert_eq!(QuicClient::compute_chunk_length(10, 10), 1);
assert_eq!(QuicClient::compute_chunk_length(10, 11), 1);
}
}
1 change: 1 addition & 0 deletions sdk/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub const QUIC_PORT_OFFSET: u16 = 6;
// that seems to maximize TPS on GCE (higher values don't seem to
// give significant improvement or seem to impact stability)
pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128;
pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128;

pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000;
pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;
Expand Down
120 changes: 105 additions & 15 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use {
solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE},
pubkey::Pubkey,
quic::{QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
quic::{
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
QUIC_MIN_STAKED_CONCURRENT_STREAMS,
},
signature::Keypair,
timing,
},
Expand Down Expand Up @@ -156,6 +159,34 @@ fn get_connection_stake(
})
}

fn compute_max_allowed_uni_streams(
peer_type: ConnectionPeerType,
peer_stake: u64,
staked_nodes: Arc<RwLock<StakedNodes>>,
) -> usize {
if peer_stake == 0 {
// Treat stake = 0 as unstaked
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
} else {
match peer_type {
ConnectionPeerType::Staked => {
let staked_nodes = staked_nodes.read().unwrap();

// No checked math for f64 type. So let's explicitly check for 0 here
if staked_nodes.total_stake == 0 {
QUIC_MIN_STAKED_CONCURRENT_STREAMS
} else {
(((peer_stake as f64 / staked_nodes.total_stake as f64)
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS as f64)
as usize)
.max(QUIC_MIN_STAKED_CONCURRENT_STREAMS)
}
}
_ => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
}
}
}

async fn setup_connection(
connecting: Connecting,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
Expand Down Expand Up @@ -233,19 +264,19 @@ async fn setup_connection(

if let Some((mut connection_table_l, stake)) = table_and_stake {
let table_type = connection_table_l.peer_type;
let max_uni_streams = match table_type {
ConnectionPeerType::Unstaked => {
VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64)
}
ConnectionPeerType::Staked => {
let staked_nodes = staked_nodes.read().unwrap();
VarInt::from_u64(
((stake as f64 / staked_nodes.total_stake as f64)
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS)
as u64,
)
}
};
let max_uni_streams = VarInt::from_u64(compute_max_allowed_uni_streams(
table_type,
stake,
staked_nodes.clone(),
) as u64);

debug!(
"Peer type: {:?}, stake {}, total stake {}, max streams {}",
table_type,
stake,
staked_nodes.read().unwrap().total_stake,
max_uni_streams.unwrap().into_inner()
);

if let Ok(max_uni_streams) = max_uni_streams {
connection.set_max_concurrent_uni_streams(max_uni_streams);
Expand Down Expand Up @@ -517,7 +548,7 @@ impl Drop for ConnectionEntry {
}
}

#[derive(Copy, Clone)]
#[derive(Copy, Clone, Debug)]
enum ConnectionPeerType {
Unstaked,
Staked,
Expand Down Expand Up @@ -675,6 +706,7 @@ pub mod test {
use {
super::*,
crate::{
nonblocking::quic::compute_max_allowed_uni_streams,
quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
tls_certificates::new_self_signed_tls_certificate_chain,
},
Expand Down Expand Up @@ -1362,4 +1394,62 @@ pub mod test {
}
assert_eq!(table.total_size, 0);
}

#[test]
fn test_max_allowed_uni_streams() {
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, staked_nodes.clone()),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10, staked_nodes.clone()),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, staked_nodes.clone()),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, staked_nodes.clone()),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
staked_nodes.write().unwrap().total_stake = 10000;
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1000, staked_nodes.clone()),
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS / (10_f64)) as usize
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 100, staked_nodes.clone()),
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS / (100_f64)) as usize
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, staked_nodes.clone()),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1, staked_nodes.clone()),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, staked_nodes.clone()),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(
ConnectionPeerType::Unstaked,
1000,
staked_nodes.clone()
),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1, staked_nodes.clone()),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, staked_nodes),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
}
}