diff --git a/Cargo.lock b/Cargo.lock index 7841faa2d87d14..3c92a68f9bfb0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6542,6 +6542,7 @@ dependencies = [ "solana-logger", "solana-sdk", "solana-version", + "static_assertions", "tokio", "url 2.5.0", ] diff --git a/core/src/validator.rs b/core/src/validator.rs index 98a267aeafc71a..32b165f77246c9 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -269,6 +269,7 @@ pub struct ValidatorConfig { pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup, pub wen_restart_proto_path: Option, pub unified_scheduler_handler_threads: Option, + pub ip_echo_server_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, } @@ -338,6 +339,7 @@ impl Default for ValidatorConfig { use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(), wen_restart_proto_path: None, unified_scheduler_handler_threads: None, + ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), } @@ -1079,6 +1081,7 @@ impl Validator { None => None, Some(tcp_listener) => Some(solana_net_utils::ip_echo_server( tcp_listener, + config.ip_echo_server_threads, Some(node.info.shred_version()), )), }; diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index d1c726051e6558..76ab14f27a6b4a 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -7,6 +7,7 @@ use { solana_client::{ connection_cache::ConnectionCache, rpc_client::RpcClient, tpu_client::TpuClientWrapper, }, + solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS, solana_perf::recycler::Recycler, solana_runtime::bank_forks::BankForks, solana_sdk::{ @@ -159,8 +160,14 @@ pub fn discover( if let Some(my_gossip_addr) = my_gossip_addr { info!("Gossip Address: {:?}", my_gossip_addr); } - let _ip_echo_server = ip_echo - .map(|tcp_listener| solana_net_utils::ip_echo_server(tcp_listener, Some(my_shred_version))); + + let _ip_echo_server = ip_echo.map(|tcp_listener| { + solana_net_utils::ip_echo_server( + tcp_listener, + DEFAULT_IP_ECHO_SERVER_THREADS, + Some(my_shred_version), + ) + }); let (met_criteria, elapsed, all_peers, tvu_peers) = spy( spy_ref.clone(), num_nodes, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 45045203412a73..0e4ee5a9af31ff 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup, wen_restart_proto_path: config.wen_restart_proto_path.clone(), unified_scheduler_handler_threads: config.unified_scheduler_handler_threads, + ip_echo_server_threads: config.ip_echo_server_threads, replay_forks_threads: config.replay_forks_threads, replay_transactions_threads: config.replay_transactions_threads, } diff --git a/net-utils/Cargo.toml b/net-utils/Cargo.toml index 0d8a82f7a994cd..3486b30bbb9cda 100644 --- a/net-utils/Cargo.toml +++ b/net-utils/Cargo.toml @@ -22,6 +22,7 @@ socket2 = { workspace = true } solana-logger = { workspace = true } solana-sdk = { workspace = true } solana-version = { workspace = true } +static_assertions = { workspace = true } tokio = { workspace = true, features = ["full"] } url = { workspace = true } diff --git a/net-utils/src/bin/ip_address_server.rs b/net-utils/src/bin/ip_address_server.rs index a194ad2c5cf2b8..6d5b7939ce4782 100644 --- a/net-utils/src/bin/ip_address_server.rs +++ b/net-utils/src/bin/ip_address_server.rs @@ -1,5 +1,6 @@ use { clap::{Arg, Command}, + solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS, std::net::{Ipv4Addr, SocketAddr, TcpListener}, }; @@ -21,7 +22,11 @@ fn main() { .unwrap_or_else(|_| panic!("Unable to parse {port}")); let bind_addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port)); let tcp_listener = TcpListener::bind(bind_addr).expect("unable to start tcp listener"); - let _runtime = solana_net_utils::ip_echo_server(tcp_listener, /*shred_version=*/ None); + let _runtime = solana_net_utils::ip_echo_server( + tcp_listener, + DEFAULT_IP_ECHO_SERVER_THREADS, + /*shred_version=*/ None, + ); loop { std::thread::park(); } diff --git a/net-utils/src/ip_echo_server.rs b/net-utils/src/ip_echo_server.rs index 64fbedadc7acf9..2d5782dcae1cdc 100644 --- a/net-utils/src/ip_echo_server.rs +++ b/net-utils/src/ip_echo_server.rs @@ -6,6 +6,7 @@ use { std::{ io, net::{IpAddr, SocketAddr}, + num::NonZeroUsize, time::Duration, }, tokio::{ @@ -18,6 +19,14 @@ use { pub type IpEchoServer = Runtime; +// Enforce a minimum of two threads: +// - One thread to monitor the TcpListener and spawn async tasks +// - One thread to service the spawned tasks +// The unsafe is safe because we're using a fixed, known non-zero value +pub const MINIMUM_IP_ECHO_SERVER_THREADS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(2) }; +// IP echo requests require little computation and come in fairly infrequently, +// so keep the number of server workers small to avoid overhead +pub const DEFAULT_IP_ECHO_SERVER_THREADS: NonZeroUsize = MINIMUM_IP_ECHO_SERVER_THREADS; pub const MAX_PORT_COUNT_PER_MESSAGE: usize = 4; const IO_TIMEOUT: Duration = Duration::from_secs(5); @@ -168,6 +177,7 @@ async fn run_echo_server(tcp_listener: std::net::TcpListener, shred_version: Opt /// connects. Used by |get_public_ip_addr| pub fn ip_echo_server( tcp_listener: std::net::TcpListener, + num_server_threads: NonZeroUsize, // Cluster shred-version of the node running the server. shred_version: Option, ) -> IpEchoServer { @@ -175,6 +185,7 @@ pub fn ip_echo_server( let runtime = tokio::runtime::Builder::new_multi_thread() .thread_name("solIpEchoSrvrRt") + .worker_threads(num_server_threads.get()) .enable_all() .build() .expect("new tokio runtime"); diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 1ff48173def0da..2d1b6249f3fcb1 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -16,7 +16,10 @@ use { }; mod ip_echo_server; -pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE}; +pub use ip_echo_server::{ + ip_echo_server, IpEchoServer, DEFAULT_IP_ECHO_SERVER_THREADS, MAX_PORT_COUNT_PER_MESSAGE, + MINIMUM_IP_ECHO_SERVER_THREADS, +}; use ip_echo_server::{IpEchoServerMessage, IpEchoServerResponse}; /// A data type representing a public Udp socket @@ -744,7 +747,11 @@ mod tests { let (_server_port, (server_udp_socket, server_tcp_listener)) = bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); - let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(42)); + let _runtime = ip_echo_server( + server_tcp_listener, + DEFAULT_IP_ECHO_SERVER_THREADS, + /*shred_version=*/ Some(42), + ); let server_ip_echo_addr = server_udp_socket.local_addr().unwrap(); assert_eq!( @@ -764,7 +771,11 @@ mod tests { let (client_port, (client_udp_socket, client_tcp_listener)) = bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); - let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(65535)); + let _runtime = ip_echo_server( + server_tcp_listener, + DEFAULT_IP_ECHO_SERVER_THREADS, + /*shred_version=*/ Some(65535), + ); let ip_echo_server_addr = server_udp_socket.local_addr().unwrap(); assert_eq!( diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 3a24fed647a1a4..3696f9ba5cb30d 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5323,6 +5323,7 @@ dependencies = [ "solana-logger", "solana-sdk", "solana-version", + "static_assertions", "tokio", "url 2.5.0", ] diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 53d8cf15d984a0..4c3221f9e661fe 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -9,6 +9,7 @@ use { // Need this struct to provide &str whose lifetime matches that of the CLAP Arg's pub struct DefaultThreadArgs { + pub ip_echo_server_threads: String, pub replay_forks_threads: String, pub replay_transactions_threads: String, } @@ -16,6 +17,7 @@ pub struct DefaultThreadArgs { impl Default for DefaultThreadArgs { fn default() -> Self { Self { + ip_echo_server_threads: IpEchoServerThreadsArg::default().to_string(), replay_forks_threads: ReplayForksThreadsArg::default().to_string(), replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(), } @@ -24,6 +26,7 @@ impl Default for DefaultThreadArgs { pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { vec![ + new_thread_arg::(&defaults.ip_echo_server_threads), new_thread_arg::(&defaults.replay_forks_threads), new_thread_arg::(&defaults.replay_transactions_threads), ] @@ -41,12 +44,18 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> { } pub struct NumThreadConfig { + pub ip_echo_server_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, } pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { NumThreadConfig { + ip_echo_server_threads: value_t_or_exit!( + matches, + IpEchoServerThreadsArg::NAME, + NonZeroUsize + ), replay_forks_threads: if matches.is_present("replay_slots_concurrently") { NonZeroUsize::new(4).expect("4 is non-zero") } else { @@ -86,6 +95,20 @@ trait ThreadArg { } } +struct IpEchoServerThreadsArg; +impl ThreadArg for IpEchoServerThreadsArg { + const NAME: &'static str = "ip_echo_server_threads"; + const LONG_NAME: &'static str = "ip-echo-server-threads"; + const HELP: &'static str = "Number of threads to use for the IP echo server"; + + fn default() -> usize { + solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get() + } + fn min() -> usize { + solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get() + } +} + struct ReplayForksThreadsArg; impl ThreadArg for ReplayForksThreadsArg { const NAME: &'static str = "replay_forks_threads"; diff --git a/validator/src/main.rs b/validator/src/main.rs index 56050031975a52..151281bc8ae874 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1332,6 +1332,7 @@ pub fn main() { let full_api = matches.is_present("full_rpc_api"); let cli::thread_args::NumThreadConfig { + ip_echo_server_threads, replay_forks_threads, replay_transactions_threads, } = cli::thread_args::parse_num_threads_args(&matches); @@ -1474,6 +1475,7 @@ pub fn main() { use_snapshot_archives_at_startup::cli::NAME, UseSnapshotArchivesAtStartup ), + ip_echo_server_threads, replay_forks_threads, replay_transactions_threads, ..ValidatorConfig::default()