Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Weight concurrent streams by stake #25993

Merged
merged 2 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use {
},
solana_measure::measure::Measure,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS},
solana_sdk::quic::{
QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc},
Expand Down Expand Up @@ -395,7 +397,7 @@ impl QuicClient {

let chunks = buffers[1..buffers.len()]
.iter()
.chunks(QUIC_MAX_CONCURRENT_STREAMS);
.chunks(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);

let futures: Vec<_> = chunks
.into_iter()
Expand Down
8 changes: 5 additions & 3 deletions client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ mod tests {
tpu_connection::TpuConnection,
},
solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::quic::{spawn_server, StreamStats},
solana_streamer::{
quic::{spawn_server, StreamStats},
streamer::StakedNodes,
},
std::{
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -28,7 +30,7 @@ mod tests {
let (sender, receiver) = unbounded();
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s.try_clone().unwrap(),
Expand Down
6 changes: 3 additions & 3 deletions core/src/find_packet_sender_stake_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_sdk::timing::timestamp,
solana_streamer::streamer::{self, StreamerError},
solana_streamer::streamer::{self, StakedNodes, StreamerError},
std::{
collections::HashMap,
net::IpAddr,
Expand Down Expand Up @@ -79,7 +79,7 @@ impl FindPacketSenderStakeStage {
pub fn new(
packet_receiver: streamer::PacketBatchReceiver,
sender: FindPacketSenderStakeSender,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
name: &'static str,
) -> Self {
let mut stats = FindPacketSenderStakeStats::default();
Expand All @@ -105,7 +105,7 @@ impl FindPacketSenderStakeStage {
Measure::start("apply_sender_stakes_time");
let mut apply_stake = || {
let ip_to_stake = staked_nodes.read().unwrap();
Self::apply_sender_stakes(&mut batches, &ip_to_stake);
Self::apply_sender_stakes(&mut batches, &ip_to_stake.stake_map);
};
apply_stake();
apply_sender_stakes_time.stop();
Expand Down
13 changes: 11 additions & 2 deletions core/src/staked_nodes_updater_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
solana_gossip::cluster_info::ClusterInfo,
solana_runtime::bank_forks::BankForks,
solana_streamer::streamer::StakedNodes,
std::{
collections::HashMap,
net::IpAddr,
Expand All @@ -24,22 +25,25 @@ impl StakedNodesUpdaterService {
exit: Arc<AtomicBool>,
cluster_info: Arc<ClusterInfo>,
bank_forks: Arc<RwLock<BankForks>>,
shared_staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
shared_staked_nodes: Arc<RwLock<StakedNodes>>,
) -> Self {
let thread_hdl = Builder::new()
.name("sol-sn-updater".to_string())
.spawn(move || {
let mut last_stakes = Instant::now();
while !exit.load(Ordering::Relaxed) {
let mut new_ip_to_stake = HashMap::new();
let mut total_stake = 0;
if Self::try_refresh_ip_to_stake(
&mut last_stakes,
&mut new_ip_to_stake,
&mut total_stake,
&bank_forks,
&cluster_info,
) {
let mut shared = shared_staked_nodes.write().unwrap();
*shared = new_ip_to_stake;
shared.total_stake = total_stake as f64;
shared.stake_map = new_ip_to_stake;
}
}
})
Expand All @@ -51,12 +55,17 @@ impl StakedNodesUpdaterService {
fn try_refresh_ip_to_stake(
last_stakes: &mut Instant,
ip_to_stake: &mut HashMap<IpAddr, u64>,
total_stake: &mut u64,
bank_forks: &RwLock<BankForks>,
cluster_info: &ClusterInfo,
) -> bool {
if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
let root_bank = bank_forks.read().unwrap().root_bank();
let staked_nodes = root_bank.staked_nodes();
*total_stake = staked_nodes
.iter()
.map(|(_pubkey, stake)| stake)
.sum::<u64>();
*ip_to_stake = cluster_info
.tvu_peers()
.into_iter()
Expand Down
8 changes: 4 additions & 4 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use {
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
},
solana_sdk::signature::Keypair,
solana_streamer::quic::{
spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS,
solana_streamer::{
quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
std::{
collections::HashMap,
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
thread,
Expand Down Expand Up @@ -126,7 +126,7 @@ impl Tpu {
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
);

let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let staked_nodes_updater_service = StakedNodesUpdaterService::new(
exit.clone(),
cluster_info.clone(),
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub const QUIC_PORT_OFFSET: u16 = 6;
// Empirically found max number of concurrent streams
// that seems to maximize TPS on GCE (higher values don't seem to
// give significant improvement or seem to impact stability)
pub const QUIC_MAX_CONCURRENT_STREAMS: usize = 2048;
pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What impact does this have on TPU clients? Are there some stats about TPS or network performance with and without this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see a huge impact with or without this change in my tpu-client test:

With this branch

2022-06-17T20:47:16.938332781Z INFO solana_bench_tps::bench] ---------------------+---------------+--------------------
[2022-06-17T20:47:16.938343148Z INFO solana_bench_tps::bench] http://35.247.120.58:8899 | 67938.42 | 1213244
[2022-06-17T20:47:16.938348907Z INFO solana_bench_tps::bench]
Average max TPS: 67938.42, 0 nodes had 0 TPS
[2022-06-17T20:47:16.938360601Z INFO solana_bench_tps::bench]
Highest TPS: 67938.42 sampling period 1s max transactions: 1213244 clients: 1 drop rate: 0.60
[2022-06-17T20:47:16.938367384Z INFO solana_bench_tps::bench] Average TPS: 19827.145

[2022-06-17T20:55:40.669179958Z INFO solana_bench_tps::bench] http://35.247.120.58:8899 | 55039.97 | 1311307
[2022-06-17T20:55:40.669184817Z INFO solana_bench_tps::bench]
Average max TPS: 55039.97, 0 nodes had 0 TPS
[2022-06-17T20:55:40.669194603Z INFO solana_bench_tps::bench]
Highest TPS: 55039.97 sampling period 1s max transactions: 1311307 clients: 1 drop rate: 0.29
[2022-06-17T20:55:40.669198742Z INFO solana_bench_tps::bench] Average TPS: 21661.62

[2022-06-18T01:07:46.093266884Z INFO solana_bench_tps::bench] Node address | Max TPS | Total Transactions
[2022-06-18T01:07:46.093280556Z INFO solana_bench_tps::bench] ---------------------+---------------+--------------------
[2022-06-18T01:07:46.093285691Z INFO solana_bench_tps::bench] http://35.247.120.58:8899 | 58969.85 | 1367411
[2022-06-18T01:07:46.093301930Z INFO solana_bench_tps::bench]
Average max TPS: 58969.85, 0 nodes had 0 TPS
[2022-06-18T01:07:46.093292778Z INFO solana_metrics::metrics] datapoint: bench-tps-lamport_balance balance=14373355930776120i
[2022-06-18T01:07:46.093311329Z INFO solana_bench_tps::bench]
Highest TPS: 58969.85 sampling period 1s max transactions: 1367411 clients: 1 drop rate: 0.76
[2022-06-18T01:07:46.093349226Z INFO solana_bench_tps::bench] Average TPS: 22578.842

Master

[2022-06-17T21:33:02.909510265Z INFO solana_bench_tps::bench] Node address | Max TPS | Total Transactions
[2022-06-17T21:33:02.909517556Z INFO solana_bench_tps::bench] ---------------------+---------------+--------------------
[2022-06-17T21:33:02.909520144Z INFO solana_bench_tps::bench] http://35.247.120.58:8899 | 44006.95 | 1102674
[2022-06-17T21:33:02.909535908Z INFO solana_bench_tps::bench]
Average max TPS: 44006.95, 0 nodes had 0 TPS
[2022-06-17T21:33:02.909548109Z INFO solana_bench_tps::bench]
Highest TPS: 44006.95 sampling period 1s max transactions: 1102674 clients: 1 drop rate: 0.35
[2022-06-17T21:33:02.909526414Z INFO solana_metrics::metrics] datapoint: bench-tps-lamport_balance balance=14373355930776120i
[2022-06-17T21:33:02.909555115Z INFO solana_bench_tps::bench] Average TPS: 18090.404

[2022-06-17T21:38:51.706520412Z INFO solana_bench_tps::bench] Token balance: 14373355930776120
[2022-06-17T21:38:51.706553141Z INFO solana_bench_tps::bench] Node address | Max TPS | Total Transactions
[2022-06-17T21:38:51.706559304Z INFO solana_bench_tps::bench] ---------------------+---------------+--------------------
[2022-06-17T21:38:51.706562073Z INFO solana_bench_tps::bench] http://35.247.120.58:8899 | 65728.62 | 1300790
[2022-06-17T21:38:51.706573521Z INFO solana_bench_tps::bench]
Average max TPS: 65728.62, 0 nodes had 0 TPS
[2022-06-17T21:38:51.706581605Z INFO solana_bench_tps::bench]
Highest TPS: 65728.62 sampling period 1s max transactions: 1300790 clients: 1 drop rate: 0.34
[2022-06-17T21:38:51.706595604Z INFO solana_bench_tps::bench] Average TPS: 21306.563


pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000;
pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;
34 changes: 26 additions & 8 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use {
crate::quic::{configure_server, QuicServerError, StreamStats},
crate::{
quic::{configure_server, QuicServerError, StreamStats},
streamer::StakedNodes,
},
crossbeam_channel::Sender,
futures_util::stream::StreamExt,
quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection},
quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection, VarInt},
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE},
quic::QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
signature::Keypair,
timing,
},
Expand All @@ -21,6 +25,8 @@ use {
tokio::{task::JoinHandle, time::timeout},
};

const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How was this number chosen? With 383.3M SOL staked on mainnet, that means every 3833 SOL staked gives you a stream, which means that any node with less than that amount staked gets 0 streams and counts as unstaked. According to Solana Beach, on mainnet, the 1776th staked node has just about enough stake for one stream, which means that ~90% of validators on the network get a stream (including a long tail of delinquent or totally unstaked validators).

So this number might even be a little too high, unless we know that a validator can handle a total of 100k concurrent streams.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100k streams * 1280 bytes per packet would be ~128MB and ~81MB for unstaked (128 * 500 * 1280) so that sounds pretty conservative in terms of packet memory use. Each stream and connection has other additional overhead as well though.


#[allow(clippy::too_many_arguments)]
pub fn spawn_server(
sock: UdpSocket,
Expand All @@ -29,7 +35,7 @@ pub fn spawn_server(
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_ip: usize,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
Expand Down Expand Up @@ -59,7 +65,7 @@ pub async fn run_server(
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_ip: usize,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
Expand Down Expand Up @@ -97,18 +103,30 @@ pub async fn run_server(

let (mut connection_table_l, stake) = {
let staked_nodes = staked_nodes.read().unwrap();
if let Some(stake) = staked_nodes.get(&remote_addr.ip()) {
if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) {
let stake = *stake;
let total_stake = staked_nodes.total_stake;
drop(staked_nodes);
let mut connection_table_l = staked_connection_table.lock().unwrap();
let num_pruned = connection_table_l.prune_oldest(max_staked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection.set_max_concurrent_uni_streams(
VarInt::from_u64(
((stake as f64 / total_stake as f64)
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS)
as u64,
)
.unwrap(),
);
(connection_table_l, stake)
} else {
drop(staked_nodes);
let mut connection_table_l = connection_table.lock().unwrap();
let num_pruned = connection_table_l.prune_oldest(max_unstaked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection.set_max_concurrent_uni_streams(
VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64).unwrap(),
);
(connection_table_l, 0)
}
};
Expand Down Expand Up @@ -449,7 +467,7 @@ pub mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down Expand Up @@ -648,7 +666,7 @@ pub mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down Expand Up @@ -678,7 +696,7 @@ pub mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down
14 changes: 7 additions & 7 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {
crate::streamer::StakedNodes,
crossbeam_channel::Sender,
pem::Pem,
pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier},
Expand All @@ -7,11 +8,10 @@ use {
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::PACKET_DATA_SIZE,
quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS},
quic::{QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
signature::Keypair,
},
std::{
collections::HashMap,
error::Error,
net::{IpAddr, UdpSocket},
sync::{
Expand Down Expand Up @@ -49,7 +49,7 @@ pub(crate) fn configure_server(
let config = Arc::get_mut(&mut server_config.transport).unwrap();

// QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability
const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_CONCURRENT_STREAMS * 2) as u32;
const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS * 2) as u32;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but I can't help asking: why is this doubling done here rather than in the SDK, where the constant is declared?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ryleung-solana can you please remove the doubling now that streams don't have head-of-line blocking? #26086

config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
Expand Down Expand Up @@ -258,7 +258,7 @@ pub fn spawn_server(
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_ip: usize,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
Expand Down Expand Up @@ -306,7 +306,7 @@ mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down Expand Up @@ -361,7 +361,7 @@ mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down Expand Up @@ -403,7 +403,7 @@ mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
Expand Down
7 changes: 7 additions & 0 deletions streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ use {
thiserror::Error,
};

// Total stake and nodes => stake map
#[derive(Default)]
pub struct StakedNodes {
pub total_stake: f64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why is this an f64? Seems like a u64 would make more sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is mostly due to its float arithmetic? Like?

                        VarInt::from_u64(
                            ((stake as f64 / total_stake as f64)
                                * QUIC_TOTAL_STAKED_CONCURRENT_STREAMS)
                                as u64,
                        )
                        .unwrap(),

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should just flip the ops and drop the floats. floats are the devil

pub stake_map: HashMap<IpAddr, u64>,
}

pub type PacketBatchReceiver = Receiver<PacketBatch>;
pub type PacketBatchSender = Sender<PacketBatch>;

Expand Down