Skip to content

Commit

Permalink
Set receive_window per quic connection (#26936)
Browse files Browse the repository at this point in the history
This change sets the receive_window for connections from non-staked nodes to 1 * PACKET_DATA_SIZE, and maps the receive_window of each staked node's connection to a value between 1.2 * PACKET_DATA_SIZE and 10 * PACKET_DATA_SIZE based on its stake.

The change is based on a Quinn library change that supports tuning the receive_window per connection on the server side (quinn-rs/quinn#1393).
lijunwangs authored Aug 9, 2022
1 parent f7c6901 commit a69470f
Showing 7 changed files with 263 additions and 32 deletions.
63 changes: 57 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 13 additions & 4 deletions core/src/staked_nodes_updater_service.rs
Original file line number Diff line number Diff line change
@@ -36,11 +36,15 @@ impl StakedNodesUpdaterService {
let mut new_ip_to_stake = HashMap::new();
let mut new_id_to_stake = HashMap::new();
let mut total_stake = 0;
let mut max_stake: u64 = 0;
let mut min_stake: u64 = u64::MAX;
if Self::try_refresh_stake_maps(
&mut last_stakes,
&mut new_ip_to_stake,
&mut new_id_to_stake,
&mut total_stake,
&mut max_stake,
&mut min_stake,
&bank_forks,
&cluster_info,
) {
@@ -61,16 +65,21 @@ impl StakedNodesUpdaterService {
ip_to_stake: &mut HashMap<IpAddr, u64>,
id_to_stake: &mut HashMap<Pubkey, u64>,
total_stake: &mut u64,
max_stake: &mut u64,
min_stake: &mut u64,
bank_forks: &RwLock<BankForks>,
cluster_info: &ClusterInfo,
) -> bool {
if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
let root_bank = bank_forks.read().unwrap().root_bank();
let staked_nodes = root_bank.staked_nodes();
*total_stake = staked_nodes
.iter()
.map(|(_pubkey, stake)| stake)
.sum::<u64>();

for stake in staked_nodes.values() {
*total_stake += stake;
*max_stake = *stake.max(max_stake);
*min_stake = *stake.min(min_stake);
}

*id_to_stake = cluster_info
.tvu_peers()
.into_iter()
63 changes: 57 additions & 6 deletions programs/bpf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions sdk/src/quic.rs
Original file line number Diff line number Diff line change
@@ -12,3 +12,15 @@ pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;
// applications. Different applications vary, but most seem to
// be in the 30-60 second range
pub const QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS: u64 = 60_000;

/// Receive-window multiplier for unstaked peers.
///
/// The receive window for a QUIC connection from an unstaked node is
/// set to this ratio times [`solana_sdk::packet::PACKET_DATA_SIZE`]
pub const QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO: u64 = 1;

/// Receive-window multiplier for the lowest-staked peer.
///
/// The receive window for a QUIC connection from the staked node with the
/// minimum stake is set to this ratio times
/// [`solana_sdk::packet::PACKET_DATA_SIZE`]. It is the lower end of the
/// ratio range that staked peers are mapped onto.
pub const QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO: u64 = 2;

/// Receive-window multiplier for the highest-staked peer.
///
/// The receive window for a QUIC connection from the staked node with the
/// maximum stake is set to this ratio times
/// [`solana_sdk::packet::PACKET_DATA_SIZE`]. It is the upper end of the
/// ratio range that staked peers are mapped onto.
pub const QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO: u64 = 10;
4 changes: 3 additions & 1 deletion streamer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -21,7 +21,9 @@ nix = "0.24.2"
pem = "1.0.2"
percentage = "0.1.0"
pkcs8 = { version = "0.8.0", features = ["alloc"] }
quinn = "0.8.3"
quinn = {git = "https://github.com/quinn-rs/quinn.git", branch = "0.8.x", commit = "37c19743cc881cf71369946d572849d5d2ffc3fd"}
quinn-proto = {git = "https://github.com/quinn-rs/quinn.git", branch = "0.8.x", commit = "37c19743cc881cf71369946d572849d5d2ffc3fd"}

rand = "0.7.0"
rcgen = "0.9.2"
rustls = { version = "0.20.6", features = ["dangerous_configuration"] }
134 changes: 119 additions & 15 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
@@ -12,14 +12,16 @@ use {
Connecting, Connection, Endpoint, EndpointConfig, Incoming, IncomingUniStreams,
NewConnection, VarInt,
},
quinn_proto::VarIntBoundsExceeded,
rand::{thread_rng, Rng},
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE},
pubkey::Pubkey,
quic::{
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
},
signature::Keypair,
timing,
@@ -142,7 +144,7 @@ fn prune_unstaked_connection_table(
fn get_connection_stake(
connection: &Connection,
staked_nodes: Arc<RwLock<StakedNodes>>,
) -> Option<(Pubkey, u64, u64)> {
) -> Option<(Pubkey, u64, u64, u64, u64)> {
connection
.peer_identity()
.and_then(|der_cert_any| der_cert_any.downcast::<Vec<rustls::Certificate>>().ok())
@@ -152,10 +154,12 @@ fn get_connection_stake(

let staked_nodes = staked_nodes.read().unwrap();
let total_stake = staked_nodes.total_stake;
let max_stake = staked_nodes.max_stake;
let min_stake = staked_nodes.min_stake;
staked_nodes
.pubkey_stake_map
.get(&pubkey)
.map(|stake| (pubkey, *stake, total_stake))
.map(|stake| (pubkey, *stake, total_stake, max_stake, min_stake))
})
})
}
@@ -198,6 +202,8 @@ struct NewConnectionHandlerParams {
total_stake: u64,
max_connections_per_peer: usize,
stats: Arc<StreamStats>,
max_stake: u64,
min_stake: u64,
}

impl NewConnectionHandlerParams {
@@ -213,6 +219,8 @@ impl NewConnectionHandlerParams {
total_stake: 0,
max_connections_per_peer,
stats,
max_stake: 0,
min_stake: 0,
}
}
}
@@ -236,16 +244,29 @@ fn handle_and_cache_new_connection(
) as u64)
{
connection.set_max_concurrent_uni_streams(max_uni_streams);
debug!(
"Peer type: {:?}, stake {}, total stake {}, max streams {}",
let receive_window = compute_recieve_window(
params.max_stake,
params.min_stake,
connection_table_l.peer_type,
params.stake,
params.total_stake,
max_uni_streams.into_inner()
);

if let Ok(receive_window) = receive_window {
connection.set_receive_window(receive_window);
}

let remote_addr = connection.remote_address();

debug!(
"Peer type: {:?}, stake {}, total stake {}, max streams {} receive_window {:?} from peer {}",
connection_table_l.peer_type,
params.stake,
params.total_stake,
max_uni_streams.into_inner(),
receive_window,
remote_addr,
);

if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
remote_addr.port(),
@@ -305,6 +326,50 @@ fn prune_unstaked_connections_and_add_new_connection(
}
}

/// Calculate the ratio for the per-connection receive window of a staked peer.
///
/// The returned ratio is the multiple of `PACKET_DATA_SIZE` to use as the
/// connection's receive window. `stake` is mapped linearly from the stake range
/// `[min_stake, max_stake]` onto the ratio range
/// `[QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO]`
/// and rounded to the nearest integer.
///
/// Edge cases:
/// * `stake > max_stake` yields the maximum ratio.
/// * `max_stake <= min_stake` (empty or degenerate stake range) yields the
///   maximum ratio.
/// * `stake < min_stake` yields the minimum ratio.
///
/// The result is always within the configured ratio range.
fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 {
    // Testing shows the maximum throughput from a connection is achieved at receive_window =
    // PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the
    // stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to
    // QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. Where the linear algebra of finding the ratio 'r'
    // for stake 's' is,
    // r(s) = a * s + b. Given the max_stake, min_stake, max_ratio, min_ratio, we can find
    // a and b.
    let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
    let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;

    // Above the known range, or no usable range at all: grant the largest window.
    if stake > max_stake || max_stake <= min_stake {
        return max_ratio;
    }

    // Below the known range the line r(s) would extrapolate under `min_ratio` and can
    // even go negative; a negative f64 cast with `as u64` saturates to 0, which would
    // produce a zero-sized receive window. Pin such peers to the minimum ratio instead.
    if stake < min_stake {
        return min_ratio;
    }

    let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
    let b = max_ratio as f64 - ((max_stake as f64) * a);
    let ratio = (a * stake as f64) + b;

    // Guard against floating-point error nudging the rounded value out of range.
    (ratio.round() as u64).clamp(min_ratio, max_ratio)
}

/// Compute the QUIC receive window to apply to a connection.
///
/// The window is `PACKET_DATA_SIZE` multiplied by a ratio chosen from the
/// peer type: unstaked peers use `QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO`, staked
/// peers use the stake-dependent ratio derived from `peer_stake` relative to
/// `min_stake` and `max_stake`.
///
/// Returns `Err(VarIntBoundsExceeded)` if the resulting byte count cannot be
/// represented as a `VarInt`.
fn compute_recieve_window(
    max_stake: u64,
    min_stake: u64,
    peer_type: ConnectionPeerType,
    peer_stake: u64,
) -> Result<VarInt, VarIntBoundsExceeded> {
    // Pick the multiplier first so the window is built in exactly one place.
    let window_ratio = match peer_type {
        ConnectionPeerType::Unstaked => QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
        ConnectionPeerType::Staked => {
            compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake)
        }
    };

    VarInt::from_u64(PACKET_DATA_SIZE as u64 * window_ratio)
}

async fn setup_connection(
connecting: Connecting,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
@@ -333,13 +398,17 @@ async fn setup_connection(
max_connections_per_peer,
stats.clone(),
),
|(pubkey, stake, total_stake)| NewConnectionHandlerParams {
packet_sender,
remote_pubkey: Some(pubkey),
stake,
total_stake,
max_connections_per_peer,
stats: stats.clone(),
|(pubkey, stake, total_stake, max_stake, min_stake)| {
NewConnectionHandlerParams {
packet_sender,
remote_pubkey: Some(pubkey),
stake,
total_stake,
max_connections_per_peer,
stats: stats.clone(),
max_stake,
min_stake,
}
},
);

@@ -1475,6 +1544,7 @@ pub mod test {
}

#[test]

fn test_max_allowed_uni_streams() {
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 0),
@@ -1525,4 +1595,38 @@ pub mod test {
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
}

#[test]
fn test_cacluate_receive_window_ratio_for_staked_node() {
    let highest = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
    let lowest = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
    let midpoint = (highest + lowest) / 2;

    // Each row is (max_stake, min_stake, stake, expected ratio).
    let cases: [(u64, u64, u64, u64); 6] = [
        // Stake at the bottom of the range maps to the minimum ratio.
        (10000, 0, 0, lowest),
        // Stake at the top of the range maps to the maximum ratio.
        (10000, 0, 10000, highest),
        // Stake halfway through the range maps to the average of the two ratios.
        (10000, 0, 10000 / 2, midpoint),
        // Degenerate range (max == min) falls back to the maximum ratio.
        (10000, 10000, 10000, highest),
        // All-zero stakes also fall back to the maximum ratio.
        (0, 0, 0, highest),
        // Stake above max_stake is capped at the maximum ratio.
        (1000, 10, 1000 + 10, highest),
    ];

    for &(max_stake, min_stake, stake, expected) in cases.iter() {
        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, stake);
        assert_eq!(ratio, expected);
    }
}
}
2 changes: 2 additions & 0 deletions streamer/src/streamer.rs
Original file line number Diff line number Diff line change
@@ -28,6 +28,8 @@ use {
/// Stake figures consulted when handling incoming QUIC connections.
///
/// All numeric fields default to 0 and both maps default to empty via
/// `#[derive(Default)]`.
#[derive(Default)]
pub struct StakedNodes {
// Aggregate stake figure reported alongside a peer's own stake.
// NOTE(review): presumably the sum over all staked nodes; confirm against the code that fills it.
pub total_stake: u64,
// Largest single stake; upper anchor when scaling a staked peer's receive window.
pub max_stake: u64,
// Smallest single stake; lower anchor when scaling a staked peer's receive window.
pub min_stake: u64,
// Stake keyed by IP address.
// NOTE(review): which address of a node is used as the key is not visible here; confirm.
pub ip_stake_map: HashMap<IpAddr, u64>,
// Stake keyed by node identity; looked up with the pubkey taken from a peer's certificate.
pub pubkey_stake_map: HashMap<Pubkey, u64>,
}

0 comments on commit a69470f

Please sign in to comment.