diff --git a/Cargo.lock b/Cargo.lock
index 445611d5af0216..f9c0d66b324aab 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7557,12 +7557,14 @@ dependencies = [
  "crossbeam-channel",
  "derivative",
  "log",
+ "qualifier_attr",
  "solana-ledger",
  "solana-logger",
  "solana-program-runtime",
  "solana-runtime",
  "solana-sdk",
  "solana-unified-scheduler-logic",
+ "solana-unified-scheduler-pool",
  "solana-vote",
 ]
 
diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml
index 7626215b1e1126..d7774b957a51f8 100644
--- a/unified-scheduler-pool/Cargo.toml
+++ b/unified-scheduler-pool/Cargo.toml
@@ -14,6 +14,7 @@ assert_matches = { workspace = true }
 crossbeam-channel = { workspace = true }
 derivative = { workspace = true }
 log = { workspace = true }
+qualifier_attr = { workspace = true }
 solana-ledger = { workspace = true }
 solana-program-runtime = { workspace = true }
 solana-runtime = { workspace = true }
@@ -25,3 +26,11 @@ solana-vote = { workspace = true }
 assert_matches = { workspace = true }
 solana-logger = { workspace = true }
 solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
+# See order-crates-for-publishing.py for using this unusual `path = "."`
+solana-unified-scheduler-pool = { path = ".", features = ["dev-context-only-utils"] }
+
+[features]
+dev-context-only-utils = []
+bench-drop-in-accumulator = []
+bench-drop-in-scheduler = []
+bench-conflicting-execution = []
diff --git a/unified-scheduler-pool/benches/lib.rs b/unified-scheduler-pool/benches/lib.rs
new file mode 100644
index 00000000000000..e56e8e32324686
--- /dev/null
+++ b/unified-scheduler-pool/benches/lib.rs
@@ -0,0 +1,120 @@
+#![feature(test)]
+
+extern crate test;
+
+use {
+    solana_program_runtime::timings::ExecuteTimings,
+    solana_runtime::{
+        bank::Bank,
+        bank_forks::BankForks,
+        genesis_utils::{create_genesis_config, GenesisConfigInfo},
+        installed_scheduler_pool::{InstalledScheduler, SchedulingContext},
+        prioritization_fee_cache::PrioritizationFeeCache,
+    },
+    solana_sdk::{
+        system_transaction,
+        transaction::{Result, SanitizedTransaction},
+    },
+    solana_unified_scheduler_pool::{HandlerContext, PooledScheduler, SchedulerPool, TaskHandler},
+    std::sync::Arc,
+    test::Bencher,
+};
+
+#[derive(Debug)]
+struct DummyTaskHandler;
+
+impl TaskHandler for DummyTaskHandler {
+    fn handle(
+        _result: &mut Result<()>,
+        _timings: &mut ExecuteTimings,
+        _bank: &Arc<Bank>,
+        _transaction: &SanitizedTransaction,
+        _index: usize,
+        _handler_context: &HandlerContext,
+    ) {
+    }
+}
+
+fn setup_dummy_fork_graph(bank: Bank) -> Arc<Bank> {
+    let slot = bank.slot();
+    let bank_fork = BankForks::new_rw_arc(bank);
+    let bank = bank_fork.read().unwrap().get(slot).unwrap();
+    bank.loaded_programs_cache
+        .write()
+        .unwrap()
+        .set_fork_graph(bank_fork);
+    bank
+}
+
+fn do_bench_tx_throughput(bencher: &mut Bencher) {
+    solana_logger::setup();
+
+    let GenesisConfigInfo {
+        genesis_config,
+        mint_keypair,
+        ..
+    } = create_genesis_config(10_000);
+    let tx0 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
+        &mint_keypair,
+        &solana_sdk::pubkey::new_rand(),
+        2,
+        genesis_config.hash(),
+    ));
+    let bank = Bank::new_for_tests(&genesis_config);
+    let bank = setup_dummy_fork_graph(bank);
+    let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+    let pool = SchedulerPool::<PooledScheduler<DummyTaskHandler>, _>::new(
+        None,
+        None,
+        None,
+        ignored_prioritization_fee_cache,
+    );
+    let context = SchedulingContext::new(bank.clone());
+
+    assert_eq!(bank.transaction_count(), 0);
+    let mut scheduler = pool.do_take_scheduler(context);
+    bencher.iter(|| {
+        for _ in 0..10_000 {
+            scheduler.schedule_execution(&(tx0, 0));
+        }
+        scheduler.pause_for_recent_blockhash();
+        scheduler.clear_session_result_with_timings();
+        scheduler.restart_session();
+    });
+}
+
+#[cfg(all(
+    feature = "bench-drop-in-accumulator",
+    not(feature = "bench-conflicting-execution")
+))]
+#[bench]
+fn bench_tx_throughput_drop_in_accumulator(bencher: &mut Bencher) {
+    do_bench_tx_throughput(bencher)
+}
+
+#[cfg(all(
+    feature = "bench-drop-in-scheduler",
+    not(feature = "bench-conflicting-execution")
+))]
+#[bench]
+fn bench_tx_throughput_drop_in_scheduler(bencher: &mut Bencher) {
+    do_bench_tx_throughput(bencher)
+}
+
+#[cfg(all(
+    feature = "bench-drop-in-accumulator",
+    feature = "bench-conflicting-execution"
+))]
+#[bench]
+fn bench_tx_throughput_drop_in_accumulator_conflicting(bencher: &mut Bencher) {
+    do_bench_tx_throughput(bencher)
+}
+
+#[cfg(all(
+    feature = "bench-drop-in-scheduler",
+    feature = "bench-conflicting-execution"
+))]
+#[bench]
+fn bench_tx_throughput_drop_in_scheduler_conflicting(bencher: &mut Bencher) {
+    do_bench_tx_throughput(bencher)
+}
diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index 45c1d45e59685a..c279bd2853d8e6 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -13,6 +13,7 @@ use {
     crossbeam_channel::{select, unbounded, Receiver, SendError, Sender},
     derivative::Derivative,
     log::*,
+    qualifier_attr::qualifiers,
     solana_ledger::blockstore_processor::{
         execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
     },
@@ -30,6 +31,7 @@
     solana_unified_scheduler_logic::Task,
     solana_vote::vote_sender_types::ReplayVoteSender,
     std::{
+        collections::VecDeque,
         fmt::Debug,
         marker::PhantomData,
         sync::{
@@ -82,6 +84,7 @@ where
 {
     // Some internal impl and test code want an actual concrete type, NOT the
     // `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`.
+    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
     fn new(
         log_messages_bytes_limit: Option<usize>,
         transaction_status_sender: Option<TransactionStatusSender>,
@@ -136,6 +139,7 @@
             .push(scheduler);
     }
 
+    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
     fn do_take_scheduler(&self, context: SchedulingContext) -> S {
         // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
         // returned recently
@@ -429,6 +433,19 @@ impl<TH: TaskHandler> PooledScheduler<TH> {
             initial_context,
         )
     }
+
+    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
+    fn clear_session_result_with_timings(&mut self) {
+        assert_matches!(
+            self.inner.thread_manager.take_session_result_with_timings(),
+            (Ok(_), _)
+        );
+    }
+
+    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
+    fn restart_session(&mut self) {
+        self.inner.thread_manager.start_session(&self.context);
+    }
 }
 
 impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
@@ -547,28 +564,59 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         // by design or by means of offloading at the last resort.
         move || loop {
             let mut is_finished = false;
+
+            #[cfg(not(feature = "bench-conflicting-execution"))]
+            let is_conflicting = false;
+            #[cfg(feature = "bench-conflicting-execution")]
+            let is_conflicting = true;
+            let mut tasks = VecDeque::with_capacity(10000);
+
             while !is_finished {
                 select! {
                     recv(finished_task_receiver) -> executed_task => {
                         let executed_task = executed_task.unwrap();
 
                         active_task_count = active_task_count.checked_sub(1).unwrap();
-                        executed_task_sender
-                            .send(ExecutedTaskPayload::Payload(executed_task))
-                            .unwrap();
+
+                        if is_conflicting {
+                            if let Some(task) = tasks.pop_front() {
+                                runnable_task_sender
+                                    .send_payload(task)
+                                    .unwrap();
+                            }
+                        }
+
+                        #[cfg(feature = "bench-drop-in-accumulator")]
+                        {
+                            executed_task_sender
+                                .send(ExecutedTaskPayload::Payload(executed_task))
+                                .unwrap();
+                        }
+                        #[cfg(feature = "bench-drop-in-scheduler")]
+                        {
+                            assert_matches!(executed_task.result_with_timings, (Ok(_), _));
+                            drop(executed_task);
+                        }
                     },
                     recv(new_task_receiver) -> message => {
-                        match message.unwrap() {
+                        let Ok(message) = message else {
+                            break;
+                        };
+                        match message {
                             NewTaskPayload::Payload(task) => {
                                 assert!(!session_ending);
 
                                 // so, we're NOT scheduling at all here; rather, just execute
                                 // tx straight off. the inter-tx locking deps aren't needed to
                                 // be resolved in the case of single-threaded FIFO like this.
+                                if !is_conflicting || active_task_count == 0 {
+                                    runnable_task_sender
+                                        .send_payload(task)
+                                        .unwrap();
+                                } else {
+                                    tasks.push_back(task);
+                                }
                                 active_task_count = active_task_count.checked_add(1).unwrap();
-                                runnable_task_sender
-                                    .send_payload(task)
-                                    .unwrap();
                             }
                             NewTaskPayload::OpenSubchannel(context) => {
                                 // signal about new SchedulingContext to both handler and
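
Note on the `#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]` lines in this patch: the `qualifier_attr::qualifiers` proc-macro attribute adds the listed qualifiers (here `pub`) to the item it decorates, so `new()`, `do_take_scheduler()`, and the two new session helpers stay private in normal builds and only become public when the benches enable `dev-context-only-utils` through the self-referential `path = "."` dev-dependency. A minimal sketch of the pattern, assuming a crate that declares the `qualifier_attr` dependency and a `dev-context-only-utils` feature (the `Pool`/`size` names are hypothetical stand-ins, not from this patch):

use qualifier_attr::qualifiers;

struct Pool {
    schedulers: Vec<u32>,
}

impl Pool {
    // Private in normal builds; compiled as `pub fn size(...)` when the
    // feature is on, so in-crate benches and tests can call it without
    // permanently widening the public API for downstream users.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn size(&self) -> usize {
        self.schedulers.len()
    }
}

With that wiring in place, each bench variant should be selectable by feature, e.g. `cargo +nightly bench --features bench-drop-in-accumulator` from unified-scheduler-pool/ (nightly is required by `#![feature(test)]` in benches/lib.rs); adding `--features bench-conflicting-execution` selects the `_conflicting` variants instead.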