Skip to content

Commit

Permalink
Make ReplayStage own the threadpool for tx replay (#190)
Browse files Browse the repository at this point in the history
The threadpool used to replay multiple transactions in parallel is
currently global state via a lazy_static definition. Making this pool
owned by ReplayStage will enable subsequent work to make the pool
size configurable on the CLI.

This makes `ReplayStage` create and hold the threadpool which is passed
down to blockstore_processor::confirm_slot().

blockstore_processor::process_blockstore_from_root() now creates its'
own threadpool as well; however, this pool is only alive while for
the scope of that function and does not persist the lifetime of the
process.
  • Loading branch information
steviez authored and willhickey committed Mar 16, 2024
1 parent e8b5f8f commit 5540809
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 22 deletions.
36 changes: 30 additions & 6 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use {
solana_measure::measure::Measure,
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
Expand Down Expand Up @@ -652,16 +653,23 @@ impl ReplayStage {
r_bank_forks.get_vote_only_mode_signal(),
)
};
// Thread pool to (maybe) replay multiple threads in parallel
let replay_mode = if replay_slots_concurrently {
ForkReplayMode::Serial
} else {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
.thread_name(|i| format!("solReplay{i:02}"))
.thread_name(|i| format!("solReplayFork{i:02}"))
.build()
.expect("new rayon threadpool");
ForkReplayMode::Parallel(pool)
};
// Thread pool to replay multiple transactions within one block in parallel
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solReplayTx{i:02}"))
.build()
.expect("new rayon threadpool");

Self::reset_poh_recorder(
&my_pubkey,
Expand Down Expand Up @@ -724,6 +732,7 @@ impl ReplayStage {
&mut replay_timing,
log_messages_bytes_limit,
&replay_mode,
&replay_tx_thread_pool,
&prioritization_fee_cache,
&mut purge_repair_slot_counter,
);
Expand Down Expand Up @@ -2136,6 +2145,7 @@ impl ReplayStage {
fn replay_blockstore_into_bank(
bank: &BankWithScheduler,
blockstore: &Blockstore,
replay_tx_thread_pool: &ThreadPool,
replay_stats: &RwLock<ReplaySlotStats>,
replay_progress: &RwLock<ConfirmationProgress>,
transaction_status_sender: Option<&TransactionStatusSender>,
Expand All @@ -2154,6 +2164,7 @@ impl ReplayStage {
blockstore_processor::confirm_slot(
blockstore,
bank,
replay_tx_thread_pool,
&mut w_replay_stats,
&mut w_replay_progress,
false,
Expand Down Expand Up @@ -2712,7 +2723,8 @@ impl ReplayStage {
fn replay_active_banks_concurrently(
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
thread_pool: &ThreadPool,
fork_thread_pool: &ThreadPool,
replay_tx_thread_pool: &ThreadPool,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
progress: &mut ProgressMap,
Expand All @@ -2730,7 +2742,7 @@ impl ReplayStage {
let longest_replay_time_us = AtomicU64::new(0);

// Allow for concurrent replaying of slots from different forks.
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = thread_pool.install(|| {
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = fork_thread_pool.install(|| {
active_bank_slots
.into_par_iter()
.map(|bank_slot| {
Expand All @@ -2744,7 +2756,7 @@ impl ReplayStage {
trace!(
"Replay active bank: slot {}, thread_idx {}",
bank_slot,
thread_pool.current_thread_index().unwrap_or_default()
fork_thread_pool.current_thread_index().unwrap_or_default()
);
let mut progress_lock = progress.write().unwrap();
if progress_lock
Expand Down Expand Up @@ -2797,6 +2809,7 @@ impl ReplayStage {
let blockstore_result = Self::replay_blockstore_into_bank(
&bank,
blockstore,
replay_tx_thread_pool,
&replay_stats,
&replay_progress,
transaction_status_sender,
Expand Down Expand Up @@ -2826,6 +2839,7 @@ impl ReplayStage {
fn replay_active_bank(
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
replay_tx_thread_pool: &ThreadPool,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
progress: &mut ProgressMap,
Expand Down Expand Up @@ -2884,6 +2898,7 @@ impl ReplayStage {
let blockstore_result = Self::replay_blockstore_into_bank(
&bank,
blockstore,
replay_tx_thread_pool,
&bank_progress.replay_stats,
&bank_progress.replay_progress,
transaction_status_sender,
Expand Down Expand Up @@ -3183,6 +3198,7 @@ impl ReplayStage {
replay_timing: &mut ReplayLoopTiming,
log_messages_bytes_limit: Option<usize>,
replay_mode: &ForkReplayMode,
replay_tx_thread_pool: &ThreadPool,
prioritization_fee_cache: &PrioritizationFeeCache,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) -> bool /* completed a bank */ {
Expand All @@ -3199,11 +3215,12 @@ impl ReplayStage {

let replay_result_vec = match replay_mode {
// Skip the overhead of the threadpool if there is only one bank to play
ForkReplayMode::Parallel(thread_pool) if num_active_banks > 1 => {
ForkReplayMode::Parallel(fork_thread_pool) if num_active_banks > 1 => {
Self::replay_active_banks_concurrently(
blockstore,
bank_forks,
thread_pool,
fork_thread_pool,
replay_tx_thread_pool,
my_pubkey,
vote_account,
progress,
Expand All @@ -3223,6 +3240,7 @@ impl ReplayStage {
Self::replay_active_bank(
blockstore,
bank_forks,
replay_tx_thread_pool,
my_pubkey,
vote_account,
progress,
Expand Down Expand Up @@ -5034,9 +5052,15 @@ pub(crate) mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap();
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let exit = Arc::new(AtomicBool::new(false));
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|i| format!("solReplayTest{i:02}"))
.build()
.expect("new rayon threadpool");
let res = ReplayStage::replay_blockstore_into_bank(
&bank1,
&blockstore,
&replay_tx_thread_pool,
&bank1_progress.replay_stats,
&bank1_progress.replay_progress,
None,
Expand Down
Loading

0 comments on commit 5540809

Please sign in to comment.