Skip to content

Commit

Permalink
Allow configuration of replay thread pools from CLI (solana-labs#236)
Browse files Browse the repository at this point in the history
Bubble up the constants to the CLI that control the sizes of the
following two thread pools:
- The thread pool used to replay multiple forks in parallel
- The thread pool used to execute transactions in parallel
  • Loading branch information
steviez authored and palinko91 committed May 7, 2024
1 parent aac5e29 commit 77b11ec
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 49 deletions.
14 changes: 11 additions & 3 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ use {
poh_service::{self, PohService},
},
solana_program_runtime::runtime_config::RuntimeConfig,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
Expand Down Expand Up @@ -124,6 +125,7 @@ use {
std::{
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand Down Expand Up @@ -261,14 +263,15 @@ pub struct ValidatorConfig {
pub wait_to_vote_slot: Option<Slot>,
pub ledger_column_options: LedgerColumnOptions,
pub runtime_config: RuntimeConfig,
pub replay_slots_concurrently: bool,
pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
pub block_verification_method: BlockVerificationMethod,
pub block_production_method: BlockProductionMethod,
pub generator_config: Option<GeneratorConfig>,
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
pub unified_scheduler_handler_threads: Option<usize>,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -329,14 +332,15 @@ impl Default for ValidatorConfig {
wait_to_vote_slot: None,
ledger_column_options: LedgerColumnOptions::default(),
runtime_config: RuntimeConfig::default(),
replay_slots_concurrently: false,
banking_trace_dir_byte_limit: 0,
block_verification_method: BlockVerificationMethod::default(),
block_production_method: BlockProductionMethod::default(),
generator_config: None,
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
unified_scheduler_handler_threads: None,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
}
}
Expand All @@ -347,6 +351,9 @@ impl ValidatorConfig {
enforce_ulimit_nofile: false,
rpc_config: JsonRpcConfig::default_for_test(),
block_production_method: BlockProductionMethod::ThreadLocalMultiIterator,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
.expect("thread count is non-zero"),
..Self::default()
}
}
Expand Down Expand Up @@ -1308,7 +1315,8 @@ impl Validator {
repair_validators: config.repair_validators.clone(),
repair_whitelist: config.repair_whitelist.clone(),
wait_for_vote_to_start_leader,
replay_slots_concurrently: config.replay_slots_concurrently,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
},
&max_slots,
block_metadata_notifier,
Expand Down
2 changes: 0 additions & 2 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
ip_echo_server_threads: config.ip_echo_server_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork,
}
}

Expand Down
43 changes: 0 additions & 43 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,23 @@ use {

// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
pub struct DefaultThreadArgs {
pub ip_echo_server_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub tvu_receive_threads: String,
}

impl Default for DefaultThreadArgs {
fn default() -> Self {
Self {
ip_echo_server_threads: IpEchoServerThreadsArg::default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(),
tvu_receive_threads: TvuReceiveThreadsArg::default().to_string(),
}
}
}

pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
vec![
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
]
}

Expand All @@ -47,19 +41,12 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
}

pub struct NumThreadConfig {
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
}

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NumThreadConfig {
ip_echo_server_threads: value_t_or_exit!(
matches,
IpEchoServerThreadsArg::NAME,
NonZeroUsize
),
replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
NonZeroUsize::new(4).expect("4 is non-zero")
} else {
Expand All @@ -70,7 +57,6 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
ReplayTransactionsThreadsArg::NAME,
NonZeroUsize
),
tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
}
}

Expand Down Expand Up @@ -100,20 +86,6 @@ trait ThreadArg {
}
}

struct IpEchoServerThreadsArg;
impl ThreadArg for IpEchoServerThreadsArg {
const NAME: &'static str = "ip_echo_server_threads";
const LONG_NAME: &'static str = "ip-echo-server-threads";
const HELP: &'static str = "Number of threads to use for the IP echo server";

fn default() -> usize {
solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
}
fn min() -> usize {
solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get()
}
}

struct ReplayForksThreadsArg;
impl ThreadArg for ReplayForksThreadsArg {
const NAME: &'static str = "replay_forks_threads";
Expand Down Expand Up @@ -141,18 +113,3 @@ impl ThreadArg for ReplayTransactionsThreadsArg {
get_max_thread_count()
}
}

struct TvuReceiveThreadsArg;
impl ThreadArg for TvuReceiveThreadsArg {
const NAME: &'static str = "tvu_receive_threads";
const LONG_NAME: &'static str = "tvu-receive-threads";
const HELP: &'static str =
"Number of threads (and sockets) to use for receiving shreds on the TVU port";

fn default() -> usize {
solana_gossip::cluster_info::DEFAULT_NUM_TVU_SOCKETS.get()
}
fn min() -> usize {
solana_gossip::cluster_info::MINIMUM_NUM_TVU_SOCKETS.get()
}
}
8 changes: 7 additions & 1 deletion validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,11 @@ pub fn main() {
}
let full_api = matches.is_present("full_rpc_api");

let cli::thread_args::NumThreadConfig {
replay_forks_threads,
replay_transactions_threads,
} = cli::thread_args::parse_num_threads_args(&matches);

let mut validator_config = ValidatorConfig {
require_tower: matches.is_present("require_tower"),
tower_storage,
Expand Down Expand Up @@ -1446,12 +1451,13 @@ pub fn main() {
..RuntimeConfig::default()
},
staked_nodes_overrides: staked_nodes_overrides.clone(),
replay_slots_concurrently: matches.is_present("replay_slots_concurrently"),
use_snapshot_archives_at_startup: value_t_or_exit!(
matches,
use_snapshot_archives_at_startup::cli::NAME,
UseSnapshotArchivesAtStartup
),
replay_forks_threads,
replay_transactions_threads,
..ValidatorConfig::default()
};

Expand Down

0 comments on commit 77b11ec

Please sign in to comment.