diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index f668a08edd38c3..490bfedafdcfe0 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,7 +1,8 @@ use { crate::{ nonblocking::stream_throttle::{ - ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING, + ConnectionStreamCounter, StakedStreamLoadEMA, MAX_STREAMS_PER_MS, + STREAM_STOP_CODE_THROTTLING, STREAM_THROTTLING_INTERVAL_MS, }, quic::{configure_server, QuicServerError, StreamStats}, streamer::StakedNodes, @@ -498,10 +499,16 @@ async fn setup_connection( stats.clone(), ), |(pubkey, stake, total_stake, max_stake, min_stake)| { - let peer_type = if stake > 0 { - ConnectionPeerType::Staked(stake) - } else { + // The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle + // interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams. + let min_stake_ratio = + 1_f64 / (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) as f64; + let stake_ratio = stake as f64 / total_stake as f64; + let peer_type = if stake_ratio < min_stake_ratio { + // If it is a staked connection with ultra low stake ratio, treat it as unstaked. ConnectionPeerType::Unstaked + } else { + ConnectionPeerType::Staked(stake) }; NewConnectionHandlerParams { packet_sender, diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index aa5e53aa1b156b..95ba1cb550538e 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -15,9 +15,9 @@ use { }; /// Limit to 250K PPS -const MAX_STREAMS_PER_MS: u64 = 250; +pub const MAX_STREAMS_PER_MS: u64 = 250; const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20; -const STREAM_THROTTLING_INTERVAL_MS: u64 = 100; +pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100; pub const STREAM_STOP_CODE_THROTTLING: u32 = 15; const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5; const STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10;