Skip to content

Commit

Permalink
Revert "Allow configuration of replay thread pools from CLI (#236)"
Browse files Browse the repository at this point in the history
This reverts commit 973d05c098b24364ad29a5066e78884c5c3a7537.
  • Loading branch information
willhickey committed Mar 22, 2024
1 parent b7f209f commit 32ac7fe
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 179 deletions.
22 changes: 11 additions & 11 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use {
solana_measure::measure::Measure,
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
Expand Down Expand Up @@ -79,7 +80,6 @@ use {
solana_vote_program::vote_state::VoteTransaction,
std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
result,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand All @@ -95,9 +95,11 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;

const MAX_VOTE_SIGNATURES: usize = 200;
const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
// Expect this number to be small enough to minimize thread pool overhead while large enough
// to be able to replay all active forks at the same time in most cases.
const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10;

#[derive(PartialEq, Eq, Debug)]
Expand Down Expand Up @@ -289,8 +291,7 @@ pub struct ReplayStageConfig {
// Stops voting until this slot has been reached. Should be used to avoid
// duplicate voting which can lead to slashing.
pub wait_to_vote_slot: Option<Slot>,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub replay_slots_concurrently: bool,
}

/// Timing information for the ReplayStage main processing loop
Expand Down Expand Up @@ -573,8 +574,7 @@ impl ReplayStage {
ancestor_hashes_replay_update_sender,
tower_storage,
wait_to_vote_slot,
replay_forks_threads,
replay_transactions_threads,
replay_slots_concurrently,
} = config;

trace!("replay stage");
Expand Down Expand Up @@ -654,19 +654,19 @@ impl ReplayStage {
)
};
// Thread pool to (maybe) replay multiple threads in parallel
let replay_mode = if replay_forks_threads.get() == 1 {
ForkReplayMode::Serial
} else {
let replay_mode = if replay_slots_concurrently {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(replay_forks_threads.get())
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
.thread_name(|i| format!("solReplayFork{i:02}"))
.build()
.expect("new rayon threadpool");
ForkReplayMode::Parallel(pool)
} else {
ForkReplayMode::Serial
};
// Thread pool to replay multiple transactions within one block in parallel
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(replay_transactions_threads.get())
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solReplayTx{i:02}"))
.build()
.expect("new rayon threadpool");
Expand Down
22 changes: 3 additions & 19 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ use {
std::{
collections::HashSet,
net::{SocketAddr, UdpSocket},
num::NonZeroUsize,
sync::{atomic::AtomicBool, Arc, RwLock},
thread::{self, JoinHandle},
},
Expand Down Expand Up @@ -82,6 +81,7 @@ pub struct TvuSockets {
pub ancestor_hashes_requests: UdpSocket,
}

#[derive(Default)]
pub struct TvuConfig {
pub max_ledger_shreds: Option<u64>,
pub shred_version: u16,
Expand All @@ -90,22 +90,7 @@ pub struct TvuConfig {
// Validators which should be given priority when serving repairs
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
pub wait_for_vote_to_start_leader: bool,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

impl Default for TvuConfig {
fn default() -> Self {
Self {
max_ledger_shreds: None,
shred_version: 0,
repair_validators: None,
repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
wait_for_vote_to_start_leader: false,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
}
pub replay_slots_concurrently: bool,
}

impl Tvu {
Expand Down Expand Up @@ -280,8 +265,7 @@ impl Tvu {
ancestor_hashes_replay_update_sender,
tower_storage: tower_storage.clone(),
wait_to_vote_slot,
replay_forks_threads: tvu_config.replay_forks_threads,
replay_transactions_threads: tvu_config.replay_transactions_threads,
replay_slots_concurrently: tvu_config.replay_slots_concurrently,
};

let (voting_sender, voting_receiver) = unbounded();
Expand Down
14 changes: 3 additions & 11 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ use {
poh_service::{self, PohService},
},
solana_program_runtime::runtime_config::RuntimeConfig,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
Expand Down Expand Up @@ -124,7 +123,6 @@ use {
std::{
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand Down Expand Up @@ -262,15 +260,14 @@ pub struct ValidatorConfig {
pub wait_to_vote_slot: Option<Slot>,
pub ledger_column_options: LedgerColumnOptions,
pub runtime_config: RuntimeConfig,
pub replay_slots_concurrently: bool,
pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
pub block_verification_method: BlockVerificationMethod,
pub block_production_method: BlockProductionMethod,
pub generator_config: Option<GeneratorConfig>,
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
pub unified_scheduler_handler_threads: Option<usize>,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -331,15 +328,14 @@ impl Default for ValidatorConfig {
wait_to_vote_slot: None,
ledger_column_options: LedgerColumnOptions::default(),
runtime_config: RuntimeConfig::default(),
replay_slots_concurrently: false,
banking_trace_dir_byte_limit: 0,
block_verification_method: BlockVerificationMethod::default(),
block_production_method: BlockProductionMethod::default(),
generator_config: None,
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
unified_scheduler_handler_threads: None,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
}
}
Expand All @@ -350,9 +346,6 @@ impl ValidatorConfig {
enforce_ulimit_nofile: false,
rpc_config: JsonRpcConfig::default_for_test(),
block_production_method: BlockProductionMethod::ThreadLocalMultiIterator,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
.expect("thread count is non-zero"),
..Self::default()
}
}
Expand Down Expand Up @@ -1312,8 +1305,7 @@ impl Validator {
repair_validators: config.repair_validators.clone(),
repair_whitelist: config.repair_whitelist.clone(),
wait_for_vote_to_start_leader,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
replay_slots_concurrently: config.replay_slots_concurrently,
},
&max_slots,
block_metadata_notifier,
Expand Down
3 changes: 1 addition & 2 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,14 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
wait_to_vote_slot: config.wait_to_vote_slot,
ledger_column_options: config.ledger_column_options.clone(),
runtime_config: config.runtime_config.clone(),
replay_slots_concurrently: config.replay_slots_concurrently,
banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit,
block_verification_method: config.block_verification_method.clone(),
block_production_method: config.block_production_method.clone(),
generator_config: config.generator_config.clone(),
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
}
}

Expand Down
19 changes: 5 additions & 14 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ use {
std::{path::PathBuf, str::FromStr},
};

pub mod thread_args;
use thread_args::{thread_args, DefaultThreadArgs};

const EXCLUDE_KEY: &str = "account-index-exclude-key";
const INCLUDE_KEY: &str = "account-index-include-key";
// The default minimal snapshot download speed (bytes/second)
Expand Down Expand Up @@ -1469,6 +1466,11 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.value_name("BYTES")
.help("Maximum number of bytes written to the program log before truncation"),
)
.arg(
Arg::with_name("replay_slots_concurrently")
.long("replay-slots-concurrently")
.help("Allow concurrent replay of slots on different forks"),
)
.arg(
Arg::with_name("banking_trace_dir_byte_limit")
// expose friendly alternative name to cli than internal
Expand Down Expand Up @@ -1553,7 +1555,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
",
),
)
.args(&thread_args(&default_args.thread_args))
.args(&get_deprecated_arguments())
.after_help("The default subcommand is run")
.subcommand(
Expand Down Expand Up @@ -2072,13 +2073,6 @@ fn deprecated_arguments() -> Vec<DeprecatedArg> {
.long("no-rocksdb-compaction")
.takes_value(false)
.help("Disable manual compaction of the ledger database"));
add_arg!(
Arg::with_name("replay_slots_concurrently")
.long("replay-slots-concurrently")
.help("Allow concurrent replay of slots on different forks")
.conflicts_with("replay_forks_threads"),
replaced_by: "replay_forks_threads",
usage_warning: "Equivalent behavior to this flag would be --replay-forks-threads 4");
add_arg!(Arg::with_name("rocksdb_compaction_interval")
.long("rocksdb-compaction-interval-slots")
.value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS")
Expand Down Expand Up @@ -2201,8 +2195,6 @@ pub struct DefaultArgs {
pub banking_trace_dir_byte_limit: String,

pub wen_restart_path: String,

pub thread_args: DefaultThreadArgs,
}

impl DefaultArgs {
Expand Down Expand Up @@ -2285,7 +2277,6 @@ impl DefaultArgs {
wait_for_restart_window_max_delinquent_stake: "5".to_string(),
banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(),
wen_restart_path: "wen_restart_progress.proto".to_string(),
thread_args: DefaultThreadArgs::default(),
}
}
}
Expand Down
115 changes: 0 additions & 115 deletions validator/src/cli/thread_args.rs

This file was deleted.

Loading

0 comments on commit 32ac7fe

Please sign in to comment.