Skip to content

Commit

Permalink
Weight concurrent streams by stake (backport #25993) (#26110)
Browse files Browse the repository at this point in the history
* Weight concurrent streams by stake (#25993)

Weight concurrent streams by stake for staked nodes
Ported changes from #25056 after addressing merge conflicts and doing some refactoring

(cherry picked from commit 61946a4)

* Updated the quinn version to fix the compilation issue introduced by the merge

* Fixed an issue in the Cargo.lock file

Co-authored-by: Lijun Wang <[email protected]>
  • Loading branch information
mergify[bot] and lijunwangs authored Jun 22, 2022
1 parent ba50e7f commit 5b864ef
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 39 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use {
},
solana_measure::measure::Measure,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS},
solana_sdk::quic::{
QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc},
Expand Down Expand Up @@ -395,7 +397,7 @@ impl QuicClient {

let chunks = buffers[1..buffers.len()]
.iter()
.chunks(QUIC_MAX_CONCURRENT_STREAMS);
.chunks(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);

let futures: Vec<_> = chunks
.into_iter()
Expand Down
8 changes: 5 additions & 3 deletions client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ mod tests {
tpu_connection::TpuConnection,
},
solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::quic::{spawn_server, StreamStats},
solana_streamer::{
quic::{spawn_server, StreamStats},
streamer::StakedNodes,
},
std::{
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -28,7 +30,7 @@ mod tests {
let (sender, receiver) = unbounded();
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s.try_clone().unwrap(),
Expand Down
6 changes: 3 additions & 3 deletions core/src/find_packet_sender_stake_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_sdk::timing::timestamp,
solana_streamer::streamer::{self, StreamerError},
solana_streamer::streamer::{self, StakedNodes, StreamerError},
std::{
collections::HashMap,
net::IpAddr,
Expand Down Expand Up @@ -79,7 +79,7 @@ impl FindPacketSenderStakeStage {
pub fn new(
packet_receiver: streamer::PacketBatchReceiver,
sender: FindPacketSenderStakeSender,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
name: &'static str,
) -> Self {
let mut stats = FindPacketSenderStakeStats::default();
Expand All @@ -105,7 +105,7 @@ impl FindPacketSenderStakeStage {
Measure::start("apply_sender_stakes_time");
let mut apply_stake = || {
let ip_to_stake = staked_nodes.read().unwrap();
Self::apply_sender_stakes(&mut batches, &ip_to_stake);
Self::apply_sender_stakes(&mut batches, &ip_to_stake.stake_map);
};
apply_stake();
apply_sender_stakes_time.stop();
Expand Down
13 changes: 11 additions & 2 deletions core/src/staked_nodes_updater_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
solana_gossip::cluster_info::ClusterInfo,
solana_runtime::bank_forks::BankForks,
solana_streamer::streamer::StakedNodes,
std::{
collections::HashMap,
net::IpAddr,
Expand All @@ -24,22 +25,25 @@ impl StakedNodesUpdaterService {
exit: Arc<AtomicBool>,
cluster_info: Arc<ClusterInfo>,
bank_forks: Arc<RwLock<BankForks>>,
shared_staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
shared_staked_nodes: Arc<RwLock<StakedNodes>>,
) -> Self {
let thread_hdl = Builder::new()
.name("sol-sn-updater".to_string())
.spawn(move || {
let mut last_stakes = Instant::now();
while !exit.load(Ordering::Relaxed) {
let mut new_ip_to_stake = HashMap::new();
let mut total_stake = 0;
if Self::try_refresh_ip_to_stake(
&mut last_stakes,
&mut new_ip_to_stake,
&mut total_stake,
&bank_forks,
&cluster_info,
) {
let mut shared = shared_staked_nodes.write().unwrap();
*shared = new_ip_to_stake;
shared.total_stake = total_stake as f64;
shared.stake_map = new_ip_to_stake;
}
}
})
Expand All @@ -51,12 +55,17 @@ impl StakedNodesUpdaterService {
fn try_refresh_ip_to_stake(
last_stakes: &mut Instant,
ip_to_stake: &mut HashMap<IpAddr, u64>,
total_stake: &mut u64,
bank_forks: &RwLock<BankForks>,
cluster_info: &ClusterInfo,
) -> bool {
if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
let root_bank = bank_forks.read().unwrap().root_bank();
let staked_nodes = root_bank.staked_nodes();
*total_stake = staked_nodes
.iter()
.map(|(_pubkey, stake)| stake)
.sum::<u64>();
*ip_to_stake = cluster_info
.tvu_peers()
.into_iter()
Expand Down
8 changes: 4 additions & 4 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use {
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
},
solana_sdk::signature::Keypair,
solana_streamer::quic::{
spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS,
solana_streamer::{
quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
std::{
collections::HashMap,
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
thread,
Expand Down Expand Up @@ -127,7 +127,7 @@ impl Tpu {
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
);

let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let staked_nodes_updater_service = StakedNodesUpdaterService::new(
exit.clone(),
cluster_info.clone(),
Expand Down
8 changes: 4 additions & 4 deletions programs/bpf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub const QUIC_PORT_OFFSET: u16 = 6;
// Empirically found max number of concurrent streams
// that seems to maximize TPS on GCE (higher values don't seem to
// give significant improvement or seem to impact stability)
pub const QUIC_MAX_CONCURRENT_STREAMS: usize = 2048;
pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128;

pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000;
pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;
2 changes: 1 addition & 1 deletion streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ log = "0.4.14"
nix = "0.23.1"
pem = "1.0.2"
pkcs8 = { version = "0.8.0", features = ["alloc"] }
quinn = "0.8.1"
quinn = "0.8.3"
rand = "0.7.0"
rcgen = "0.9.2"
rustls = { version = "0.20.4", features = ["dangerous_configuration"] }
Expand Down
34 changes: 26 additions & 8 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use {
crate::quic::{configure_server, QuicServerError, StreamStats},
crate::{
quic::{configure_server, QuicServerError, StreamStats},
streamer::StakedNodes,
},
crossbeam_channel::Sender,
futures_util::stream::StreamExt,
quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection},
quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection, VarInt},
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE},
quic::QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
signature::Keypair,
timing,
},
Expand All @@ -21,6 +25,8 @@ use {
tokio::{task::JoinHandle, time::timeout},
};

const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64;

#[allow(clippy::too_many_arguments)]
pub fn spawn_server(
sock: UdpSocket,
Expand All @@ -29,7 +35,7 @@ pub fn spawn_server(
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_ip: usize,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
Expand Down Expand Up @@ -59,7 +65,7 @@ pub async fn run_server(
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_ip: usize,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
Expand Down Expand Up @@ -97,18 +103,30 @@ pub async fn run_server(

let (mut connection_table_l, stake) = {
let staked_nodes = staked_nodes.read().unwrap();
if let Some(stake) = staked_nodes.get(&remote_addr.ip()) {
if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) {
let stake = *stake;
let total_stake = staked_nodes.total_stake;
drop(staked_nodes);
let mut connection_table_l = staked_connection_table.lock().unwrap();
let num_pruned = connection_table_l.prune_oldest(max_staked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection.set_max_concurrent_uni_streams(
VarInt::from_u64(
((stake as f64 / total_stake as f64)
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS)
as u64,
)
.unwrap(),
);
(connection_table_l, stake)
} else {
drop(staked_nodes);
let mut connection_table_l = connection_table.lock().unwrap();
let num_pruned = connection_table_l.prune_oldest(max_unstaked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection.set_max_concurrent_uni_streams(
VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64).unwrap(),
);
(connection_table_l, 0)
}
};
Expand Down Expand Up @@ -449,7 +467,7 @@ pub mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down Expand Up @@ -648,7 +666,7 @@ pub mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down Expand Up @@ -678,7 +696,7 @@ pub mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down
14 changes: 7 additions & 7 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {
crate::streamer::StakedNodes,
crossbeam_channel::Sender,
pem::Pem,
pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier},
Expand All @@ -7,11 +8,10 @@ use {
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::PACKET_DATA_SIZE,
quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS},
quic::{QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
signature::Keypair,
},
std::{
collections::HashMap,
error::Error,
net::{IpAddr, UdpSocket},
sync::{
Expand Down Expand Up @@ -49,7 +49,7 @@ pub(crate) fn configure_server(
let config = Arc::get_mut(&mut server_config.transport).unwrap();

// QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS doubled, which was found to improve reliability
const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_CONCURRENT_STREAMS * 2) as u32;
const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS * 2) as u32;
config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
Expand Down Expand Up @@ -258,7 +258,7 @@ pub fn spawn_server(
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_ip: usize,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
Expand Down Expand Up @@ -306,7 +306,7 @@ mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down Expand Up @@ -361,7 +361,7 @@ mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down Expand Up @@ -403,7 +403,7 @@ mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down
7 changes: 7 additions & 0 deletions streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ use {
thiserror::Error,
};

// Total stake and nodes => stake map
#[derive(Default)]
pub struct StakedNodes {
pub total_stake: f64,
pub stake_map: HashMap<IpAddr, u64>,
}

pub type PacketBatchReceiver = Receiver<PacketBatch>;
pub type PacketBatchSender = Sender<PacketBatch>;

Expand Down

0 comments on commit 5b864ef

Please sign in to comment.