From a4a66026e17f271e23ea22e090c57a9598e88542 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 3 Nov 2023 16:02:12 +0900 Subject: [PATCH] Introduce InstalledSchedulerPool trait (#33934) * Introduce InstalledSchedulerPool * Use type alias * Remove log_prefix for now... * Simplify return_to_pool() * Simplify InstalledScheduler's context methods * Reorder trait methods semantically * Simplify Arc handling --- Cargo.lock | 34 ++++++ Cargo.toml | 1 + ledger/src/blockstore_processor.rs | 13 ++- programs/sbf/Cargo.lock | 34 ++++++ runtime/Cargo.toml | 1 + runtime/src/bank_forks.rs | 23 +++- runtime/src/installed_scheduler_pool.rs | 145 +++++++++++++++++++++++- 7 files changed, 242 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 610b5edb49c5f6..862b9ca59021dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,6 +167,20 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "aquamarine" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df752953c49ce90719c7bf1fc587bc8227aed04732ea0c0f85e5397d7fdbd1a1" +dependencies = [ + "include_dir", + "itertools", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "arc-swap" version = "1.5.0" @@ -2611,6 +2625,25 @@ dependencies = [ "version_check", ] +[[package]] +name = "include_dir" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18762faeff7122e89e0857b02f7ce6fcc0d101d5e9ad2ad7846cc01d61b7f19e" +dependencies = [ + "include_dir_macros", +] + +[[package]] +name = "include_dir_macros" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b139284b5cf57ecfa712bcc66950bb635b31aff41c188e8a4cfc758eca374a3f" +dependencies = [ + "proc-macro2", + "quote", +] + [[package]] name = "index_list" version = "0.2.7" @@ -6943,6 +6976,7 @@ dependencies = [ name = "solana-runtime" version = "1.18.0" dependencies = [ + "aquamarine", "arrayref", "assert_matches", "base64 0.21.5", diff --git a/Cargo.toml b/Cargo.toml index e6168de4d7aec8..4ae0c286356643 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,6 +132,7 @@ edition = "2021" [workspace.dependencies] Inflector = "0.11.4" +aquamarine = "0.3.2" aes-gcm-siv = "0.10.3" ahash = "0.8.6" anyhow = "1.0.75" diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index ccdfb97ece81f7..5218b55c4b9050 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1945,7 +1945,7 @@ pub mod tests { genesis_utils::{ self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, }, - installed_scheduler_pool::{MockInstalledScheduler, WaitReason}, + installed_scheduler_pool::{MockInstalledScheduler, SchedulingContext, WaitReason}, }, solana_sdk::{ account::{AccountSharedData, WritableAccount}, @@ -4527,11 +4527,17 @@ pub mod tests { .. } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let context = SchedulingContext::new(bank.clone()); let txs = create_test_transactions(&mint_keypair, &genesis_config.hash()); let mut mocked_scheduler = MockInstalledScheduler::new(); let mut seq = mockall::Sequence::new(); + mocked_scheduler + .expect_context() + .times(1) + .in_sequence(&mut seq) + .return_const(context); mocked_scheduler .expect_schedule_execution() .times(txs.len()) @@ -4542,6 +4548,11 @@ pub mod tests { .times(1) .in_sequence(&mut seq) .returning(|_| None); + mocked_scheduler + .expect_return_to_pool() + .times(1) + .in_sequence(&mut seq) + .returning(|| ()); let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler))); let batch = bank.prepare_sanitized_batch(&txs); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index a81280a23341d0..cd8e73c093d4a7 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -156,6 +156,20 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "aquamarine" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df752953c49ce90719c7bf1fc587bc8227aed04732ea0c0f85e5397d7fdbd1a1" +dependencies = [ + "include_dir", + "itertools", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "arc-swap" version = "1.5.0" @@ -2245,6 +2259,25 @@ dependencies = [ "version_check", ] +[[package]] +name = "include_dir" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18762faeff7122e89e0857b02f7ce6fcc0d101d5e9ad2ad7846cc01d61b7f19e" +dependencies = [ + "include_dir_macros", +] + +[[package]] +name = "include_dir_macros" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b139284b5cf57ecfa712bcc66950bb635b31aff41c188e8a4cfc758eca374a3f" +dependencies = [ + "proc-macro2", + "quote", +] + [[package]] name = "index_list" version = "0.2.7" @@ -5648,6 +5681,7 @@ dependencies = [ name = "solana-runtime" version = "1.18.0" dependencies = [ + "aquamarine", "arrayref", "base64 0.21.5", "bincode", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index e67ee5d2a66a59..f0509811497037 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -10,6 +10,7 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +aquamarine = { workspace = true } arrayref = { workspace = true } base64 = { workspace = true } bincode = { workspace = true } diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index ced6d5a0c61813..dabd90e4c2c835 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -4,7 +4,9 @@ use { crate::{ accounts_background_service::{AbsRequestSender, SnapshotRequest, SnapshotRequestKind}, bank::{epoch_accounts_hash_utils, Bank, SquashTiming}, - installed_scheduler_pool::BankWithScheduler, + installed_scheduler_pool::{ + BankWithScheduler, InstalledSchedulerPoolArc, SchedulingContext, + }, snapshot_config::SnapshotConfig, }, log::*, @@ -72,6 +74,7 @@ pub struct BankForks { last_accounts_hash_slot: Slot, in_vote_only_mode: Arc, highest_slot_at_startup: Slot, + scheduler_pool: Option, } impl Index for BankForks { @@ -203,6 +206,7 @@ impl BankForks { last_accounts_hash_slot: root, in_vote_only_mode: Arc::new(AtomicBool::new(false)), highest_slot_at_startup: 0, + scheduler_pool: None, })); for bank in bank_forks.read().unwrap().banks.values() { @@ -215,11 +219,26 @@ impl BankForks { bank_forks } + pub fn install_scheduler_pool(&mut self, pool: InstalledSchedulerPoolArc) { + info!("Installed new scheduler_pool into bank_forks: {:?}", pool); + assert!( + self.scheduler_pool.replace(pool).is_none(), + "Reinstalling scheduler pool isn't supported" + ); + } + pub fn insert(&mut self, mut bank: Bank) -> BankWithScheduler { bank.check_program_modification_slot = self.root.load(Ordering::Relaxed) < self.highest_slot_at_startup; - let bank = BankWithScheduler::new_without_scheduler(Arc::new(bank)); + let bank = Arc::new(bank); + let bank = if let Some(scheduler_pool) = &self.scheduler_pool { + let context = SchedulingContext::new(bank.clone()); + let scheduler = scheduler_pool.take_scheduler(context); + BankWithScheduler::new(bank, Some(scheduler)) + } else { + BankWithScheduler::new_without_scheduler(bank) + }; let prev = self.banks.insert(bank.slot(), bank.clone_with_scheduler()); assert!(prev.is_none()); let slot = bank.slot(); diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 553a31c800e6e4..dde82f2a63f890 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -1,5 +1,24 @@ -//! Currently, there are only two things: minimal InstalledScheduler trait and an auxiliary type -//! called BankWithScheduler.. This file will be populated by later PRs to align with the filename. +//! Transaction processing glue code, mainly consisting of Object-safe traits +//! +//! [InstalledSchedulerPool] lends one of pooled [InstalledScheduler]s as wrapped in +//! [BankWithScheduler], which can be used by `ReplayStage` and `BankingStage` for transaction +//! execution. After use, the scheduler will be returned to the pool. +//! +//! [InstalledScheduler] can be fed with [SanitizedTransaction]s. Then, it schedules those +//! executions and commits those results into the associated _bank_. +//! +//! It's generally assumed that each [InstalledScheduler] is backed by multiple threads for +//! parallel transaction processing and there are multiple independent schedulers inside a single +//! instance of [InstalledSchedulerPool]. +//! +//! Dynamic dispatch was inevitable due to the desire to piggyback on +//! [BankForks](crate::bank_forks::BankForks)'s pruning for scheduler lifecycle management as the +//! common place both for `ReplayStage` and `BankingStage` and the resultant need of invoking +//! actual implementations provided by the dependent crate (`solana-unified-scheduler-pool`, which +//! in turn depends on `solana-ledger`, which in turn depends on `solana-runtime`), avoiding a +//! cyclic dependency. +//! +//! See [InstalledScheduler] for visualized interaction. use { crate::bank::Bank, @@ -7,6 +26,7 @@ use { solana_program_runtime::timings::ExecuteTimings, solana_sdk::{ hash::Hash, + slot_history::Slot, transaction::{Result, SanitizedTransaction}, }, std::{ @@ -18,6 +38,57 @@ use { #[cfg(feature = "dev-context-only-utils")] use {mockall::automock, qualifier_attr::qualifiers}; +pub trait InstalledSchedulerPool: Send + Sync + Debug { + fn take_scheduler(&self, context: SchedulingContext) -> DefaultInstalledSchedulerBox; +} + +#[cfg_attr(doc, aquamarine::aquamarine)] +/// Schedules, executes, and commits transactions under encapsulated implementation +/// +/// The following chart illustrates the ownership/reference interaction between inter-dependent +/// objects across crates: +/// +/// ```mermaid +/// graph TD +/// Bank["Arc#lt;Bank#gt;"] +/// +/// subgraph solana-runtime +/// BankForks; +/// BankWithScheduler; +/// Bank; +/// LoadExecuteAndCommitTransactions(["load_execute_and_commit_transactions()"]); +/// SchedulingContext; +/// InstalledSchedulerPool{{InstalledSchedulerPool}}; +/// InstalledScheduler{{InstalledScheduler}}; +/// end +/// +/// subgraph solana-unified-scheduler-pool +/// SchedulerPool; +/// PooledScheduler; +/// ScheduleExecution(["schedule_execution()"]); +/// end +/// +/// subgraph solana-ledger +/// ExecuteBatch(["execute_batch()"]); +/// end +/// +/// ScheduleExecution -. calls .-> ExecuteBatch; +/// BankWithScheduler -. dyn-calls .-> ScheduleExecution; +/// ExecuteBatch -. calls .-> LoadExecuteAndCommitTransactions; +/// linkStyle 0,1,2 stroke:gray,color:gray; +/// +/// BankForks -- owns --> BankWithScheduler; +/// BankForks -- owns --> InstalledSchedulerPool; +/// BankWithScheduler -- refs --> Bank; +/// BankWithScheduler -- owns --> InstalledScheduler; +/// SchedulingContext -- refs --> Bank; +/// InstalledScheduler -- owns --> SchedulingContext; +/// +/// SchedulerPool -- owns --> PooledScheduler; +/// SchedulerPool -. impls .-> InstalledSchedulerPool; +/// PooledScheduler -. impls .-> InstalledScheduler; +/// PooledScheduler -- refs --> SchedulerPool; +/// ``` #[cfg_attr(feature = "dev-context-only-utils", automock)] // suppress false clippy complaints arising from mockall-derive: // warning: `#[must_use]` has no effect when applied to a struct field @@ -27,6 +98,9 @@ use {mockall::automock, qualifier_attr::qualifiers}; allow(unused_attributes, clippy::needless_lifetimes) )] pub trait InstalledScheduler: Send + Sync + Debug + 'static { + fn id(&self) -> SchedulerId; + fn context(&self) -> &SchedulingContext; + // Calling this is illegal as soon as wait_for_termination is called. fn schedule_execution<'a>( &'a self, @@ -50,10 +124,45 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static { /// two reasons later. #[must_use] fn wait_for_termination(&mut self, reason: &WaitReason) -> Option; + + fn return_to_pool(self: Box); } pub type DefaultInstalledSchedulerBox = Box; +pub type InstalledSchedulerPoolArc = Arc; + +pub type SchedulerId = u64; + +/// A small context to propagate a bank and its scheduling mode to the scheduler subsystem. +/// +/// Note that this isn't called `SchedulerContext` because the contexts aren't associated with +/// schedulers one by one. A scheduler will use many SchedulingContexts during its lifetime. +/// "Scheduling" part of the context name refers to an abstract slice of time to schedule and +/// execute all transactions for a given bank for block verification or production. A context is +/// expected to be used by a particular scheduler only for that duration of the time and to be +/// disposed by the scheduler. Then, the scheduler may work on different banks with new +/// `SchedulingContext`s. +#[derive(Clone, Debug)] +pub struct SchedulingContext { + // mode: SchedulingMode, // this will be added later. + bank: Arc, +} + +impl SchedulingContext { + pub fn new(bank: Arc) -> Self { + Self { bank } + } + + pub fn bank(&self) -> &Arc { + &self.bank + } + + pub fn slot(&self) -> Slot { + self.bank().slot() + } +} + pub type ResultWithTimings = (Result<()>, ExecuteTimings); /// A hint from the bank about the reason the caller is waiting on its scheduler termination. @@ -117,6 +226,13 @@ pub type InstalledSchedulerRwLock = RwLock> impl BankWithScheduler { #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] pub(crate) fn new(bank: Arc, scheduler: Option) -> Self { + if let Some(bank_in_context) = scheduler + .as_ref() + .map(|scheduler| scheduler.context().bank()) + { + assert!(Arc::ptr_eq(&bank, bank_in_context)); + } + Self { inner: Arc::new(BankWithSchedulerInner { bank, @@ -229,7 +345,8 @@ impl BankWithSchedulerInner { .as_mut() .and_then(|scheduler| scheduler.wait_for_termination(&reason)); if !reason.is_paused() { - drop(scheduler.take().expect("scheduler after waiting")); + let scheduler = scheduler.take().expect("scheduler after waiting"); + scheduler.return_to_pool(); } result_with_timings } else { @@ -296,12 +413,18 @@ mod tests { }; fn setup_mocked_scheduler_with_extra( + bank: Arc, wait_reasons: impl Iterator, f: Option, ) -> DefaultInstalledSchedulerBox { let mut mock = MockInstalledScheduler::new(); let mut seq = Sequence::new(); + mock.expect_context() + .times(1) + .in_sequence(&mut seq) + .return_const(SchedulingContext::new(bank)); + for wait_reason in wait_reasons { mock.expect_wait_for_termination() .with(mockall::predicate::eq(wait_reason)) @@ -316,6 +439,10 @@ mod tests { }); } + mock.expect_return_to_pool() + .times(1) + .in_sequence(&mut seq) + .returning(|| ()); if let Some(f) = f { f(&mut mock); } @@ -324,9 +451,11 @@ mod tests { } fn setup_mocked_scheduler( + bank: Arc, wait_reasons: impl Iterator, ) -> DefaultInstalledSchedulerBox { setup_mocked_scheduler_with_extra( + bank, wait_reasons, None:: ()>, ) @@ -338,8 +467,9 @@ mod tests { let bank = Arc::new(Bank::default_for_tests()); let bank = BankWithScheduler::new( - bank, + bank.clone(), Some(setup_mocked_scheduler( + bank, [WaitReason::TerminatedToFreeze].into_iter(), )), ); @@ -370,8 +500,9 @@ mod tests { let bank = Arc::new(Bank::default_for_tests()); let bank = BankWithScheduler::new( - bank, + bank.clone(), Some(setup_mocked_scheduler( + bank, [WaitReason::DroppedFromBankForks].into_iter(), )), ); @@ -384,8 +515,9 @@ mod tests { let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42)); let bank = BankWithScheduler::new( - bank, + bank.clone(), Some(setup_mocked_scheduler( + bank, [ WaitReason::PausedForRecentBlockhash, WaitReason::TerminatedToFreeze, @@ -414,6 +546,7 @@ mod tests { )); let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let mocked_scheduler = setup_mocked_scheduler_with_extra( + bank.clone(), [WaitReason::DroppedFromBankForks].into_iter(), Some(|mocked: &mut MockInstalledScheduler| { mocked