From 9f849396b003dc7810e0ef38ccfe77d7f8cb883d Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Fri, 6 May 2022 19:41:10 +0000 Subject: [PATCH 1/2] Add sender stake to quic packets Weight concurrent streams by stake for staked nodes Fixed some comp issues due to merge --- client/src/nonblocking/quic_client.rs | 6 ++-- client/tests/quic_client.rs | 8 ++++-- core/src/find_packet_sender_stake_stage.rs | 9 ++++-- core/src/staked_nodes_updater_service.rs | 13 +++++++-- core/src/tpu.rs | 8 +++--- sdk/src/quic.rs | 2 +- streamer/src/nonblocking/quic.rs | 33 +++++++++++++++++----- streamer/src/quic.rs | 14 ++++----- 8 files changed, 64 insertions(+), 29 deletions(-) diff --git a/client/src/nonblocking/quic_client.rs b/client/src/nonblocking/quic_client.rs index 7252024578ff97..53e20d83a1e49a 100644 --- a/client/src/nonblocking/quic_client.rs +++ b/client/src/nonblocking/quic_client.rs @@ -15,7 +15,9 @@ use { }, solana_measure::measure::Measure, solana_net_utils::VALIDATOR_PORT_RANGE, - solana_sdk::quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS}, + solana_sdk::quic::{ + QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, + }, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{atomic::Ordering, Arc}, @@ -395,7 +397,7 @@ impl QuicClient { let chunks = buffers[1..buffers.len()] .iter() - .chunks(QUIC_MAX_CONCURRENT_STREAMS); + .chunks(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS); let futures: Vec<_> = chunks .into_iter() diff --git a/client/tests/quic_client.rs b/client/tests/quic_client.rs index 7964737c786c99..41db227ae40e06 100644 --- a/client/tests/quic_client.rs +++ b/client/tests/quic_client.rs @@ -8,9 +8,11 @@ mod tests { tpu_connection::TpuConnection, }, solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair}, - solana_streamer::quic::{spawn_server, StreamStats}, + solana_streamer::{ + nonblocking::quic::StakedNodes, + quic::{spawn_server, StreamStats}, + }, std::{ - collections::HashMap, net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, @@ -28,7 +30,7 @@ mod tests { let (sender, receiver) = unbounded(); let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s.try_clone().unwrap(), diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs index 5060ab2a721716..700a1e91417d6d 100644 --- a/core/src/find_packet_sender_stake_stage.rs +++ b/core/src/find_packet_sender_stake_stage.rs @@ -3,7 +3,10 @@ use { solana_measure::measure::Measure, solana_perf::packet::PacketBatch, solana_sdk::timing::timestamp, - solana_streamer::streamer::{self, StreamerError}, + solana_streamer::{ + nonblocking::quic::StakedNodes, + streamer::{self, StreamerError}, + }, std::{ collections::HashMap, net::IpAddr, @@ -79,7 +82,7 @@ impl FindPacketSenderStakeStage { pub fn new( packet_receiver: streamer::PacketBatchReceiver, sender: FindPacketSenderStakeSender, - staked_nodes: Arc>>, + staked_nodes: Arc>, name: &'static str, ) -> Self { let mut stats = FindPacketSenderStakeStats::default(); @@ -105,7 +108,7 @@ impl FindPacketSenderStakeStage { Measure::start("apply_sender_stakes_time"); let mut apply_stake = || { let ip_to_stake = staked_nodes.read().unwrap(); - Self::apply_sender_stakes(&mut batches, &ip_to_stake); + Self::apply_sender_stakes(&mut batches, &ip_to_stake.0 .1); }; 
apply_stake(); apply_sender_stakes_time.stop(); diff --git a/core/src/staked_nodes_updater_service.rs b/core/src/staked_nodes_updater_service.rs index 45adbfe2ee7bd6..e87584e91b2f25 100644 --- a/core/src/staked_nodes_updater_service.rs +++ b/core/src/staked_nodes_updater_service.rs @@ -1,6 +1,7 @@ use { solana_gossip::cluster_info::ClusterInfo, solana_runtime::bank_forks::BankForks, + solana_streamer::nonblocking::quic::StakedNodes, std::{ collections::HashMap, net::IpAddr, @@ -24,7 +25,7 @@ impl StakedNodesUpdaterService { exit: Arc, cluster_info: Arc, bank_forks: Arc>, - shared_staked_nodes: Arc>>, + shared_staked_nodes: Arc>, ) -> Self { let thread_hdl = Builder::new() .name("sol-sn-updater".to_string()) @@ -32,14 +33,17 @@ impl StakedNodesUpdaterService { let mut last_stakes = Instant::now(); while !exit.load(Ordering::Relaxed) { let mut new_ip_to_stake = HashMap::new(); + let mut total_stake = 0; if Self::try_refresh_ip_to_stake( &mut last_stakes, &mut new_ip_to_stake, + &mut total_stake, &bank_forks, &cluster_info, ) { let mut shared = shared_staked_nodes.write().unwrap(); - *shared = new_ip_to_stake; + shared.0 .0 = total_stake as f64; + shared.0 .1 = new_ip_to_stake; } } }) @@ -51,12 +55,17 @@ impl StakedNodesUpdaterService { fn try_refresh_ip_to_stake( last_stakes: &mut Instant, ip_to_stake: &mut HashMap, + total_stake: &mut u64, bank_forks: &RwLock, cluster_info: &ClusterInfo, ) -> bool { if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION { let root_bank = bank_forks.read().unwrap().root_bank(); let staked_nodes = root_bank.staked_nodes(); + *total_stake = staked_nodes + .iter() + .map(|(_pubkey, stake)| stake) + .sum::(); *ip_to_stake = cluster_info .tvu_peers() .into_iter() diff --git a/core/src/tpu.rs b/core/src/tpu.rs index a3744c77a9a524..836f175f1ed74b 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -30,11 +30,11 @@ use { vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender}, }, solana_sdk::signature::Keypair, - solana_streamer::quic::{ - spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + solana_streamer::{ + nonblocking::quic::StakedNodes, + quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, }, std::{ - collections::HashMap, net::UdpSocket, sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, thread, @@ -126,7 +126,7 @@ impl Tpu { Some(bank_forks.read().unwrap().get_vote_only_mode_signal()), ); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let staked_nodes_updater_service = StakedNodesUpdaterService::new( exit.clone(), cluster_info.clone(), diff --git a/sdk/src/quic.rs b/sdk/src/quic.rs index acfb8894086de8..48467bf520a5ca 100644 --- a/sdk/src/quic.rs +++ b/sdk/src/quic.rs @@ -2,7 +2,7 @@ pub const QUIC_PORT_OFFSET: u16 = 6; // Empirically found max number of concurrent streams // that seems to maximize TPS on GCE (higher values don't seem to // give significant improvement or seem to impact stability) -pub const QUIC_MAX_CONCURRENT_STREAMS: usize = 2048; +pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128; pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000; pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000; diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 8351b4eaf57e68..80d42fd991589a 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -2,10 +2,11 @@ use { crate::quic::{configure_server, QuicServerError, StreamStats}, crossbeam_channel::Sender, 
futures_util::stream::StreamExt, - quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection}, + quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection, VarInt}, solana_perf::packet::PacketBatch, solana_sdk::{ packet::{Packet, PACKET_DATA_SIZE}, + quic::QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, signature::Keypair, timing, }, @@ -21,6 +22,12 @@ use { tokio::{task::JoinHandle, time::timeout}, }; +// Total stake and nodes => stake map +#[derive(Default)] +pub struct StakedNodes(pub (f64, HashMap)); + +const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64; + #[allow(clippy::too_many_arguments)] pub fn spawn_server( sock: UdpSocket, @@ -29,7 +36,7 @@ pub fn spawn_server( packet_sender: Sender, exit: Arc, max_connections_per_ip: usize, - staked_nodes: Arc>>, + staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, stats: Arc, @@ -59,7 +66,7 @@ pub async fn run_server( packet_sender: Sender, exit: Arc, max_connections_per_ip: usize, - staked_nodes: Arc>>, + staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, stats: Arc, @@ -97,18 +104,30 @@ pub async fn run_server( let (mut connection_table_l, stake) = { let staked_nodes = staked_nodes.read().unwrap(); - if let Some(stake) = staked_nodes.get(&remote_addr.ip()) { + if let Some(stake) = staked_nodes.0 .1.get(&remote_addr.ip()) { let stake = *stake; + let total_stake = staked_nodes.0 .0; drop(staked_nodes); let mut connection_table_l = staked_connection_table.lock().unwrap(); let num_pruned = connection_table_l.prune_oldest(max_staked_connections); stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + connection.set_max_concurrent_uni_streams( + VarInt::from_u64( + ((stake as f64 / total_stake as f64) + * QUIC_TOTAL_STAKED_CONCURRENT_STREAMS) + as u64, + ) + .unwrap(), + ); (connection_table_l, stake) } else { drop(staked_nodes); let mut connection_table_l = connection_table.lock().unwrap(); let num_pruned = connection_table_l.prune_oldest(max_unstaked_connections); stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + connection.set_max_concurrent_uni_streams( + VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64).unwrap(), + ); (connection_table_l, 0) } }; @@ -449,7 +468,7 @@ pub mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, @@ -648,7 +667,7 @@ pub mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, @@ -678,7 +697,7 @@ pub mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 0ac58b38972f0b..5cf02a0f02f8aa 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -1,4 +1,5 @@ use { + crate::nonblocking::quic::StakedNodes, 
crossbeam_channel::Sender, pem::Pem, pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier}, @@ -7,11 +8,10 @@ use { solana_perf::packet::PacketBatch, solana_sdk::{ packet::PACKET_DATA_SIZE, - quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS}, + quic::{QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS}, signature::Keypair, }, std::{ - collections::HashMap, error::Error, net::{IpAddr, UdpSocket}, sync::{ @@ -49,7 +49,7 @@ pub(crate) fn configure_server( let config = Arc::get_mut(&mut server_config.transport).unwrap(); // QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability - const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_CONCURRENT_STREAMS * 2) as u32; + const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS * 2) as u32; config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into()); config.stream_receive_window((PACKET_DATA_SIZE as u32).into()); config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into()); @@ -258,7 +258,7 @@ pub fn spawn_server( packet_sender: Sender, exit: Arc, max_connections_per_ip: usize, - staked_nodes: Arc>>, + staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, stats: Arc, @@ -306,7 +306,7 @@ mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, @@ -361,7 +361,7 @@ mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, @@ -403,7 +403,7 @@ mod test { let keypair = Keypair::new(); let ip = "127.0.0.1".parse().unwrap(); let server_address = s.local_addr().unwrap(); - let staked_nodes = Arc::new(RwLock::new(HashMap::new())); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let stats = Arc::new(StreamStats::default()); let t = spawn_server( s, From 9534aafd9705dd6481c4262244bb5d6a449d9231 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 15 Jun 2022 10:39:17 -0700 Subject: [PATCH 2/2] Refactor StakedNodes --- client/tests/quic_client.rs | 2 +- core/src/find_packet_sender_stake_stage.rs | 7 ++----- core/src/staked_nodes_updater_service.rs | 6 +++--- core/src/tpu.rs | 2 +- streamer/src/nonblocking/quic.rs | 13 ++++++------- streamer/src/quic.rs | 2 +- streamer/src/streamer.rs | 7 +++++++ 7 files changed, 21 insertions(+), 18 deletions(-) diff --git a/client/tests/quic_client.rs b/client/tests/quic_client.rs index 41db227ae40e06..40d02cbbbe1dcf 100644 --- a/client/tests/quic_client.rs +++ b/client/tests/quic_client.rs @@ -9,8 +9,8 @@ mod tests { }, solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::StakedNodes, quic::{spawn_server, StreamStats}, + streamer::StakedNodes, }, std::{ net::{SocketAddr, UdpSocket}, diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs index 700a1e91417d6d..ec02d080c96964 100644 --- a/core/src/find_packet_sender_stake_stage.rs +++ b/core/src/find_packet_sender_stake_stage.rs @@ -3,10 +3,7 @@ use { 
solana_measure::measure::Measure, solana_perf::packet::PacketBatch, solana_sdk::timing::timestamp, - solana_streamer::{ - nonblocking::quic::StakedNodes, - streamer::{self, StreamerError}, - }, + solana_streamer::streamer::{self, StakedNodes, StreamerError}, std::{ collections::HashMap, net::IpAddr, @@ -108,7 +105,7 @@ impl FindPacketSenderStakeStage { Measure::start("apply_sender_stakes_time"); let mut apply_stake = || { let ip_to_stake = staked_nodes.read().unwrap(); - Self::apply_sender_stakes(&mut batches, &ip_to_stake.0 .1); + Self::apply_sender_stakes(&mut batches, &ip_to_stake.stake_map); }; apply_stake(); apply_sender_stakes_time.stop(); diff --git a/core/src/staked_nodes_updater_service.rs b/core/src/staked_nodes_updater_service.rs index e87584e91b2f25..e26740a3c7a8cb 100644 --- a/core/src/staked_nodes_updater_service.rs +++ b/core/src/staked_nodes_updater_service.rs @@ -1,7 +1,7 @@ use { solana_gossip::cluster_info::ClusterInfo, solana_runtime::bank_forks::BankForks, - solana_streamer::nonblocking::quic::StakedNodes, + solana_streamer::streamer::StakedNodes, std::{ collections::HashMap, net::IpAddr, @@ -42,8 +42,8 @@ impl StakedNodesUpdaterService { &cluster_info, ) { let mut shared = shared_staked_nodes.write().unwrap(); - shared.0 .0 = total_stake as f64; - shared.0 .1 = new_ip_to_stake; + shared.total_stake = total_stake as f64; + shared.stake_map = new_ip_to_stake; } } }) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 836f175f1ed74b..320d1dcbd9e75a 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -31,8 +31,8 @@ use { }, solana_sdk::signature::Keypair, solana_streamer::{ - nonblocking::quic::StakedNodes, quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, + streamer::StakedNodes, }, std::{ net::UdpSocket, diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 80d42fd991589a..df17ad0b625bcd 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,5 +1,8 @@ use { - crate::quic::{configure_server, QuicServerError, StreamStats}, + crate::{ + quic::{configure_server, QuicServerError, StreamStats}, + streamer::StakedNodes, + }, crossbeam_channel::Sender, futures_util::stream::StreamExt, quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection, VarInt}, @@ -22,10 +25,6 @@ use { tokio::{task::JoinHandle, time::timeout}, }; -// Total stake and nodes => stake map -#[derive(Default)] -pub struct StakedNodes(pub (f64, HashMap)); - const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64; #[allow(clippy::too_many_arguments)] @@ -104,9 +103,9 @@ pub async fn run_server( let (mut connection_table_l, stake) = { let staked_nodes = staked_nodes.read().unwrap(); - if let Some(stake) = staked_nodes.0 .1.get(&remote_addr.ip()) { + if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) { let stake = *stake; - let total_stake = staked_nodes.0 .0; + let total_stake = staked_nodes.total_stake; drop(staked_nodes); let mut connection_table_l = staked_connection_table.lock().unwrap(); let num_pruned = connection_table_l.prune_oldest(max_staked_connections); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 5cf02a0f02f8aa..ef3788d6dd8e6d 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -1,5 +1,5 @@ use { - crate::nonblocking::quic::StakedNodes, + crate::streamer::StakedNodes, crossbeam_channel::Sender, pem::Pem, pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier}, diff --git a/streamer/src/streamer.rs 
b/streamer/src/streamer.rs index 42a5fbbc4f0267..18383181127043 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -24,6 +24,13 @@ use { thiserror::Error, }; +// Total stake and nodes => stake map +#[derive(Default)] +pub struct StakedNodes { + pub total_stake: f64, + pub stake_map: HashMap<IpAddr, u64>, +} + pub type PacketBatchReceiver = Receiver<PacketBatch>; pub type PacketBatchSender = Sender<PacketBatch>;
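
Illustrative sketch (not part of the patches): the diffs above weight each staked peer's QUIC uni-stream limit by its share of total stake, while unstaked peers keep the fixed QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS cap of 128. The StakedNodes struct, the two constants, and the (stake / total_stake) * QUIC_TOTAL_STAKED_CONCURRENT_STREAMS formula below mirror the patch; the standalone max_concurrent_uni_streams helper and the HashMap<IpAddr, u64> type parameters are assumptions for illustration only, since the patch sets the limit inline in run_server via connection.set_max_concurrent_uni_streams rather than through a named helper.

use std::{collections::HashMap, net::IpAddr};

// Mirrors the StakedNodes struct added to streamer/src/streamer.rs
// (the map's type parameters IpAddr -> stake-in-lamports are assumed here).
#[derive(Default)]
pub struct StakedNodes {
    pub total_stake: f64,
    pub stake_map: HashMap<IpAddr, u64>,
}

// Constants from the patch: a global budget of concurrent uni streams shared
// by staked peers, and the fixed per-connection cap for unstaked peers.
const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64;
const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128;

// Hypothetical helper illustrating the value run_server computes and passes to
// connection.set_max_concurrent_uni_streams for a given peer address.
fn max_concurrent_uni_streams(staked_nodes: &StakedNodes, peer: &IpAddr) -> u64 {
    match staked_nodes.stake_map.get(peer) {
        // Staked peer: stream budget proportional to its share of total stake.
        Some(&stake) => {
            ((stake as f64 / staked_nodes.total_stake) * QUIC_TOTAL_STAKED_CONCURRENT_STREAMS)
                as u64
        }
        // Unstaked peer: fixed cap.
        None => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64,
    }
}

fn main() {
    let staked_peer: IpAddr = "10.0.0.1".parse().unwrap();
    let staked_nodes = StakedNodes {
        total_stake: 1_000_000f64,
        stake_map: HashMap::from([(staked_peer, 10_000u64)]), // 1% of total stake
    };
    // 1% of the 100_000-stream budget => 1_000 concurrent uni streams.
    assert_eq!(max_concurrent_uni_streams(&staked_nodes, &staked_peer), 1_000);
    // Unknown (unstaked) peers get the fixed cap of 128.
    let unstaked_peer: IpAddr = "10.0.0.2".parse().unwrap();
    assert_eq!(max_concurrent_uni_streams(&staked_nodes, &unstaked_peer), 128);
}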