Define InstalledScheduler::wait_for_termination() #33922

Merged · merged 3 commits on Oct 31, 2023
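
For orientation, here is a hedged sketch of the scheduler surface this change exercises, reconstructed only from the call sites visible in the diff below (MockInstalledScheduler, WaitReason::DroppedFromBankForks, and the Option<(result, timings)> pair consumed in replay_stage.rs). The exact upstream signatures, type names, and the full set of WaitReason variants are assumptions, not the real definitions in installed_scheduler_pool.rs.

// Reconstructed sketch, NOT the upstream definition. Stand-in types replace the real
// solana_sdk / solana_program_runtime ones so the snippet is self-contained.
type TransactionResult = Result<(), String>; // stands in for transaction::Result<()>

#[derive(Default)]
struct ExecuteTimings; // stands in for solana_program_runtime::timings::ExecuteTimings

#[derive(Debug, PartialEq, Eq)]
enum WaitReason {
    // The variant asserted by the mockall expectation in the blockstore_processor test below;
    // other variants (e.g. for freezing or the recent-blockhash pause) presumably exist.
    DroppedFromBankForks,
}

trait InstalledScheduler {
    // Hand one sanitized transaction, together with its index within the block, to the
    // scheduler ((String, usize) stands in for (SanitizedTransaction, index)).
    fn schedule_execution(&self, transaction_with_index: &(String, usize));

    // Block until all scheduled work has finished; yield the overall transaction result and
    // the accumulated timings, or None if there is nothing to wait for.
    fn wait_for_termination(
        &mut self,
        reason: &WaitReason,
    ) -> Option<(TransactionResult, ExecuteTimings)>;
}
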
Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

37 changes: 36 additions & 1 deletion core/src/replay_stage.rs
@@ -42,7 +42,8 @@ use {
block_error::BlockError,
blockstore::Blockstore,
blockstore_processor::{
self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
self, BlockstoreProcessorError, ConfirmationProgress, ExecuteBatchesInternalMetrics,
TransactionStatusSender,
},
entry_notifier_service::EntryNotifierSender,
leader_schedule_cache::LeaderScheduleCache,
@@ -2815,6 +2816,40 @@ impl ReplayStage {
.expect("Bank fork progress entry missing for completed bank");

let replay_stats = bank_progress.replay_stats.clone();

if let Some((result, completed_execute_timings)) =
bank.wait_for_completed_scheduler()
{
let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads(
completed_execute_timings,
);
replay_stats
.write()
.unwrap()
.batch_execute
.accumulate(metrics);

if let Err(err) = result {
Self::mark_dead_slot(
blockstore,
bank,
bank_forks.read().unwrap().root(),
&BlockstoreProcessorError::InvalidTransaction(err),
rpc_subscriptions,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
);
// don't try to run the remaining normal processing for the completed bank
continue;
}
}

let r_replay_stats = replay_stats.read().unwrap();
let replay_progress = bank_progress.replay_progress.clone();
let r_replay_progress = replay_progress.read().unwrap();
1 change: 1 addition & 0 deletions ledger/Cargo.toml
@@ -25,6 +25,7 @@ lazy_static = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
lru = { workspace = true }
mockall = { workspace = true }
num_cpus = { workspace = true }
num_enum = { workspace = true }
prost = { workspace = true }
34 changes: 31 additions & 3 deletions ledger/src/blockstore_processor.rs
@@ -216,12 +216,27 @@ fn execute_batch(
}

#[derive(Default)]
struct ExecuteBatchesInternalMetrics {
pub struct ExecuteBatchesInternalMetrics {
execution_timings_per_thread: HashMap<usize, ThreadExecuteTimings>,
total_batches_len: u64,
execute_batches_us: u64,
}

impl ExecuteBatchesInternalMetrics {
pub fn new_with_timings_from_all_threads(execute_timings: ExecuteTimings) -> Self {
const DUMMY_THREAD_INDEX: usize = 999;
let mut new = Self::default();
new.execution_timings_per_thread.insert(
DUMMY_THREAD_INDEX,
ThreadExecuteTimings {
execute_timings,
..ThreadExecuteTimings::default()
},
);
new
}
}

fn execute_batches_internal(
bank: &Arc<Bank>,
batches: &[TransactionBatchWithIndexes],
@@ -1068,7 +1083,7 @@ pub struct BatchExecutionTiming {
}

impl BatchExecutionTiming {
fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) {
pub fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) {
let Self {
totals,
wall_clock_us,
@@ -1382,6 +1397,9 @@ fn process_bank_0(
&mut ExecuteTimings::default(),
)
.expect("Failed to process bank 0 from ledger. Did you forget to provide a snapshot?");
if let Some((result, _timings)) = bank0.wait_for_completed_scheduler() {
result.unwrap();
}
bank0.freeze();
if blockstore.is_primary_access() {
blockstore.insert_bank_hash(bank0.slot(), bank0.hash(), false);
@@ -1784,6 +1802,9 @@ fn process_single_slot(
err
})?;

if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
result?
}
bank.freeze(); // all banks handled by this routine are created from complete slots
if blockstore.is_primary_access() {
blockstore.insert_bank_hash(bank.slot(), bank.hash(), false);
@@ -1924,7 +1945,7 @@ pub mod tests {
genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
installed_scheduler_pool::MockInstalledScheduler,
installed_scheduler_pool::{MockInstalledScheduler, WaitReason},
},
solana_sdk::{
account::{AccountSharedData, WritableAccount},
@@ -4510,10 +4531,17 @@ pub mod tests {
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());

let mut mocked_scheduler = MockInstalledScheduler::new();
let mut seq = mockall::Sequence::new();
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
.returning(|_| ());
mocked_scheduler
.expect_wait_for_termination()
.with(mockall::predicate::eq(WaitReason::DroppedFromBankForks))
.times(1)
.in_sequence(&mut seq)
.returning(|_| None);
let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));

let batch = bank.prepare_sanitized_batch(&txs);
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default.

31 changes: 21 additions & 10 deletions runtime/src/bank.rs
@@ -43,6 +43,7 @@ use {
builtins::{BuiltinPrototype, BUILTINS},
epoch_rewards_hasher::hash_rewards_into_partitions,
epoch_stakes::{EpochStakes, NodeVoteAccounts},
installed_scheduler_pool::{BankWithScheduler, InstalledSchedulerRwLock},
runtime_config::RuntimeConfig,
serde_snapshot::BankIncrementalSnapshotPersistence,
snapshot_hash::SnapshotHash,
@@ -220,7 +221,7 @@ mod metrics;
mod serde_snapshot;
mod sysvar_cache;
#[cfg(test)]
mod tests;
pub(crate) mod tests;
mod transaction_account_state_info;

pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0;
@@ -4185,7 +4186,11 @@ impl Bank {
/// Register a new recent blockhash in the bank's recent blockhash queue. Called when a bank
/// reaches its max tick height. Can be called by tests to get new blockhashes for transaction
/// processing without advancing to a new bank slot.
pub fn register_recent_blockhash(&self, blockhash: &Hash) {
fn register_recent_blockhash(&self, blockhash: &Hash, scheduler: &InstalledSchedulerRwLock) {
@ryoqun (Contributor, Author), Oct 29, 2023:

pub isn't needed anymore as of #33880.

also, this internal fn should only ever be called by register_tick, even more so after this new scheduler arg. in other words, blocking must be done, if needed, with proper scheduler plumbing.

// This is needed because recent_blockhash updates necessitate synchronizations for
// consistent tx check_age handling.
BankWithScheduler::wait_for_paused_scheduler(self, scheduler);
Comment on lines +4190 to +4192

@ryoqun (Contributor, Author), Oct 29, 2023:

this is indeed very subtle yet crucial. there are tests for this: test_scheduler_schedule_execution_recent_blockhash_edge_case_{with,without}_race in #33070

A Contributor commented:

So I'm just trying to summarize the reason this is necessary.

Essentially, in the loop in blockstore_processor.rs, scheduling no longer blocks our transaction processing. This means we will end up scheduling until we hit the final tick, at which point we register the new bank hash.

We need to make sure the blockhash_queue of our bank is not updated until after all the scheduled transactions have completed, because if a tx w/ a recent blockhash equal to the current blockhash was scheduled, we'd get inconsistent results based on when it was executed, not scheduled.

I think that is correct, but please let me know if it is not.
If that is the case, why did we choose this approach instead of checking the blockhash at schedule time?
afaik the blockhash queue is only updated at the end of a block, i.e. when we should not be scheduling any more txs for that slot.

@ryoqun (Contributor, Author) replied, Oct 31, 2023:

> Essentially, in the loop in blockstore_processor.rs, scheduling no longer blocks our transaction processing. This means we will end up scheduling until we hit the final tick, at which point we register the new bank hash.
>
> We need to make sure the blockhash_queue of our bank is not updated until after all the scheduled transactions have completed, because if a tx w/ a recent blockhash equal to the current blockhash was scheduled, we'd get inconsistent results based on when it was executed, not scheduled.

that's correct understanding.

> If that is the case, why did we choose this approach instead of checking the blockhash at schedule time?
> afaik the blockhash queue is only updated at the end of a block, i.e. when we should not be scheduling any more txs for that slot.

there are two reasons:

  • impl simplicity and risk reduction (i.e. extracting Bank::check_age() is too dangerous without a rigorous code audit, which i want to avoid, esp. considering nonces; certainly, this is a nice optimization opportunity, maybe future work).
  • the recent blockhashes are exposed to the runtime as a sysvar (SysvarRecentB1ockHashes11111111111111111111), in theory. "In theory" here means this isn't actually the case due to the existence of SysvarCache, which is updated only at the start of a slot. However, I don't want to rely on that peculiar behavior of the cache, to avoid being fragile. Again, this is fixable, but it will be (distant) future work.

Alternatively, we can route the recentblockhash update via the scheduler like this:

enum SchedulerTask {
  ExecuteTransaction(txes...),
  UpdateRecentBlockhash(hash),
}

so that this particular blocking call can be consolidated into ReplayStage's wait_for_completed_scheduler(). but again, this is out of scope for now, due to the need to audit any potentially affected access timings to the bank's recent blockhashes and their implications.
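
For illustration, a self-contained sketch of the ordering this thread settles on; the types and bodies below are stand-ins, and only the call ordering mirrors the register_recent_blockhash change in the diff above.

// Illustration only, not the upstream code. Bank, Hash, InstalledSchedulerRwLock, and
// wait_for_paused_scheduler are stand-ins here.
struct Hash;
struct InstalledSchedulerRwLock;
struct Bank;
struct BankWithScheduler;

impl BankWithScheduler {
    fn wait_for_paused_scheduler(_bank: &Bank, _scheduler: &InstalledSchedulerRwLock) {
        // The real call blocks until every already-scheduled transaction has executed.
    }
}

impl Bank {
    fn register_recent_blockhash(&self, _blockhash: &Hash, scheduler: &InstalledSchedulerRwLock) {
        // Any tx scheduled against the current last_blockhash must finish executing before
        // the queue advances, so check_age results depend on when the tx was scheduled,
        // not on when it happened to run.
        BankWithScheduler::wait_for_paused_scheduler(self, scheduler);
        // ...only then take the blockhash_queue write lock and push the new hash
        // (that part of the body is collapsed in the diff above).
    }
}
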


// Only acquire the write lock for the blockhash queue on block boundaries because
// readers can starve this write lock acquisition and ticks would be slowed down too
// much if the write lock is acquired for each tick.
@@ -4197,7 +4202,10 @@ impl Bank {
// gating this under #[cfg(feature = "dev-context-only-utils")] isn't easy due to
// solana-program-test's usage...
pub fn register_unique_recent_blockhash_for_test(&self) {
self.register_recent_blockhash(&Hash::new_unique())
self.register_recent_blockhash(
&Hash::new_unique(),
&BankWithScheduler::no_scheduler_available(),
)
}

/// Tell the bank which Entry IDs exist on the ledger. This function assumes subsequent calls
@@ -4206,14 +4214,14 @@ impl Bank {
///
/// This is NOT thread safe because if tick height is updated by two different threads, the
/// block boundary condition could be missed.
pub fn register_tick(&self, hash: &Hash) {
pub fn register_tick(&self, hash: &Hash, scheduler: &InstalledSchedulerRwLock) {
assert!(
!self.freeze_started(),
"register_tick() working on a bank that is already frozen or is undergoing freezing!"
);

if self.is_block_boundary(self.tick_height.load(Relaxed) + 1) {
self.register_recent_blockhash(hash);
self.register_recent_blockhash(hash, scheduler);
}

// ReplayStage will start computing the accounts delta hash when it
Expand All @@ -4226,18 +4234,17 @@ impl Bank {

#[cfg(feature = "dev-context-only-utils")]
pub fn register_tick_for_test(&self, hash: &Hash) {
// currently meaningless wrapper; upcoming pr will make it an actual helper...
self.register_tick(hash)
self.register_tick(hash, &BankWithScheduler::no_scheduler_available())
}

#[cfg(feature = "dev-context-only-utils")]
pub fn register_default_tick_for_test(&self) {
self.register_tick(&Hash::default())
self.register_tick_for_test(&Hash::default())
}

#[cfg(feature = "dev-context-only-utils")]
pub fn register_unique_tick(&self) {
self.register_tick(&Hash::new_unique())
self.register_tick_for_test(&Hash::new_unique())
}

pub fn is_complete(&self) -> bool {
@@ -8008,10 +8015,14 @@ impl Bank {
}

pub fn fill_bank_with_ticks_for_tests(&self) {
self.do_fill_bank_with_ticks_for_tests(&BankWithScheduler::no_scheduler_available())
}

pub(crate) fn do_fill_bank_with_ticks_for_tests(&self, scheduler: &InstalledSchedulerRwLock) {
if self.tick_height.load(Relaxed) < self.max_tick_height {
let last_blockhash = self.last_blockhash();
while self.last_blockhash() == last_blockhash {
self.register_tick(&Hash::new_unique())
self.register_tick(&Hash::new_unique(), scheduler)
}
} else {
warn!("Bank already reached max tick height, cannot fill it with more ticks");
2 changes: 1 addition & 1 deletion runtime/src/bank/tests.rs
@@ -274,7 +274,7 @@ fn test_bank_new() {
assert_eq!(rent.lamports_per_byte_year, 5);
}

fn create_simple_test_bank(lamports: u64) -> Bank {
pub(crate) fn create_simple_test_bank(lamports: u64) -> Bank {
let (genesis_config, _mint_keypair) = create_genesis_config(lamports);
Bank::new_for_tests(&genesis_config)
}