Skip to content

Commit

Permalink
Allow configuration of replay thread pools from CLI (#236)
Browse files Browse the repository at this point in the history
Bubble up the constants to the CLI that control the sizes of the
following two thread pools:
- The thread pool used to replay multiple forks in parallel
- The thread pool used to execute transactions in parallel
  • Loading branch information
steviez authored and willhickey committed Mar 20, 2024
1 parent f344917 commit f2ef574
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 24 deletions.
22 changes: 11 additions & 11 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use {
solana_measure::measure::Measure,
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
Expand Down Expand Up @@ -80,6 +79,7 @@ use {
solana_vote_program::vote_state::VoteTransaction,
std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
result,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand All @@ -95,11 +95,9 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;

const MAX_VOTE_SIGNATURES: usize = 200;
const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
// Expect this number to be small enough to minimize thread pool overhead while large enough
// to be able to replay all active forks at the same time in most cases.
const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10;

#[derive(PartialEq, Eq, Debug)]
Expand Down Expand Up @@ -291,7 +289,8 @@ pub struct ReplayStageConfig {
// Stops voting until this slot has been reached. Should be used to avoid
// duplicate voting which can lead to slashing.
pub wait_to_vote_slot: Option<Slot>,
pub replay_slots_concurrently: bool,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

/// Timing information for the ReplayStage main processing loop
Expand Down Expand Up @@ -574,7 +573,8 @@ impl ReplayStage {
ancestor_hashes_replay_update_sender,
tower_storage,
wait_to_vote_slot,
replay_slots_concurrently,
replay_forks_threads,
replay_transactions_threads,
} = config;

trace!("replay stage");
Expand Down Expand Up @@ -654,19 +654,19 @@ impl ReplayStage {
)
};
// Thread pool to (maybe) replay multiple threads in parallel
let replay_mode = if replay_slots_concurrently {
let replay_mode = if replay_forks_threads.get() == 1 {
ForkReplayMode::Serial
} else {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
.num_threads(replay_forks_threads.get())
.thread_name(|i| format!("solReplayFork{i:02}"))
.build()
.expect("new rayon threadpool");
ForkReplayMode::Parallel(pool)
} else {
ForkReplayMode::Serial
};
// Thread pool to replay multiple transactions within one block in parallel
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.num_threads(replay_transactions_threads.get())
.thread_name(|i| format!("solReplayTx{i:02}"))
.build()
.expect("new rayon threadpool");
Expand Down
22 changes: 19 additions & 3 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use {
std::{
collections::HashSet,
net::{SocketAddr, UdpSocket},
num::NonZeroUsize,
sync::{atomic::AtomicBool, Arc, RwLock},
thread::{self, JoinHandle},
},
Expand Down Expand Up @@ -81,7 +82,6 @@ pub struct TvuSockets {
pub ancestor_hashes_requests: UdpSocket,
}

#[derive(Default)]
pub struct TvuConfig {
pub max_ledger_shreds: Option<u64>,
pub shred_version: u16,
Expand All @@ -90,7 +90,22 @@ pub struct TvuConfig {
// Validators which should be given priority when serving repairs
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
pub wait_for_vote_to_start_leader: bool,
pub replay_slots_concurrently: bool,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

impl Default for TvuConfig {
fn default() -> Self {
Self {
max_ledger_shreds: None,
shred_version: 0,
repair_validators: None,
repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
wait_for_vote_to_start_leader: false,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
}
}

impl Tvu {
Expand Down Expand Up @@ -265,7 +280,8 @@ impl Tvu {
ancestor_hashes_replay_update_sender,
tower_storage: tower_storage.clone(),
wait_to_vote_slot,
replay_slots_concurrently: tvu_config.replay_slots_concurrently,
replay_forks_threads: tvu_config.replay_forks_threads,
replay_transactions_threads: tvu_config.replay_transactions_threads,
};

let (voting_sender, voting_receiver) = unbounded();
Expand Down
14 changes: 11 additions & 3 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use {
poh_service::{self, PohService},
},
solana_program_runtime::runtime_config::RuntimeConfig,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
Expand Down Expand Up @@ -123,6 +124,7 @@ use {
std::{
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand Down Expand Up @@ -260,14 +262,15 @@ pub struct ValidatorConfig {
pub wait_to_vote_slot: Option<Slot>,
pub ledger_column_options: LedgerColumnOptions,
pub runtime_config: RuntimeConfig,
pub replay_slots_concurrently: bool,
pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
pub block_verification_method: BlockVerificationMethod,
pub block_production_method: BlockProductionMethod,
pub generator_config: Option<GeneratorConfig>,
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
pub unified_scheduler_handler_threads: Option<usize>,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -328,14 +331,15 @@ impl Default for ValidatorConfig {
wait_to_vote_slot: None,
ledger_column_options: LedgerColumnOptions::default(),
runtime_config: RuntimeConfig::default(),
replay_slots_concurrently: false,
banking_trace_dir_byte_limit: 0,
block_verification_method: BlockVerificationMethod::default(),
block_production_method: BlockProductionMethod::default(),
generator_config: None,
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
unified_scheduler_handler_threads: None,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
}
}
Expand All @@ -346,6 +350,9 @@ impl ValidatorConfig {
enforce_ulimit_nofile: false,
rpc_config: JsonRpcConfig::default_for_test(),
block_production_method: BlockProductionMethod::ThreadLocalMultiIterator,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
.expect("thread count is non-zero"),
..Self::default()
}
}
Expand Down Expand Up @@ -1305,7 +1312,8 @@ impl Validator {
repair_validators: config.repair_validators.clone(),
repair_whitelist: config.repair_whitelist.clone(),
wait_for_vote_to_start_leader,
replay_slots_concurrently: config.replay_slots_concurrently,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
},
&max_slots,
block_metadata_notifier,
Expand Down
3 changes: 2 additions & 1 deletion local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
wait_to_vote_slot: config.wait_to_vote_slot,
ledger_column_options: config.ledger_column_options.clone(),
runtime_config: config.runtime_config.clone(),
replay_slots_concurrently: config.replay_slots_concurrently,
banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit,
block_verification_method: config.block_verification_method.clone(),
block_production_method: config.block_production_method.clone(),
generator_config: config.generator_config.clone(),
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
}
}

Expand Down
19 changes: 14 additions & 5 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ use {
std::{path::PathBuf, str::FromStr},
};

pub mod thread_args;
use thread_args::{thread_args, DefaultThreadArgs};

const EXCLUDE_KEY: &str = "account-index-exclude-key";
const INCLUDE_KEY: &str = "account-index-include-key";
// The default minimal snapshot download speed (bytes/second)
Expand Down Expand Up @@ -1466,11 +1469,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.value_name("BYTES")
.help("Maximum number of bytes written to the program log before truncation"),
)
.arg(
Arg::with_name("replay_slots_concurrently")
.long("replay-slots-concurrently")
.help("Allow concurrent replay of slots on different forks"),
)
.arg(
Arg::with_name("banking_trace_dir_byte_limit")
// expose friendly alternative name to cli than internal
Expand Down Expand Up @@ -1555,6 +1553,7 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
",
),
)
.args(&thread_args(&default_args.thread_args))
.args(&get_deprecated_arguments())
.after_help("The default subcommand is run")
.subcommand(
Expand Down Expand Up @@ -2073,6 +2072,13 @@ fn deprecated_arguments() -> Vec<DeprecatedArg> {
.long("no-rocksdb-compaction")
.takes_value(false)
.help("Disable manual compaction of the ledger database"));
add_arg!(
Arg::with_name("replay_slots_concurrently")
.long("replay-slots-concurrently")
.help("Allow concurrent replay of slots on different forks")
.conflicts_with("replay_forks_threads"),
replaced_by: "replay_forks_threads",
usage_warning: "Equivalent behavior to this flag would be --replay-forks-threads 4");
add_arg!(Arg::with_name("rocksdb_compaction_interval")
.long("rocksdb-compaction-interval-slots")
.value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS")
Expand Down Expand Up @@ -2195,6 +2201,8 @@ pub struct DefaultArgs {
pub banking_trace_dir_byte_limit: String,

pub wen_restart_path: String,

pub thread_args: DefaultThreadArgs,
}

impl DefaultArgs {
Expand Down Expand Up @@ -2277,6 +2285,7 @@ impl DefaultArgs {
wait_for_restart_window_max_delinquent_stake: "5".to_string(),
banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(),
wen_restart_path: "wen_restart_progress.proto".to_string(),
thread_args: DefaultThreadArgs::default(),
}
}
}
Expand Down
115 changes: 115 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//! Arguments for controlling the number of threads allocated for various tasks
use {
clap::{value_t_or_exit, Arg, ArgMatches},
solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
solana_rayon_threadlimit::get_max_thread_count,
std::{num::NonZeroUsize, ops::RangeInclusive},
};

// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
pub struct DefaultThreadArgs {
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
}

impl Default for DefaultThreadArgs {
fn default() -> Self {
Self {
replay_forks_threads: ReplayForksThreadsArg::default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(),
}
}
}

pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
vec![
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
]
}

fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
Arg::with_name(T::NAME)
.long(T::LONG_NAME)
.takes_value(true)
.value_name("NUMBER")
.default_value(default)
.validator(|num| is_within_range(num, T::range()))
.hidden(hidden_unless_forced())
.help(T::HELP)
}

pub struct NumThreadConfig {
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NumThreadConfig {
replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
NonZeroUsize::new(4).expect("4 is non-zero")
} else {
value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize)
},
replay_transactions_threads: value_t_or_exit!(
matches,
ReplayTransactionsThreadsArg::NAME,
NonZeroUsize
),
}
}

/// Configuration for CLAP arguments that control the number of threads for various functions
trait ThreadArg {
/// The argument's name
const NAME: &'static str;
/// The argument's long name
const LONG_NAME: &'static str;
/// The argument's help message
const HELP: &'static str;

/// The default number of threads
fn default() -> usize;
/// The minimum allowed number of threads (inclusive)
fn min() -> usize {
1
}
/// The maximum allowed number of threads (inclusive)
fn max() -> usize {
// By default, no thread pool should scale over the number of the machine's threads
get_max_thread_count()
}
/// The range of allowed number of threads (inclusive on both ends)
fn range() -> RangeInclusive<usize> {
RangeInclusive::new(Self::min(), Self::max())
}
}

struct ReplayForksThreadsArg;
impl ThreadArg for ReplayForksThreadsArg {
const NAME: &'static str = "replay_forks_threads";
const LONG_NAME: &'static str = "replay-forks-threads";
const HELP: &'static str = "Number of threads to use for replay of blocks on different forks";

fn default() -> usize {
// Default to single threaded fork execution
1
}
fn max() -> usize {
// Choose a value that is small enough to limit the overhead of having a large thread pool
// while also being large enough to allow replay of all active forks in most scenarios
4
}
}

struct ReplayTransactionsThreadsArg;
impl ThreadArg for ReplayTransactionsThreadsArg {
const NAME: &'static str = "replay_transactions_threads";
const LONG_NAME: &'static str = "replay-transactions-threads";
const HELP: &'static str = "Number of threads to use for transaction replay";

fn default() -> usize {
get_max_thread_count()
}
}
Loading

0 comments on commit f2ef574

Please sign in to comment.