Make ReplayStage create the parallel fork replay threadpool (#137)
Having ReplayStage own the pool allows subsequent work to configure the
pool's size; configuring the size inside the lazy_static would have been
a little messy.
steviez authored and willhickey committed Mar 9, 2024
1 parent f8c5e15 commit 1fcef51
Showing 1 changed file with 73 additions and 63 deletions: core/src/replay_stage.rs
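The commit message points at follow-up work that makes the pool size configurable rather than hard-coded; owning the pool is what makes that straightforward. A minimal sketch of such a follow-up, assuming a hypothetical replay_forks_threads setting that is not part of this commit:

use rayon::ThreadPool;

// Hypothetical follow-up enabled by ReplayStage owning the pool: the thread
// count comes from configuration instead of the hard-coded
// MAX_CONCURRENT_FORKS_TO_REPLAY constant. `replay_forks_threads` is an
// assumed parameter, not an existing CLI flag or field.
fn build_replay_pool(replay_forks_threads: usize) -> ThreadPool {
    rayon::ThreadPoolBuilder::new()
        .num_threads(replay_forks_threads)
        .thread_name(|i| format!("solReplay{i:02}"))
        .build()
        .expect("new rayon threadpool")
}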
@@ -33,7 +33,6 @@ use {
window_service::DuplicateSlotReceiver,
},
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
lazy_static::lazy_static,
rayon::{prelude::*, ThreadPool},
solana_entry::entry::VerifyRecyclers,
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
@@ -102,14 +101,6 @@ const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10;

lazy_static! {
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
.thread_name(|i| format!("solReplay{i:02}"))
.build()
.unwrap();
}

#[derive(PartialEq, Eq, Debug)]
pub enum HeaviestForkFailures {
LockedOut(u64),
@@ -131,6 +122,11 @@ pub enum HeaviestForkFailures {
),
}

enum ForkReplayMode {
Serial,
Parallel(ThreadPool),
}

#[derive(PartialEq, Eq, Debug)]
enum ConfirmationType {
SupermajorityVoted,
@@ -656,6 +652,16 @@ impl ReplayStage {
r_bank_forks.get_vote_only_mode_signal(),
)
};
let replay_mode = if replay_slots_concurrently {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
.thread_name(|i| format!("solReplay{i:02}"))
.build()
.expect("new rayon threadpool");
ForkReplayMode::Parallel(pool)
} else {
ForkReplayMode::Serial
};

Self::reset_poh_recorder(
&my_pubkey,
@@ -717,7 +723,7 @@
block_metadata_notifier.clone(),
&mut replay_timing,
log_messages_bytes_limit,
replay_slots_concurrently,
&replay_mode,
&prioritization_fee_cache,
&mut purge_repair_slot_counter,
);
@@ -2706,6 +2712,7 @@ impl ReplayStage {
fn replay_active_banks_concurrently(
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
thread_pool: &ThreadPool,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
progress: &mut ProgressMap,
@@ -2723,7 +2730,7 @@
let longest_replay_time_us = AtomicU64::new(0);

// Allow for concurrent replaying of slots from different forks.
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = PAR_THREAD_POOL.install(|| {
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = thread_pool.install(|| {
active_bank_slots
.into_par_iter()
.map(|bank_slot| {
@@ -2737,7 +2744,7 @@
trace!(
"Replay active bank: slot {}, thread_idx {}",
bank_slot,
PAR_THREAD_POOL.current_thread_index().unwrap_or_default()
thread_pool.current_thread_index().unwrap_or_default()
);
let mut progress_lock = progress.write().unwrap();
if progress_lock
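The two hunks above replace the global PAR_THREAD_POOL with the pool handed in by the caller; the rayon usage itself is unchanged. A self-contained sketch of that pattern, with a toy workload standing in for real slot replay: install() scopes the parallel iterator onto a specific pool, and current_thread_index() reports which of that pool's workers ran each item.

use rayon::prelude::*;

fn main() {
    // Named pool, mirroring the ThreadPoolBuilder call in the diff.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .thread_name(|i| format!("solReplay{i:02}"))
        .build()
        .expect("new rayon threadpool");

    // install() runs the closure on `pool`, so the parallel iterator inside
    // is serviced by this pool's workers rather than rayon's global pool.
    let doubled: Vec<u64> = pool.install(|| {
        (0u64..8)
            .into_par_iter()
            .map(|slot| {
                // Analogous to the trace! line in the diff: report which
                // worker of `pool` processed this item.
                let idx = pool.current_thread_index().unwrap_or_default();
                println!("replayed item {slot} on thread_idx {idx}");
                slot * 2
            })
            .collect()
    });
    assert_eq!(doubled.len(), 8);
}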
@@ -3175,7 +3182,7 @@
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
replay_timing: &mut ReplayLoopTiming,
log_messages_bytes_limit: Option<usize>,
replay_slots_concurrently: bool,
replay_mode: &ForkReplayMode,
prioritization_fee_cache: &PrioritizationFeeCache,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) -> bool /* completed a bank */ {
@@ -3186,11 +3193,17 @@
num_active_banks,
active_bank_slots
);
if num_active_banks > 0 {
let replay_result_vec = if num_active_banks > 1 && replay_slots_concurrently {
if active_bank_slots.is_empty() {
return false;
}

let replay_result_vec = match replay_mode {
// Skip the overhead of the threadpool if there is only one bank to play
ForkReplayMode::Parallel(thread_pool) if num_active_banks > 1 => {
Self::replay_active_banks_concurrently(
blockstore,
bank_forks,
thread_pool,
my_pubkey,
vote_account,
progress,
@@ -3203,55 +3216,52 @@
&active_bank_slots,
prioritization_fee_cache,
)
} else {
active_bank_slots
.iter()
.map(|bank_slot| {
Self::replay_active_bank(
blockstore,
bank_forks,
my_pubkey,
vote_account,
progress,
transaction_status_sender,
entry_notification_sender,
verify_recyclers,
replay_vote_sender,
replay_timing,
log_messages_bytes_limit,
*bank_slot,
prioritization_fee_cache,
)
})
.collect()
};
}
ForkReplayMode::Serial | ForkReplayMode::Parallel(_) => active_bank_slots
.iter()
.map(|bank_slot| {
Self::replay_active_bank(
blockstore,
bank_forks,
my_pubkey,
vote_account,
progress,
transaction_status_sender,
entry_notification_sender,
verify_recyclers,
replay_vote_sender,
replay_timing,
log_messages_bytes_limit,
*bank_slot,
prioritization_fee_cache,
)
})
.collect(),
};

Self::process_replay_results(
blockstore,
bank_forks,
progress,
transaction_status_sender,
cache_block_meta_sender,
heaviest_subtree_fork_choice,
bank_notification_sender,
rewards_recorder_sender,
rpc_subscriptions,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
unfrozen_gossip_verified_vote_hashes,
latest_validator_votes_for_frozen_banks,
cluster_slots_update_sender,
cost_update_sender,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
block_metadata_notifier,
&replay_result_vec,
purge_repair_slot_counter,
)
} else {
false
}
Self::process_replay_results(
blockstore,
bank_forks,
progress,
transaction_status_sender,
cache_block_meta_sender,
heaviest_subtree_fork_choice,
bank_notification_sender,
rewards_recorder_sender,
rpc_subscriptions,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
unfrozen_gossip_verified_vote_hashes,
latest_validator_votes_for_frozen_banks,
cluster_slots_update_sender,
cost_update_sender,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
block_metadata_notifier,
&replay_result_vec,
purge_repair_slot_counter,
)
}

#[allow(clippy::too_many_arguments)]
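The reworked dispatch in replay_active_banks above hinges on a guarded match arm: Parallel(thread_pool) is taken only when more than one bank is active, and the Serial | Parallel(_) arm covers both serial mode and the single-bank parallel case, skipping pool overhead for a lone slot. A stripped-down sketch of that dispatch, with toy types and a trivial replay function standing in for the real ReplayStage arguments:

use rayon::{prelude::*, ThreadPool};

enum ForkReplayMode {
    Serial,
    Parallel(ThreadPool),
}

// Toy stand-in for per-slot replay work.
fn replay_one(slot: u64) -> u64 {
    slot
}

fn replay_all(mode: &ForkReplayMode, active_bank_slots: &[u64]) -> Vec<u64> {
    match mode {
        // Only route work through the pool when more than one bank is active.
        ForkReplayMode::Parallel(thread_pool) if active_bank_slots.len() > 1 => thread_pool
            .install(|| active_bank_slots.par_iter().map(|s| replay_one(*s)).collect()),
        // Serial mode, or Parallel with a single bank: plain iteration.
        ForkReplayMode::Serial | ForkReplayMode::Parallel(_) => {
            active_bank_slots.iter().map(|s| replay_one(*s)).collect()
        }
    }
}

fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .expect("new rayon threadpool");
    let mode = ForkReplayMode::Parallel(pool);
    assert_eq!(replay_all(&mode, &[1, 2, 3]), vec![1, 2, 3]);
    assert_eq!(replay_all(&ForkReplayMode::Serial, &[7]), vec![7]);
}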
