Define InstalledScheduler::wait_for_termination() (#33922)
* Define InstalledScheduler::wait_for_termination()

* Rename to wait_for_scheduler_termination

* Comment wait_for_termination and WaitReason better
ryoqun authored Oct 31, 2023
1 parent b2cec5a commit 136ab21
Showing 8 changed files with 401 additions and 16 deletions.
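For orientation, a minimal sketch of the interface this commit defines, using placeholder types. The real definitions live in runtime/src/installed_scheduler_pool.rs, which is not rendered in this view; only the DroppedFromBankForks variant appears directly in the diff below, so everything else here is an assumption:

// Sketch only: placeholder types standing in for the runtime's own
// transaction::Result<()> and ExecuteTimings.
type TransactionResult = Result<(), String>;
#[derive(Default)]
struct ExecuteTimings;

// Why the bank asks its scheduler to wind down. DroppedFromBankForks is the
// only variant visible in this diff (see the mock expectation in
// blockstore_processor.rs below); any other variants are assumptions.
#[derive(Debug, PartialEq, Eq)]
enum WaitReason {
    DroppedFromBankForks,
}

trait InstalledScheduler {
    // Block until every transaction handed to the scheduler has finished,
    // returning the overall result plus accumulated timings, or None when
    // there is nothing to report back.
    fn wait_for_termination(
        &mut self,
        reason: &WaitReason,
    ) -> Option<(TransactionResult, ExecuteTimings)>;
}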
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

37 changes: 36 additions & 1 deletion core/src/replay_stage.rs
@@ -42,7 +42,8 @@ use {
block_error::BlockError,
blockstore::Blockstore,
blockstore_processor::{
self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
self, BlockstoreProcessorError, ConfirmationProgress, ExecuteBatchesInternalMetrics,
TransactionStatusSender,
},
entry_notifier_service::EntryNotifierSender,
leader_schedule_cache::LeaderScheduleCache,
@@ -2815,6 +2816,40 @@ impl ReplayStage {
.expect("Bank fork progress entry missing for completed bank");

let replay_stats = bank_progress.replay_stats.clone();

if let Some((result, completed_execute_timings)) =
bank.wait_for_completed_scheduler()
{
let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads(
completed_execute_timings,
);
replay_stats
.write()
.unwrap()
.batch_execute
.accumulate(metrics);

if let Err(err) = result {
Self::mark_dead_slot(
blockstore,
bank,
bank_forks.read().unwrap().root(),
&BlockstoreProcessorError::InvalidTransaction(err),
rpc_subscriptions,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
);
// don't try to run the remaining normal processing for the completed bank
continue;
}
}

let r_replay_stats = replay_stats.read().unwrap();
let replay_progress = bank_progress.replay_progress.clone();
let r_replay_progress = replay_progress.read().unwrap();
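To connect the replay-stage call above to the new trait method, here is a rough, hypothetical sketch of the BankWithScheduler side, again with placeholder types. The wrapper and the wait_for_scheduler_termination rename mentioned in the commit message live in the unrendered installed_scheduler_pool.rs; the concrete WaitReason used on this code path is not visible here, and DroppedFromBankForks is used purely as a stand-in:

use std::sync::{Arc, RwLock};

type TransactionResult = Result<(), String>; // placeholder
#[derive(Default)]
struct ExecuteTimings; // placeholder

#[derive(Debug, PartialEq, Eq)]
enum WaitReason {
    DroppedFromBankForks,
}

trait InstalledScheduler {
    fn wait_for_termination(
        &mut self,
        reason: &WaitReason,
    ) -> Option<(TransactionResult, ExecuteTimings)>;
}

struct BankWithScheduler {
    // The bank itself is elided; only the scheduler slot matters here.
    scheduler: Arc<RwLock<Option<Box<dyn InstalledScheduler>>>>,
}

impl BankWithScheduler {
    // What replay_stage.rs calls above: drain the scheduler, if one was ever
    // installed, and surface its overall result plus accumulated timings so
    // they can be folded into the replay stats.
    fn wait_for_completed_scheduler(&self) -> Option<(TransactionResult, ExecuteTimings)> {
        // Assumption: the actual variant passed when terminating a completed
        // bank is not shown in this view; DroppedFromBankForks is illustrative.
        self.wait_for_scheduler_termination(&WaitReason::DroppedFromBankForks)
    }

    // The renamed helper from the commit message.
    fn wait_for_scheduler_termination(
        &self,
        reason: &WaitReason,
    ) -> Option<(TransactionResult, ExecuteTimings)> {
        let mut guard = self.scheduler.write().unwrap();
        guard
            .take()
            .and_then(|mut scheduler| scheduler.wait_for_termination(reason))
    }
}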
1 change: 1 addition & 0 deletions ledger/Cargo.toml
@@ -25,6 +25,7 @@ lazy_static = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
lru = { workspace = true }
mockall = { workspace = true }
num_cpus = { workspace = true }
num_enum = { workspace = true }
prost = { workspace = true }
34 changes: 31 additions & 3 deletions ledger/src/blockstore_processor.rs
@@ -216,12 +216,27 @@ fn execute_batch(
}

#[derive(Default)]
struct ExecuteBatchesInternalMetrics {
pub struct ExecuteBatchesInternalMetrics {
execution_timings_per_thread: HashMap<usize, ThreadExecuteTimings>,
total_batches_len: u64,
execute_batches_us: u64,
}

impl ExecuteBatchesInternalMetrics {
pub fn new_with_timings_from_all_threads(execute_timings: ExecuteTimings) -> Self {
const DUMMY_THREAD_INDEX: usize = 999;
let mut new = Self::default();
new.execution_timings_per_thread.insert(
DUMMY_THREAD_INDEX,
ThreadExecuteTimings {
execute_timings,
..ThreadExecuteTimings::default()
},
);
new
}
}

fn execute_batches_internal(
bank: &Arc<Bank>,
batches: &[TransactionBatchWithIndexes],
@@ -1068,7 +1083,7 @@ pub struct BatchExecutionTiming {
}

impl BatchExecutionTiming {
fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) {
pub fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) {
let Self {
totals,
wall_clock_us,
@@ -1382,6 +1397,9 @@ fn process_bank_0(
&mut ExecuteTimings::default(),
)
.expect("Failed to process bank 0 from ledger. Did you forget to provide a snapshot?");
if let Some((result, _timings)) = bank0.wait_for_completed_scheduler() {
result.unwrap();
}
bank0.freeze();
if blockstore.is_primary_access() {
blockstore.insert_bank_hash(bank0.slot(), bank0.hash(), false);
@@ -1784,6 +1802,9 @@ fn process_single_slot(
err
})?;

if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
result?
}
bank.freeze(); // all banks handled by this routine are created from complete slots
if blockstore.is_primary_access() {
blockstore.insert_bank_hash(bank.slot(), bank.hash(), false);
@@ -1924,7 +1945,7 @@ pub mod tests {
genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
installed_scheduler_pool::MockInstalledScheduler,
installed_scheduler_pool::{MockInstalledScheduler, WaitReason},
},
solana_sdk::{
account::{AccountSharedData, WritableAccount},
@@ -4510,10 +4531,17 @@ pub mod tests {
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());

let mut mocked_scheduler = MockInstalledScheduler::new();
let mut seq = mockall::Sequence::new();
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
.returning(|_| ());
mocked_scheduler
.expect_wait_for_termination()
.with(mockall::predicate::eq(WaitReason::DroppedFromBankForks))
.times(1)
.in_sequence(&mut seq)
.returning(|_| None);
let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));

let batch = bank.prepare_sanitized_batch(&txs);
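The mockall dependency added to ledger/Cargo.toml above is what produces the MockInstalledScheduler driven in the test hunk just shown. A hedged sketch of the usual mockall pattern, assuming the trait is annotated for automocking in the unrendered installed_scheduler_pool.rs; the argument and return types here are deliberately simplified placeholders:

use mockall::{automock, predicate, Sequence};

#[derive(Debug, PartialEq, Eq)]
enum WaitReason {
    DroppedFromBankForks,
}

// automock generates a MockInstalledScheduler whose expect_* builders are the
// ones used in the test above.
#[automock]
trait InstalledScheduler {
    fn schedule_execution(&self, task: &()); // placeholder argument type
    fn wait_for_termination(&mut self, reason: &WaitReason) -> Option<()>; // placeholder return
}

#[test]
fn scheduler_is_waited_on_when_dropped_from_bank_forks() {
    let mut mocked_scheduler = MockInstalledScheduler::new();
    let mut seq = Sequence::new();
    mocked_scheduler
        .expect_wait_for_termination()
        .with(predicate::eq(WaitReason::DroppedFromBankForks))
        .times(1)
        .in_sequence(&mut seq)
        .returning(|_| None);
    // In the real test the mock is handed to BankWithScheduler and the wait is
    // triggered when the bank is dropped from bank forks; here the call is made
    // directly so the expectation is satisfied.
    assert!(mocked_scheduler
        .wait_for_termination(&WaitReason::DroppedFromBankForks)
        .is_none());
}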
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default.

31 changes: 21 additions & 10 deletions runtime/src/bank.rs
@@ -43,6 +43,7 @@ use {
builtins::{BuiltinPrototype, BUILTINS},
epoch_rewards_hasher::hash_rewards_into_partitions,
epoch_stakes::{EpochStakes, NodeVoteAccounts},
installed_scheduler_pool::{BankWithScheduler, InstalledSchedulerRwLock},
runtime_config::RuntimeConfig,
serde_snapshot::BankIncrementalSnapshotPersistence,
snapshot_hash::SnapshotHash,
@@ -220,7 +221,7 @@ mod metrics;
mod serde_snapshot;
mod sysvar_cache;
#[cfg(test)]
mod tests;
pub(crate) mod tests;
mod transaction_account_state_info;

pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0;
@@ -4185,7 +4186,11 @@ impl Bank {
/// Register a new recent blockhash in the bank's recent blockhash queue. Called when a bank
/// reaches its max tick height. Can be called by tests to get new blockhashes for transaction
/// processing without advancing to a new bank slot.
pub fn register_recent_blockhash(&self, blockhash: &Hash) {
fn register_recent_blockhash(&self, blockhash: &Hash, scheduler: &InstalledSchedulerRwLock) {
// This is needed because recent_blockhash updates necessitate synchronizations for
// consistent tx check_age handling.
BankWithScheduler::wait_for_paused_scheduler(self, scheduler);

// Only acquire the write lock for the blockhash queue on block boundaries because
// readers can starve this write lock acquisition and ticks would be slowed down too
// much if the write lock is acquired for each tick.
@@ -4197,7 +4202,10 @@
// gating this under #[cfg(feature = "dev-context-only-utils")] isn't easy due to
// solana-program-test's usage...
pub fn register_unique_recent_blockhash_for_test(&self) {
self.register_recent_blockhash(&Hash::new_unique())
self.register_recent_blockhash(
&Hash::new_unique(),
&BankWithScheduler::no_scheduler_available(),
)
}

/// Tell the bank which Entry IDs exist on the ledger. This function assumes subsequent calls
@@ -4206,14 +4214,14 @@
///
/// This is NOT thread safe because if tick height is updated by two different threads, the
/// block boundary condition could be missed.
pub fn register_tick(&self, hash: &Hash) {
pub fn register_tick(&self, hash: &Hash, scheduler: &InstalledSchedulerRwLock) {
assert!(
!self.freeze_started(),
"register_tick() working on a bank that is already frozen or is undergoing freezing!"
);

if self.is_block_boundary(self.tick_height.load(Relaxed) + 1) {
self.register_recent_blockhash(hash);
self.register_recent_blockhash(hash, scheduler);
}

// ReplayStage will start computing the accounts delta hash when it
@@ -4226,18 +4234,17 @@

#[cfg(feature = "dev-context-only-utils")]
pub fn register_tick_for_test(&self, hash: &Hash) {
// currently meaningless wrapper; upcoming pr will make it an actual helper...
self.register_tick(hash)
self.register_tick(hash, &BankWithScheduler::no_scheduler_available())
}

#[cfg(feature = "dev-context-only-utils")]
pub fn register_default_tick_for_test(&self) {
self.register_tick(&Hash::default())
self.register_tick_for_test(&Hash::default())
}

#[cfg(feature = "dev-context-only-utils")]
pub fn register_unique_tick(&self) {
self.register_tick(&Hash::new_unique())
self.register_tick_for_test(&Hash::new_unique())
}

pub fn is_complete(&self) -> bool {
@@ -8008,10 +8015,14 @@ impl Bank {
}

pub fn fill_bank_with_ticks_for_tests(&self) {
self.do_fill_bank_with_ticks_for_tests(&BankWithScheduler::no_scheduler_available())
}

pub(crate) fn do_fill_bank_with_ticks_for_tests(&self, scheduler: &InstalledSchedulerRwLock) {
if self.tick_height.load(Relaxed) < self.max_tick_height {
let last_blockhash = self.last_blockhash();
while self.last_blockhash() == last_blockhash {
self.register_tick(&Hash::new_unique())
self.register_tick(&Hash::new_unique(), scheduler)
}
} else {
warn!("Bank already reached max tick height, cannot fill it with more ticks");
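The new wait_for_paused_scheduler call inside register_recent_blockhash above is the synchronization the added comment describes: in-flight scheduled transactions must finish before the recent-blockhash queue changes, so their check_age results stay consistent. A rough sketch of what that helper and the no_scheduler_available() stand-in could look like, with placeholder types and an assumed variant name, since the real implementation is in the unrendered installed_scheduler_pool.rs:

use std::sync::RwLock;

struct Bank; // placeholder

enum WaitReason {
    /// Assumed variant name: the scheduler is only paused so the bank can
    /// rotate its recent blockhash; scheduling may resume afterwards.
    PausedForRecentBlockhash,
}

trait InstalledScheduler {
    fn wait_for_termination(&mut self, reason: &WaitReason) -> Option<()>;
}

// Matches the &InstalledSchedulerRwLock parameter threaded through
// register_tick()/register_recent_blockhash() in the bank.rs diff above.
type InstalledSchedulerRwLock = RwLock<Option<Box<dyn InstalledScheduler>>>;

struct BankWithScheduler;

impl BankWithScheduler {
    // Drain what the scheduler has already accepted, but leave it installed
    // so ticking and scheduling can continue on the same bank.
    fn wait_for_paused_scheduler(_bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
        if let Some(scheduler) = scheduler.write().unwrap().as_mut() {
            let _ = scheduler.wait_for_termination(&WaitReason::PausedForRecentBlockhash);
        }
    }

    // Stand-in for the no_scheduler_available() used by the *_for_test helpers
    // above: a lock holding no scheduler at all.
    fn no_scheduler_available() -> InstalledSchedulerRwLock {
        RwLock::new(None)
    }
}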
2 changes: 1 addition & 1 deletion runtime/src/bank/tests.rs
@@ -274,7 +274,7 @@ fn test_bank_new() {
assert_eq!(rent.lamports_per_byte_year, 5);
}

fn create_simple_test_bank(lamports: u64) -> Bank {
pub(crate) fn create_simple_test_bank(lamports: u64) -> Bank {
let (genesis_config, _mint_keypair) = create_genesis_config(lamports);
Bank::new_for_tests(&genesis_config)
}
(1 remaining changed file is not rendered in this view.)
