diff --git a/Cargo.lock b/Cargo.lock index 110787fcd478bf..4fec0cb566457a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3323,9 +3323,9 @@ checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" [[package]] name = "quinn" -version = "0.8.1" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "584865613896a1f644d757e52c45c573441c8b04cac38ac13990b0235203db66" +checksum = "d7542006acd6e057ff632307d219954c44048f818898da03113d6c0086bfddd9" dependencies = [ "bytes", "futures-channel", @@ -3342,9 +3342,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.8.0" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "063dedf7983c8d57db474218f258daa85b627de6f2dbc458b690a93b1de790e8" +checksum = "3a13a5c0a674c1ce7150c9df7bc4a1e46c2fbbe7c710f56c0dc78b1a810e779e" dependencies = [ "bytes", "fxhash", diff --git a/client/src/nonblocking/quic_client.rs b/client/src/nonblocking/quic_client.rs index 7252024578ff97..53e20d83a1e49a 100644 --- a/client/src/nonblocking/quic_client.rs +++ b/client/src/nonblocking/quic_client.rs @@ -15,7 +15,9 @@ use { }, solana_measure::measure::Measure, solana_net_utils::VALIDATOR_PORT_RANGE, - solana_sdk::quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS}, + solana_sdk::quic::{ + QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, + }, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{atomic::Ordering, Arc}, @@ -395,7 +397,7 @@ impl QuicClient { let chunks = buffers[1..buffers.len()] .iter() - .chunks(QUIC_MAX_CONCURRENT_STREAMS); + .chunks(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS); let futures: Vec<_> = chunks .into_iter() diff --git a/client/tests/quic_client.rs b/client/tests/quic_client.rs index 7964737c786c99..40d02cbbbe1dcf 100644 --- a/client/tests/quic_client.rs +++ b/client/tests/quic_client.rs @@ -8,9 +8,11 @@ mod tests { tpu_connection::TpuConnection, }, solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair}, - solana_streamer::quic::{spawn_server, StreamStats}, + solana_streamer::{ + quic::{spawn_server, StreamStats}, + streamer::StakedNodes, + }, std::{ - collections::HashMap, net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, @@ -28,7 +30,7 @@ mod tests { let (sender, receiver) = unbounded(); let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s.try_clone().unwrap(), diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs index 5060ab2a721716..ec02d080c96964 100644 --- a/core/src/find_packet_sender_stake_stage.rs +++ b/core/src/find_packet_sender_stake_stage.rs @@ -3,7 +3,7 @@ use { solana_measure::measure::Measure, solana_perf::packet::PacketBatch, solana_sdk::timing::timestamp, - solana_streamer::streamer::{self, StreamerError}, + solana_streamer::streamer::{self, StakedNodes, StreamerError}, std::{ collections::HashMap, net::IpAddr, @@ -79,7 +79,7 @@ impl FindPacketSenderStakeStage { pub fn new( packet_receiver: streamer::PacketBatchReceiver, sender: FindPacketSenderStakeSender, - staked_nodes: Arc>>, + staked_nodes: Arc>, name: &'static str, ) -> Self { let mut stats = FindPacketSenderStakeStats::default(); @@ -105,7 +105,7 @@ impl FindPacketSenderStakeStage { Measure::start("apply_sender_stakes_time"); let mut apply_stake = || { let ip_to_stake = staked_nodes.read().unwrap(); - Self::apply_sender_stakes(&mut batches, &ip_to_stake); + Self::apply_sender_stakes(&mut batches, &ip_to_stake.stake_map); }; apply_stake(); apply_sender_stakes_time.stop(); diff --git a/core/src/staked_nodes_updater_service.rs b/core/src/staked_nodes_updater_service.rs index 45adbfe2ee7bd6..e26740a3c7a8cb 100644 --- a/core/src/staked_nodes_updater_service.rs +++ b/core/src/staked_nodes_updater_service.rs @@ -1,6 +1,7 @@ use { solana_gossip::cluster_info::ClusterInfo, solana_runtime::bank_forks::BankForks, + solana_streamer::streamer::StakedNodes, std::{ collections::HashMap, net::IpAddr, @@ -24,7 +25,7 @@ impl StakedNodesUpdaterService { exit: Arc, cluster_info: Arc, bank_forks: Arc>, - shared_staked_nodes: Arc>>, + shared_staked_nodes: Arc>, ) -> Self { let thread_hdl = Builder::new() .name("sol-sn-updater".to_string()) @@ -32,14 +33,17 @@ impl StakedNodesUpdaterService { let mut last_stakes = Instant::now(); while !exit.load(Ordering::Relaxed) { let mut new_ip_to_stake = HashMap::new(); + let mut total_stake = 0; if Self::try_refresh_ip_to_stake( &mut last_stakes, &mut new_ip_to_stake, + &mut total_stake, &bank_forks, &cluster_info, ) { let mut shared = shared_staked_nodes.write().unwrap(); - *shared = new_ip_to_stake; + shared.total_stake = total_stake as f64; + shared.stake_map = new_ip_to_stake; } } }) @@ -51,12 +55,17 @@ impl StakedNodesUpdaterService { fn try_refresh_ip_to_stake( last_stakes: &mut Instant, ip_to_stake: &mut HashMap, + total_stake: &mut u64, bank_forks: &RwLock, cluster_info: &ClusterInfo, ) -> bool { if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION { let root_bank = bank_forks.read().unwrap().root_bank(); let staked_nodes = root_bank.staked_nodes(); + *total_stake = staked_nodes + .iter() + .map(|(_pubkey, stake)| stake) + .sum::(); *ip_to_stake = cluster_info .tvu_peers() .into_iter() diff --git a/core/src/tpu.rs b/core/src/tpu.rs index f5589318cb2276..cf6d3956bd5369 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -30,11 +30,11 @@ use { vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender}, }, solana_sdk::signature::Keypair, - solana_streamer::quic::{ - spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + solana_streamer::{ + quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, + streamer::StakedNodes, }, std::{ - collections::HashMap, net::UdpSocket, sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, thread, @@ -127,7 +127,7 @@ impl Tpu { Some(bank_forks.read().unwrap().get_vote_only_mode_signal()), ); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let staked_nodes_updater_service = StakedNodesUpdaterService::new( exit.clone(), cluster_info.clone(), diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index c73f9a16d4fead..e7f3a1d3ef33d9 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -2944,9 +2944,9 @@ dependencies = [ [[package]] name = "quinn" -version = "0.8.1" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "584865613896a1f644d757e52c45c573441c8b04cac38ac13990b0235203db66" +checksum = "d7542006acd6e057ff632307d219954c44048f818898da03113d6c0086bfddd9" dependencies = [ "bytes", "futures-channel", @@ -2963,9 +2963,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.8.1" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2b1562bf4998b0c6d1841a4742b7103bb82cdde61374833de826bab9e8ad498" +checksum = "3a13a5c0a674c1ce7150c9df7bc4a1e46c2fbbe7c710f56c0dc78b1a810e779e" dependencies = [ "bytes", "fxhash", diff --git a/sdk/src/quic.rs b/sdk/src/quic.rs index acfb8894086de8..48467bf520a5ca 100644 --- a/sdk/src/quic.rs +++ b/sdk/src/quic.rs @@ -2,7 +2,7 @@ pub const QUIC_PORT_OFFSET: u16 = 6; // Empirically found max number of concurrent streams // that seems to maximize TPS on GCE (higher values don't seem to // give significant improvement or seem to impact stability) -pub const QUIC_MAX_CONCURRENT_STREAMS: usize = 2048; +pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128; pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000; pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000; diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index ad2ed9bae10824..9b91281b9e00ab 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -19,7 +19,7 @@ log = "0.4.14" nix = "0.23.1" pem = "1.0.2" pkcs8 = { version = "0.8.0", features = ["alloc"] } -quinn = "0.8.1" +quinn = "0.8.3" rand = "0.7.0" rcgen = "0.9.2" rustls = { version = "0.20.4", features = ["dangerous_configuration"] } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 8351b4eaf57e68..df17ad0b625bcd 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,11 +1,15 @@ use { - crate::quic::{configure_server, QuicServerError, StreamStats}, + crate::{ + quic::{configure_server, QuicServerError, StreamStats}, + streamer::StakedNodes, + }, crossbeam_channel::Sender, futures_util::stream::StreamExt, - quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection}, + quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection, VarInt}, solana_perf::packet::PacketBatch, solana_sdk::{ packet::{Packet, PACKET_DATA_SIZE}, + quic::QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, signature::Keypair, timing, }, @@ -21,6 +25,8 @@ use { tokio::{task::JoinHandle, time::timeout}, }; +const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64; + #[allow(clippy::too_many_arguments)] pub fn spawn_server( sock: UdpSocket, @@ -29,7 +35,7 @@ pub fn spawn_server( packet_sender: Sender, exit: Arc, max_connections_per_ip: usize, - staked_nodes: Arc>>, + staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, stats: Arc, @@ -59,7 +65,7 @@ pub async fn run_server( packet_sender: Sender, exit: Arc, max_connections_per_ip: usize, - staked_nodes: Arc>>, + staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, stats: Arc, @@ -97,18 +103,30 @@ pub async fn run_server( let (mut connection_table_l, stake) = { let staked_nodes = staked_nodes.read().unwrap(); - if let Some(stake) = staked_nodes.get(&remote_addr.ip()) { + if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) { let stake = *stake; + let total_stake = staked_nodes.total_stake; drop(staked_nodes); let mut connection_table_l = staked_connection_table.lock().unwrap(); let num_pruned = connection_table_l.prune_oldest(max_staked_connections); stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + connection.set_max_concurrent_uni_streams( + VarInt::from_u64( + ((stake as f64 / total_stake as f64) + * QUIC_TOTAL_STAKED_CONCURRENT_STREAMS) + as u64, + ) + .unwrap(), + ); (connection_table_l, stake) } else { drop(staked_nodes); let mut connection_table_l = connection_table.lock().unwrap(); let num_pruned = connection_table_l.prune_oldest(max_unstaked_connections); stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + connection.set_max_concurrent_uni_streams( + VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64).unwrap(), + ); (connection_table_l, 0) } }; @@ -449,7 +467,7 @@ pub mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, @@ -648,7 +666,7 @@ pub mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, @@ -678,7 +696,7 @@ pub mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 0ac58b38972f0b..ef3788d6dd8e6d 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -1,4 +1,5 @@ use { + crate::streamer::StakedNodes, crossbeam_channel::Sender, pem::Pem, pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier}, @@ -7,11 +8,10 @@ use { solana_perf::packet::PacketBatch, solana_sdk::{ packet::PACKET_DATA_SIZE, - quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS}, + quic::{QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS}, signature::Keypair, }, std::{ - collections::HashMap, error::Error, net::{IpAddr, UdpSocket}, sync::{ @@ -49,7 +49,7 @@ pub(crate) fn configure_server( let config = Arc::get_mut(&mut server_config.transport).unwrap(); // QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability - const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_CONCURRENT_STREAMS * 2) as u32; + const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS * 2) as u32; config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into()); config.stream_receive_window((PACKET_DATA_SIZE as u32).into()); config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into()); @@ -258,7 +258,7 @@ pub fn spawn_server( packet_sender: Sender, exit: Arc, max_connections_per_ip: usize, - staked_nodes: Arc>>, + staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, stats: Arc, @@ -306,7 +306,7 @@ mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, @@ -361,7 +361,7 @@ mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, @@ -403,7 +403,7 @@ mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 42a5fbbc4f0267..18383181127043 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -24,6 +24,13 @@ use { thiserror::Error, }; +// Total stake and nodes => stake map +#[derive(Default)] +pub struct StakedNodes { + pub total_stake: f64, + pub stake_map: HashMap, +} + pub type PacketBatchReceiver = Receiver; pub type PacketBatchSender = Sender;