Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set max concurrent uni streams accordingly -- do not over allocate open uni streams #1060

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quic-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use {
signature::{Keypair, Signer},
},
solana_streamer::{
nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType},
nonblocking::quic::{ConnectionPeerType, UniStreamQosUtil},
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
},
Expand Down Expand Up @@ -135,7 +135,7 @@ impl QuicConfig {
},
)
});
compute_max_allowed_uni_streams(client_type, total_stake)
UniStreamQosUtil::compute_max_allowed_uni_streams(client_type, total_stake)
}

pub fn update_client_certificate(&mut self, keypair: &Keypair, _ipaddr: IpAddr) {
Expand Down
213 changes: 175 additions & 38 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,30 +281,61 @@ fn get_connection_stake(
))
}

pub fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize {
match peer_type {
ConnectionPeerType::Staked(peer_stake) => {
// No checked math for f64 type. So let's explicitly check for 0 here
if total_stake == 0 || peer_stake > total_stake {
warn!(
"Invalid stake values: peer_stake: {:?}, total_stake: {:?}",
peer_stake, total_stake,
);
pub struct UniStreamQosUtil {}

QUIC_MIN_STAKED_CONCURRENT_STREAMS
} else {
let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS
- QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;

(((peer_stake as f64 / total_stake as f64) * delta) as usize
+ QUIC_MIN_STAKED_CONCURRENT_STREAMS)
.clamp(
QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_MAX_STAKED_CONCURRENT_STREAMS,
)
impl UniStreamQosUtil {
/// Calculate the maximum allowed uni streams based on stake information.
pub fn compute_max_allowed_uni_streams(
peer_type: ConnectionPeerType,
total_stake: u64,
) -> usize {
match peer_type {
ConnectionPeerType::Staked(peer_stake) => {
// No checked math for f64 type. So let's explicitly check for 0 here
if total_stake == 0 || peer_stake > total_stake {
warn!(
"Invalid stake values: peer_stake: {:?}, total_stake: {:?}",
peer_stake, total_stake,
);

QUIC_MIN_STAKED_CONCURRENT_STREAMS
} else {
let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS
- QUIC_MIN_STAKED_CONCURRENT_STREAMS)
as f64;

(((peer_stake as f64 / total_stake as f64) * delta) as usize
+ QUIC_MIN_STAKED_CONCURRENT_STREAMS)
.clamp(
QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_MAX_STAKED_CONCURRENT_STREAMS,
)
}
}
ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
}
ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
}

/// Given the load EMA, derive the streams per throttle window.
/// Do not allow concurrent streams more than the max streams per throttle window.
pub(crate) fn max_allowed_uni_streams_per_throttling_interval_ema(
ema: &StakedStreamLoadEMA,
peer_type: ConnectionPeerType,
total_stake: u64,
) -> u64 {
let max_streams_per_throttle_window =
ema.available_load_capacity_in_throttling_duration(peer_type, total_stake);
(Self::compute_max_allowed_uni_streams(peer_type, total_stake) as u64)
.min(max_streams_per_throttle_window)
}

/// Given the max streams allowed per throttling interval and the max concurrent uni streams,
/// return the smaller of the two, so that the concurrent stream limit never exceeds the
/// number of streams allowed per throttle window.
pub fn max_concurrent_uni_streams_per_throttling_interval(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need a function for this? It's just a wrapper on min. Why not directly use min() where we are calling this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this makes the design goal more explicit and makes it easier to test.

max_streams_per_throttling_interval: u64,
max_concurrent_uni_streams: u64,
) -> u64 {
max_concurrent_uni_streams.min(max_streams_per_throttling_interval)
}
}

Expand Down Expand Up @@ -357,11 +388,13 @@ fn handle_and_cache_new_connection(
wait_for_chunk_timeout: Duration,
stream_load_ema: Arc<StakedStreamLoadEMA>,
) -> Result<(), ConnectionHandlerError> {
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
params.peer_type,
params.total_stake,
) as u64)
{
if let Ok(max_uni_streams) = VarInt::from_u64(
UniStreamQosUtil::max_allowed_uni_streams_per_throttling_interval_ema(
&stream_load_ema,
params.peer_type,
params.total_stake,
),
) {
let remote_addr = connection.remote_address();
let receive_window =
compute_recieve_window(params.max_stake, params.min_stake, params.peer_type);
Expand All @@ -387,11 +420,6 @@ fn handle_and_cache_new_connection(
{
drop(connection_table_l);

if let Ok(receive_window) = receive_window {
connection.set_receive_window(receive_window);
}
connection.set_max_concurrent_uni_streams(max_uni_streams);

tokio::spawn(handle_connection(
connection,
remote_addr,
Expand All @@ -402,6 +430,8 @@ fn handle_and_cache_new_connection(
wait_for_chunk_timeout,
stream_load_ema,
stream_counter,
max_uni_streams,
receive_window.ok(),
));
Ok(())
} else {
Expand Down Expand Up @@ -793,6 +823,7 @@ fn track_streamer_fetch_packet_performance(
.fetch_add(measure.as_us(), Ordering::Relaxed);
}

#[allow(clippy::too_many_arguments)]
async fn handle_connection(
connection: Connection,
remote_addr: SocketAddr,
Expand All @@ -803,6 +834,8 @@ async fn handle_connection(
wait_for_chunk_timeout: Duration,
stream_load_ema: Arc<StakedStreamLoadEMA>,
stream_counter: Arc<ConnectionStreamCounter>,
mut max_uni_streams: VarInt,
receive_window: Option<VarInt>,
) {
let stats = params.stats;
debug!(
Expand All @@ -811,8 +844,15 @@ async fn handle_connection(
stats.total_streams.load(Ordering::Relaxed),
stats.total_connections.load(Ordering::Relaxed),
);
connection.set_max_concurrent_uni_streams(max_uni_streams);
if let Some(receive_window) = receive_window {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any benefit of moving receive_window setting to this function?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better encapsulation to put the QoS-related config for connections (receive window and max concurrent uni streams) in one place.

connection.set_receive_window(receive_window);
}
let stable_id = connection.stable_id();
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let max_concurrent_uni_streams =
UniStreamQosUtil::compute_max_allowed_uni_streams(params.peer_type, params.total_stake)
as u64;
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(stream) =
tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await
Expand Down Expand Up @@ -856,6 +896,20 @@ async fn handle_connection(
sleep(throttle_duration).await;
}
}
let max_uni_streams_in_interval = VarInt::from_u64(
UniStreamQosUtil::max_concurrent_uni_streams_per_throttling_interval(
max_streams_per_throttling_interval,
max_concurrent_uni_streams,
),
);

if let Ok(max_uni_streams_in_interval) = max_uni_streams_in_interval {
// Update max concurrent uni streams if needed
if max_uni_streams_in_interval != max_uni_streams {
connection.set_max_concurrent_uni_streams(max_uni_streams_in_interval);
max_uni_streams = max_uni_streams_in_interval;
}
}
stream_load_ema.increment_load(params.peer_type);
stream_counter.stream_count.fetch_add(1, Ordering::Relaxed);
stats.total_streams.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -1267,7 +1321,7 @@ pub mod test {
use {
super::*,
crate::{
nonblocking::quic::compute_max_allowed_uni_streams,
nonblocking::quic::UniStreamQosUtil,
quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
tls_certificates::new_dummy_x509_certificate,
},
Expand Down Expand Up @@ -2098,33 +2152,39 @@ pub mod test {

fn test_max_allowed_uni_streams() {
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0),
UniStreamQosUtil::compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0),
UniStreamQosUtil::compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
let delta =
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000),
UniStreamQosUtil::compute_max_allowed_uni_streams(
ConnectionPeerType::Staked(1000),
10000
),
QUIC_MAX_STAKED_CONCURRENT_STREAMS,
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000),
UniStreamQosUtil::compute_max_allowed_uni_streams(
ConnectionPeerType::Staked(100),
10000
),
((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS)
.min(QUIC_MAX_STAKED_CONCURRENT_STREAMS)
);
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000),
UniStreamQosUtil::compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
}

#[test]
fn test_cacluate_receive_window_ratio_for_staked_node() {
let mut max_stake = 10000;
let mut max_stake: u64 = 10000;
let mut min_stake = 0;
let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, min_stake);
assert_eq!(ratio, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO);
Expand Down Expand Up @@ -2155,4 +2215,81 @@ pub mod test {
compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10);
assert_eq!(ratio, max_ratio);
}

#[test]
fn test_max_allowed_uni_streams_per_throttling_interval_ema() {
    // One EMA tracker, built once and shared by every EMA-based check below.
    let stream_stats = Arc::new(StreamStats::default());
    let load_ema = StakedStreamLoadEMA::new(
        stream_stats,
        MAX_UNSTAKED_CONNECTIONS,
        DEFAULT_MAX_STREAMS_PER_MS,
    );
    let total_stake = 10000;

    // Limit derived from the peer type and stake alone; the EMA tracker is not consulted.
    let static_limit = |peer_type: ConnectionPeerType| {
        UniStreamQosUtil::compute_max_allowed_uni_streams(peer_type, total_stake)
    };
    // Limit that is additionally capped by the EMA tracker's per-throttling-interval capacity.
    let ema_limit = |peer_type: ConnectionPeerType| {
        UniStreamQosUtil::max_allowed_uni_streams_per_throttling_interval_ema(
            &load_ema,
            peer_type,
            total_stake,
        )
    };

    // Unstaked peer: the EMA-based limit is lower than the static limit.
    assert_eq!(static_limit(ConnectionPeerType::Unstaked), 128);
    assert_eq!(ema_limit(ConnectionPeerType::Unstaked), 10);

    // Staked peer holding 10% of the total stake: both limits agree.
    assert_eq!(static_limit(ConnectionPeerType::Staked(1000)), 512);
    assert_eq!(ema_limit(ConnectionPeerType::Staked(1000)), 512);

    // Peers with a small share of the total stake.
    assert_eq!(ema_limit(ConnectionPeerType::Staked(1)), 11);
    assert_eq!(ema_limit(ConnectionPeerType::Staked(10)), 80);
}

#[test]
fn test_max_concurrent_uni_streams_per_throttling_interval() {
    // The concurrent uni stream limit within a throttling window must always be the smaller
    // of the max streams allowed per throttle window and the max concurrent streams allowed,
    // regardless of which argument carries the smaller value.
    // Each case is (max_streams_per_throttling_interval, max_concurrent_uni_streams, expected).
    for (per_interval, concurrent, expected) in [(10, 20, 10), (20, 10, 10)] {
        assert_eq!(
            UniStreamQosUtil::max_concurrent_uni_streams_per_throttling_interval(
                per_interval,
                concurrent
            ),
            expected
        );
    }
}
}
Loading