Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ReplayStage own the threadpool for tx replay #190

Merged
merged 4 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use {
solana_measure::measure::Measure,
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
Expand Down Expand Up @@ -652,6 +653,7 @@ impl ReplayStage {
r_bank_forks.get_vote_only_mode_signal(),
)
};
// Thread pool to (maybe) replay multiple threads in parallel
let replay_mode = if replay_slots_concurrently {
ForkReplayMode::Serial
} else {
Expand All @@ -662,6 +664,12 @@ impl ReplayStage {
.expect("new rayon threadpool");
steviez marked this conversation as resolved.
Show resolved Hide resolved
ForkReplayMode::Parallel(pool)
};
// Thread pool to replay multiple transactions within one block in parallel
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solReplayTx{i:02}"))
.build()
.expect("new rayon threadpool");

Self::reset_poh_recorder(
&my_pubkey,
Expand Down Expand Up @@ -724,6 +732,7 @@ impl ReplayStage {
&mut replay_timing,
log_messages_bytes_limit,
&replay_mode,
&replay_tx_thread_pool,
&prioritization_fee_cache,
&mut purge_repair_slot_counter,
);
Expand Down Expand Up @@ -2136,6 +2145,7 @@ impl ReplayStage {
fn replay_blockstore_into_bank(
bank: &BankWithScheduler,
blockstore: &Blockstore,
replay_tx_thread_pool: &ThreadPool,
replay_stats: &RwLock<ReplaySlotStats>,
replay_progress: &RwLock<ConfirmationProgress>,
transaction_status_sender: Option<&TransactionStatusSender>,
Expand All @@ -2154,6 +2164,7 @@ impl ReplayStage {
blockstore_processor::confirm_slot(
blockstore,
bank,
replay_tx_thread_pool,
&mut w_replay_stats,
&mut w_replay_progress,
false,
Expand Down Expand Up @@ -2712,7 +2723,8 @@ impl ReplayStage {
fn replay_active_banks_concurrently(
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
thread_pool: &ThreadPool,
fork_thread_pool: &ThreadPool,
replay_tx_thread_pool: &ThreadPool,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
progress: &mut ProgressMap,
Expand All @@ -2730,7 +2742,7 @@ impl ReplayStage {
let longest_replay_time_us = AtomicU64::new(0);

// Allow for concurrent replaying of slots from different forks.
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = thread_pool.install(|| {
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = fork_thread_pool.install(|| {
active_bank_slots
.into_par_iter()
.map(|bank_slot| {
Expand All @@ -2744,7 +2756,7 @@ impl ReplayStage {
trace!(
"Replay active bank: slot {}, thread_idx {}",
bank_slot,
thread_pool.current_thread_index().unwrap_or_default()
fork_thread_pool.current_thread_index().unwrap_or_default()
);
let mut progress_lock = progress.write().unwrap();
if progress_lock
Expand Down Expand Up @@ -2797,6 +2809,7 @@ impl ReplayStage {
let blockstore_result = Self::replay_blockstore_into_bank(
&bank,
blockstore,
replay_tx_thread_pool,
&replay_stats,
&replay_progress,
transaction_status_sender,
Expand Down Expand Up @@ -2826,6 +2839,7 @@ impl ReplayStage {
fn replay_active_bank(
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
replay_tx_thread_pool: &ThreadPool,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
progress: &mut ProgressMap,
Expand Down Expand Up @@ -2884,6 +2898,7 @@ impl ReplayStage {
let blockstore_result = Self::replay_blockstore_into_bank(
&bank,
blockstore,
replay_tx_thread_pool,
&bank_progress.replay_stats,
&bank_progress.replay_progress,
transaction_status_sender,
Expand Down Expand Up @@ -3183,6 +3198,7 @@ impl ReplayStage {
replay_timing: &mut ReplayLoopTiming,
log_messages_bytes_limit: Option<usize>,
replay_mode: &ForkReplayMode,
replay_tx_thread_pool: &ThreadPool,
prioritization_fee_cache: &PrioritizationFeeCache,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) -> bool /* completed a bank */ {
Expand All @@ -3199,11 +3215,12 @@ impl ReplayStage {

let replay_result_vec = match replay_mode {
// Skip the overhead of the threadpool if there is only one bank to play
ForkReplayMode::Parallel(thread_pool) if num_active_banks > 1 => {
ForkReplayMode::Parallel(fork_thread_pool) if num_active_banks > 1 => {
Self::replay_active_banks_concurrently(
blockstore,
bank_forks,
thread_pool,
fork_thread_pool,
replay_tx_thread_pool,
my_pubkey,
vote_account,
progress,
Expand All @@ -3223,6 +3240,7 @@ impl ReplayStage {
Self::replay_active_bank(
blockstore,
bank_forks,
replay_tx_thread_pool,
my_pubkey,
vote_account,
progress,
Expand Down Expand Up @@ -5034,9 +5052,15 @@ pub(crate) mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap();
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let exit = Arc::new(AtomicBool::new(false));
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|i| format!("solReplayTest{i:02}"))
.build()
.expect("new rayon threadpool");
let res = ReplayStage::replay_blockstore_into_bank(
&bank1,
&blockstore,
&replay_tx_thread_pool,
&bank1_progress.replay_stats,
&bank1_progress.replay_progress,
None,
Expand Down
Loading
Loading