Skip to content

Commit

Permalink
Reduce the default number of IP echo server threads (#354)
Browse files Browse the repository at this point in the history
The IP echo server currently spins up a worker thread for every thread
on the machine. Observing some data for nodes,
- MNB validators and RPC nodes look to get several hundred of these
  requests per day
- MNB entrypoint nodes look to get 2-3 requests per second on average

In both instances, the current threadpool is severely overprovisioned
which is a waste of resources. This PR plumbs a flag to control the
number of worker threads for this pool as well as setting a default of
two threads for this server. Two threads allow for one thread to always
listen on the TCP port while the other thread processes requests.
steviez authored Apr 1, 2024
1 parent 92c9b45 commit 79e316e
Showing 11 changed files with 72 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
@@ -269,6 +269,7 @@ pub struct ValidatorConfig {
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
pub unified_scheduler_handler_threads: Option<usize>,
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}
@@ -338,6 +339,7 @@ impl Default for ValidatorConfig {
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
unified_scheduler_handler_threads: None,
ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
@@ -1079,6 +1081,7 @@ impl Validator {
None => None,
Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
tcp_listener,
config.ip_echo_server_threads,
Some(node.info.shred_version()),
)),
};
11 changes: 9 additions & 2 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ use {
solana_client::{
connection_cache::ConnectionCache, rpc_client::RpcClient, tpu_client::TpuClientWrapper,
},
solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS,
solana_perf::recycler::Recycler,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
@@ -159,8 +160,14 @@ pub fn discover(
if let Some(my_gossip_addr) = my_gossip_addr {
info!("Gossip Address: {:?}", my_gossip_addr);
}
let _ip_echo_server = ip_echo
.map(|tcp_listener| solana_net_utils::ip_echo_server(tcp_listener, Some(my_shred_version)));

let _ip_echo_server = ip_echo.map(|tcp_listener| {
solana_net_utils::ip_echo_server(
tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
Some(my_shred_version),
)
});
let (met_criteria, elapsed, all_peers, tvu_peers) = spy(
spy_ref.clone(),
num_nodes,
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
ip_echo_server_threads: config.ip_echo_server_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
}
1 change: 1 addition & 0 deletions net-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ socket2 = { workspace = true }
solana-logger = { workspace = true }
solana-sdk = { workspace = true }
solana-version = { workspace = true }
static_assertions = { workspace = true }
tokio = { workspace = true, features = ["full"] }
url = { workspace = true }

7 changes: 6 additions & 1 deletion net-utils/src/bin/ip_address_server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
clap::{Arg, Command},
solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS,
std::net::{Ipv4Addr, SocketAddr, TcpListener},
};

@@ -21,7 +22,11 @@ fn main() {
.unwrap_or_else(|_| panic!("Unable to parse {port}"));
let bind_addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));
let tcp_listener = TcpListener::bind(bind_addr).expect("unable to start tcp listener");
let _runtime = solana_net_utils::ip_echo_server(tcp_listener, /*shred_version=*/ None);
let _runtime = solana_net_utils::ip_echo_server(
tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
/*shred_version=*/ None,
);
loop {
std::thread::park();
}
11 changes: 11 additions & 0 deletions net-utils/src/ip_echo_server.rs
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ use {
std::{
io,
net::{IpAddr, SocketAddr},
num::NonZeroUsize,
time::Duration,
},
tokio::{
@@ -18,6 +19,14 @@ use {

pub type IpEchoServer = Runtime;

// Minimum number of worker threads for the IP echo server. Two are needed:
// - One thread to monitor the TcpListener and spawn async tasks
// - One thread to service the spawned tasks
// Built with the checked constructor so that no `unsafe` is required. The
// `None` arm cannot be taken for the literal 2; a panic in a const
// initializer is reported as a compile-time error rather than at runtime.
pub const MINIMUM_IP_ECHO_SERVER_THREADS: NonZeroUsize = match NonZeroUsize::new(2) {
    Some(num_threads) => num_threads,
    None => panic!("2 is non-zero"),
};
// IP echo requests require little computation and come in fairly infrequently,
// so keep the number of server workers small to avoid overhead
pub const DEFAULT_IP_ECHO_SERVER_THREADS: NonZeroUsize = MINIMUM_IP_ECHO_SERVER_THREADS;
pub const MAX_PORT_COUNT_PER_MESSAGE: usize = 4;

const IO_TIMEOUT: Duration = Duration::from_secs(5);
@@ -168,13 +177,15 @@ async fn run_echo_server(tcp_listener: std::net::TcpListener, shred_version: Opt
/// connects. Used by |get_public_ip_addr|
pub fn ip_echo_server(
tcp_listener: std::net::TcpListener,
num_server_threads: NonZeroUsize,
// Cluster shred-version of the node running the server.
shred_version: Option<u16>,
) -> IpEchoServer {
tcp_listener.set_nonblocking(true).unwrap();

let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("solIpEchoSrvrRt")
.worker_threads(num_server_threads.get())
.enable_all()
.build()
.expect("new tokio runtime");
17 changes: 14 additions & 3 deletions net-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -16,7 +16,10 @@ use {
};

mod ip_echo_server;
pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE};
pub use ip_echo_server::{
ip_echo_server, IpEchoServer, DEFAULT_IP_ECHO_SERVER_THREADS, MAX_PORT_COUNT_PER_MESSAGE,
MINIMUM_IP_ECHO_SERVER_THREADS,
};
use ip_echo_server::{IpEchoServerMessage, IpEchoServerResponse};

/// A data type representing a public Udp socket
@@ -744,7 +747,11 @@ mod tests {
let (_server_port, (server_udp_socket, server_tcp_listener)) =
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(42));
let _runtime = ip_echo_server(
server_tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
/*shred_version=*/ Some(42),
);

let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
assert_eq!(
@@ -764,7 +771,11 @@ mod tests {
let (client_port, (client_udp_socket, client_tcp_listener)) =
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(65535));
let _runtime = ip_echo_server(
server_tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
/*shred_version=*/ Some(65535),
);

let ip_echo_server_addr = server_udp_socket.local_addr().unwrap();
assert_eq!(
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
@@ -9,13 +9,15 @@ use {

// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
// Each field holds the default value of the matching thread argument, rendered
// as a String so that a borrowed &str can be handed to the argument builder.
pub struct DefaultThreadArgs {
// Default for the IP echo server thread argument
pub ip_echo_server_threads: String,
// Default for the replay forks thread argument
pub replay_forks_threads: String,
// Default for the replay transactions thread argument
pub replay_transactions_threads: String,
}

impl Default for DefaultThreadArgs {
fn default() -> Self {
Self {
ip_echo_server_threads: IpEchoServerThreadsArg::default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(),
}
@@ -24,6 +26,7 @@ impl Default for DefaultThreadArgs {

pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
vec![
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
]
@@ -41,12 +44,18 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
}

// Thread counts returned by `parse_num_threads_args` after reading the
// command-line matches. Every count is a NonZeroUsize, so zero threads cannot
// be represented.
pub struct NumThreadConfig {
// Parsed from the argument named by IpEchoServerThreadsArg::NAME
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NumThreadConfig {
ip_echo_server_threads: value_t_or_exit!(
matches,
IpEchoServerThreadsArg::NAME,
NonZeroUsize
),
replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
NonZeroUsize::new(4).expect("4 is non-zero")
} else {
@@ -86,6 +95,20 @@ trait ThreadArg {
}
}

// Marker type describing the thread-count argument for the IP echo server.
// It carries no data; everything is exposed through the ThreadArg impl below.
struct IpEchoServerThreadsArg;
impl ThreadArg for IpEchoServerThreadsArg {
// Internal argument name, used to look the value up in the parsed matches
const NAME: &'static str = "ip_echo_server_threads";
// Long flag name, i.e. the text that follows `--` on the command line
const LONG_NAME: &'static str = "ip-echo-server-threads";
// Help text for the argument
const HELP: &'static str = "Number of threads to use for the IP echo server";

// Default thread count, taken from the net-utils crate so the CLI and the
// server share a single source of truth
fn default() -> usize {
solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
}
// Smallest thread count reported for this argument, also taken from net-utils
fn min() -> usize {
solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get()
}
}

struct ReplayForksThreadsArg;
impl ThreadArg for ReplayForksThreadsArg {
const NAME: &'static str = "replay_forks_threads";
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1332,6 +1332,7 @@ pub fn main() {
let full_api = matches.is_present("full_rpc_api");

let cli::thread_args::NumThreadConfig {
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
} = cli::thread_args::parse_num_threads_args(&matches);
@@ -1474,6 +1475,7 @@ pub fn main() {
use_snapshot_archives_at_startup::cli::NAME,
UseSnapshotArchivesAtStartup
),
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
..ValidatorConfig::default()

0 comments on commit 79e316e

Please sign in to comment.