diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs index 86ddd154fc8b13..e9741777ccb6ac 100644 --- a/quic-client/src/lib.rs +++ b/quic-client/src/lib.rs @@ -26,11 +26,7 @@ use { pubkey::Pubkey, signature::{Keypair, Signer}, }, - solana_streamer::{ - nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType}, - streamer::StakedNodes, - tls_certificates::new_dummy_x509_certificate, - }, + solana_streamer::{streamer::StakedNodes, tls_certificates::new_dummy_x509_certificate}, std::{ net::{IpAddr, SocketAddr}, sync::{Arc, RwLock}, @@ -65,13 +61,12 @@ impl ConnectionPool for QuicPool { fn create_pool_entry( &self, - config: &Self::NewConnectionConfig, + _config: &Self::NewConnectionConfig, addr: &SocketAddr, ) -> Arc { Arc::new(Quic(Arc::new(QuicClient::new( self.endpoint.clone(), *addr, - config.compute_max_parallel_streams(), )))) } } @@ -120,24 +115,6 @@ impl QuicConfig { QuicLazyInitializedEndpoint::new(cert_guard.clone(), self.client_endpoint.as_ref().cloned()) } - fn compute_max_parallel_streams(&self) -> usize { - let (client_type, total_stake) = - self.maybe_client_pubkey - .map_or((ConnectionPeerType::Unstaked, 0), |pubkey| { - self.maybe_staked_nodes.as_ref().map_or( - (ConnectionPeerType::Unstaked, 0), - |stakes| { - let rstakes = stakes.read().unwrap(); - rstakes.get_node_stake(&pubkey).map_or( - (ConnectionPeerType::Unstaked, rstakes.total_stake()), - |stake| (ConnectionPeerType::Staked(stake), rstakes.total_stake()), - ) - }, - ) - }); - compute_max_allowed_uni_streams(client_type, total_stake) - } - pub fn update_client_certificate(&mut self, keypair: &Keypair, _ipaddr: IpAddr) { let (cert, priv_key) = new_dummy_x509_certificate(keypair); @@ -250,59 +227,3 @@ pub fn new_quic_connection_cache( let connection_manager = QuicConnectionManager::new_with_connection_config(config); ConnectionCache::new(name, connection_manager, connection_pool_size) } - -#[cfg(test)] -mod tests { - use { - super::*, - solana_sdk::quic::{ - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, - }, - std::collections::HashMap, - }; - - #[test] - fn test_connection_cache_max_parallel_chunks() { - solana_logger::setup(); - - let mut connection_config = QuicConfig::new().unwrap(); - assert_eq!( - connection_config.compute_max_parallel_streams(), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - - let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let pubkey = Pubkey::new_unique(); - connection_config.set_staked_nodes(&staked_nodes, &pubkey); - assert_eq!( - connection_config.compute_max_parallel_streams(), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - let overrides = HashMap::::default(); - let mut stakes = HashMap::from([(Pubkey::new_unique(), 10_000)]); - *staked_nodes.write().unwrap() = - StakedNodes::new(Arc::new(stakes.clone()), overrides.clone()); - assert_eq!( - connection_config.compute_max_parallel_streams(), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - - stakes.insert(pubkey, 1); - *staked_nodes.write().unwrap() = - StakedNodes::new(Arc::new(stakes.clone()), overrides.clone()); - let delta = - (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; - - assert_eq!( - connection_config.compute_max_parallel_streams(), - (QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize - ); - stakes.insert(pubkey, 1_000); - *staked_nodes.write().unwrap() = StakedNodes::new(Arc::new(stakes.clone()), overrides); - assert_ne!( - connection_config.compute_max_parallel_streams(), - QUIC_MIN_STAKED_CONCURRENT_STREAMS - ); - } -} diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs index c6a66fe56bcd10..0d6bede4149f69 100644 --- a/quic-client/src/nonblocking/quic_client.rs +++ b/quic-client/src/nonblocking/quic_client.rs @@ -4,8 +4,7 @@ use { async_lock::Mutex, async_trait::async_trait, - futures::future::{join_all, TryFutureExt}, - itertools::Itertools, + futures::future::TryFutureExt, log::*, quinn::{ crypto::rustls::QuicClientConfig, ClientConfig, ClosedStream, ConnectError, Connection, @@ -20,10 +19,7 @@ use { solana_net_utils::VALIDATOR_PORT_RANGE, solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind, solana_sdk::{ - quic::{ - QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT, - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, - }, + quic::{QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, signature::Keypair, transport::Result as TransportResult, }, @@ -284,21 +280,15 @@ pub struct QuicClient { connection: Arc>>, addr: SocketAddr, stats: Arc, - chunk_size: usize, } impl QuicClient { - pub fn new( - endpoint: Arc, - addr: SocketAddr, - chunk_size: usize, - ) -> Self { + pub fn new(endpoint: Arc, addr: SocketAddr) -> Self { Self { endpoint, connection: Arc::new(Mutex::new(None)), addr, stats: Arc::new(ClientStats::default()), - chunk_size, } } @@ -307,7 +297,6 @@ impl QuicClient { connection: &Connection, ) -> Result<(), QuicError> { let mut send_stream = connection.open_uni().await?; - send_stream.write_all(data).await?; Ok(()) } @@ -520,28 +509,8 @@ impl QuicClient { .await .map_err(Into::::into)?; - // Used to avoid dereferencing the Arc multiple times below - // by just getting a reference to the NewConnection once - let connection_ref: &Connection = &connection; - - let chunks = buffers[1..buffers.len()].iter().chunks(self.chunk_size); - - let futures: Vec<_> = chunks - .into_iter() - .map(|buffs| { - join_all( - buffs - .into_iter() - .map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)), - ) - }) - .collect(); - - for f in futures { - f.await - .into_iter() - .try_for_each(|res| res) - .map_err(Into::::into)?; + for data in buffers[1..buffers.len()].iter() { + Self::_send_buffer_using_conn(data.as_ref(), &connection).await?; } Ok(()) } @@ -574,11 +543,7 @@ impl QuicClientConnection { addr: SocketAddr, connection_stats: Arc, ) -> Self { - let client = Arc::new(QuicClient::new( - endpoint, - addr, - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, - )); + let client = Arc::new(QuicClient::new(endpoint, addr)); Self::new_with_client(client, connection_stats) }