diff --git a/core/tests/unified_scheduler.rs b/core/tests/unified_scheduler.rs index a6e40296510609..a458e776fbc42c 100644 --- a/core/tests/unified_scheduler.rs +++ b/core/tests/unified_scheduler.rs @@ -108,7 +108,7 @@ fn test_scheduler_waited_by_drop_bank_service() { // been started let lock_to_stall = LOCK_TO_STALL.lock().unwrap(); pruned_bank - .schedule_transaction_executions([(&tx, &0)].into_iter()) + .schedule_transaction_executions([(tx, 0)].into_iter()) .unwrap(); drop(pruned_bank); assert_eq!(pool_raw.pooled_scheduler_count(), 0); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 986e1a2549b13b..3eedf03e4d0083 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -62,7 +62,7 @@ use { solana_vote::vote_account::VoteAccountsHashMap, std::{ collections::{HashMap, HashSet}, - ops::Index, + ops::{Index, Range}, path::PathBuf, result, sync::{ @@ -70,6 +70,7 @@ use { Arc, Mutex, RwLock, }, time::{Duration, Instant}, + vec::Drain, }, thiserror::Error, ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen}, @@ -82,6 +83,14 @@ pub struct TransactionBatchWithIndexes<'a, 'b, Tx: SVMMessage> { pub transaction_indexes: Vec<usize>, } +// `TransactionBatchWithIndexes` but without the `Drop` that prevents +// us from nicely unwinding these with manual unlocking. +pub struct LockedTransactionsWithIndexes<Tx: SVMMessage> { + lock_results: Vec<Result<()>>, + transactions: Vec<Tx>, + starting_index: usize, +} + struct ReplayEntry { entry: EntryType, starting_index: usize, @@ -360,7 +369,7 @@ fn execute_batches_internal( fn process_batches( bank: &BankWithScheduler, replay_tx_thread_pool: &ThreadPool, - batches: &[TransactionBatchWithIndexes<SanitizedTransaction>], + locked_entries: impl ExactSizeIterator<Item = LockedTransactionsWithIndexes<SanitizedTransaction>>, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, batch_execution_timing: &mut BatchExecutionTiming, @@ -370,7 +379,7 @@ fn process_batches( if bank.has_installed_scheduler() { debug!( "process_batches()/schedule_batches_for_execution({} batches)", - batches.len() + locked_entries.len() ); // Scheduling usually succeeds (immediately returns `Ok(())`) here without being blocked on // the actual transaction executions. // - // propagated eventually via the blocking fn called // BankWithScheduler::wait_for_completed_scheduler(). // - // To recite, the returned error is completely unrelated to the argument's `batches` at the - // hand. While being awkward, the _async_ unified scheduler is abusing this existing error - // propagation code path to the replay stage for compatibility and ease of integration, - // exploiting the fact that the replay stage doesn't care _which transaction the returned - // error is originating from_. + // To recite, the returned error is completely unrelated to the argument's `locked_entries` + // at the hand. While being awkward, the _async_ unified scheduler is abusing this existing + // error propagation code path to the replay stage for compatibility and ease of + // integration, exploiting the fact that the replay stage doesn't care _which transaction + // the returned error is originating from_. // // In the future, more proper error propagation mechanism will be introduced once after we // fully transition to the unified scheduler for the block verification.
That one would be // a push based one from the unified scheduler to the replay stage to eliminate the current // overhead: 1 read lock per batch in // `BankWithScheduler::schedule_transaction_executions()`. - schedule_batches_for_execution(bank, batches) + schedule_batches_for_execution(bank, locked_entries) } else { debug!( "process_batches()/rebatch_and_execute_batches({} batches)", - batches.len() + locked_entries.len() ); rebatch_and_execute_batches( bank, replay_tx_thread_pool, - batches, + locked_entries, transaction_status_sender, replay_vote_sender, batch_execution_timing, @@ -415,38 +424,45 @@ fn process_batches( fn schedule_batches_for_execution( bank: &BankWithScheduler, - batches: &[TransactionBatchWithIndexes], + locked_entries: impl Iterator>, ) -> Result<()> { - for TransactionBatchWithIndexes { - batch, - transaction_indexes, - } in batches + // Track the first error encountered in the loop below, if any. + // This error will be propagated to the replay stage, or Ok(()). + let mut first_err = Ok(()); + + for LockedTransactionsWithIndexes { + lock_results, + transactions, + starting_index, + } in locked_entries { - bank.schedule_transaction_executions( - batch - .sanitized_transactions() - .iter() - .zip(transaction_indexes.iter()), - )?; + // unlock before sending to scheduler. + bank.unlock_accounts(transactions.iter().zip(lock_results.iter())); + // give ownership to scheduler. capture the first error, but continue the loop + // to unlock. + // scheduling is skipped if we have already detected an error in this loop + let indexes = starting_index..starting_index + transactions.len(); + first_err = first_err.and_then(|()| { + bank.schedule_transaction_executions(transactions.into_iter().zip_eq(indexes)) + }); } - Ok(()) + first_err } fn rebatch_transactions<'a>( lock_results: &'a [Result<()>], bank: &'a Arc, sanitized_txs: &'a [SanitizedTransaction], - start: usize, - end: usize, + range: Range, transaction_indexes: &'a [usize], ) -> TransactionBatchWithIndexes<'a, 'a, SanitizedTransaction> { - let txs = &sanitized_txs[start..=end]; - let results = &lock_results[start..=end]; + let txs = &sanitized_txs[range.clone()]; + let results = &lock_results[range.clone()]; let mut tx_batch = TransactionBatch::new(results.to_vec(), bank, OwnedOrBorrowed::Borrowed(txs)); - tx_batch.set_needs_unlock(false); + tx_batch.set_needs_unlock(true); // unlock on drop for easier clean up - let transaction_indexes = transaction_indexes[start..=end].to_vec(); + let transaction_indexes = transaction_indexes[range].to_vec(); TransactionBatchWithIndexes { batch: tx_batch, transaction_indexes, @@ -456,29 +472,37 @@ fn rebatch_transactions<'a>( fn rebatch_and_execute_batches( bank: &Arc, replay_tx_thread_pool: &ThreadPool, - batches: &[TransactionBatchWithIndexes], + locked_entries: impl ExactSizeIterator>, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, timing: &mut BatchExecutionTiming, log_messages_bytes_limit: Option, prioritization_fee_cache: &PrioritizationFeeCache, ) -> Result<()> { - if batches.is_empty() { + if locked_entries.len() == 0 { return Ok(()); } - let ((lock_results, sanitized_txs), transaction_indexes): ((Vec<_>, Vec<_>), Vec<_>) = batches - .iter() - .flat_map(|batch| { - batch - .batch - .lock_results() - .iter() - .cloned() - .zip(batch.batch.sanitized_transactions().to_vec()) - .zip(batch.transaction_indexes.to_vec()) - }) - .unzip(); + // Flatten the locked entries. 
Store the original entry lengths to avoid rebatching logic + // for small entries. + let mut original_entry_lengths = Vec::with_capacity(locked_entries.len()); + let ((lock_results, sanitized_txs), transaction_indexes): ((Vec<_>, Vec<_>), Vec<_>) = + locked_entries + .flat_map( + |LockedTransactionsWithIndexes { + lock_results, + transactions, + starting_index, + }| { + let num_transactions = transactions.len(); + original_entry_lengths.push(num_transactions); + lock_results + .into_iter() + .zip_eq(transactions) + .zip_eq(starting_index..starting_index + num_transactions) + }, + ) + .unzip(); let mut minimal_tx_cost = u64::MAX; let mut total_cost: u64 = 0; @@ -508,8 +532,7 @@ fn rebatch_and_execute_batches( &lock_results, bank, &sanitized_txs, - slice_start, - index, + slice_start..next_index, &transaction_indexes, ); slice_start = next_index; @@ -519,7 +542,24 @@ fn rebatch_and_execute_batches( }); &tx_batches[..] } else { - batches + let mut slice_start = 0; + for num_transactions in original_entry_lengths { + let next_index = slice_start + num_transactions; + // this is more of a "re-construction" of the original batches than + // a rebatching. But the logic is the same, with the transfer of + // unlocking responsibility to the batch. + let tx_batch = rebatch_transactions( + &lock_results, + bank, + &sanitized_txs, + slice_start..next_index, + &transaction_indexes, + ); + slice_start = next_index; + tx_batches.push(tx_batch); + } + + &tx_batches[..] }; let execute_batches_internal_metrics = execute_batches_internal( @@ -561,7 +601,7 @@ pub fn process_entries_for_tests( let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap(); let mut batch_timing = BatchExecutionTiming::default(); - let mut replay_entries: Vec<_> = entry::verify_transactions( + let replay_entries: Vec<_> = entry::verify_transactions( entries, &replay_tx_thread_pool, Arc::new(verify_transaction), @@ -583,7 +623,7 @@ pub fn process_entries_for_tests( let result = process_entries( bank, &replay_tx_thread_pool, - &mut replay_entries, + replay_entries, transaction_status_sender, replay_vote_sender, &mut batch_timing, @@ -598,7 +638,7 @@ pub fn process_entries_for_tests( fn process_entries( bank: &BankWithScheduler, replay_tx_thread_pool: &ThreadPool, - entries: &mut [ReplayEntry], + entries: Vec, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, batch_timing: &mut BatchExecutionTiming, @@ -624,78 +664,44 @@ fn process_entries( process_batches( bank, replay_tx_thread_pool, - &batches, + batches.drain(..), transaction_status_sender, replay_vote_sender, batch_timing, log_messages_bytes_limit, prioritization_fee_cache, )?; - batches.clear(); - for hash in &tick_hashes { - bank.register_tick(hash); + for hash in tick_hashes.drain(..) 
{ + bank.register_tick(&hash); } - tick_hashes.clear(); } } EntryType::Transactions(transactions) => { - let starting_index = *starting_index; - let transaction_indexes = - (starting_index..starting_index.saturating_add(transactions.len())).collect(); - loop { - // try to lock the accounts - let batch = bank.prepare_sanitized_batch(transactions); - let first_lock_err = first_err(batch.lock_results()); - - // if locking worked - if first_lock_err.is_ok() { - batches.push(TransactionBatchWithIndexes { - batch, - transaction_indexes, - }); - // done with this entry - break; - } - // else we failed to lock, 2 possible reasons - if batches.is_empty() { - // An entry has account lock conflicts with *itself*, which should not happen - // if generated by a properly functioning leader - datapoint_error!( - "validator_process_entry_error", - ( - "error", - format!( - "Lock accounts error, entry conflicts with itself, txs: \ - {transactions:?}" - ), - String - ) - ); - // bail - first_lock_err?; - } else { - // else we have an entry that conflicts with a prior entry - // execute the current queue and try to process this entry again + queue_batches_with_lock_retry( + bank, + starting_index, + transactions, + &mut batches, + |batches| { process_batches( bank, replay_tx_thread_pool, - &batches, + batches, transaction_status_sender, replay_vote_sender, batch_timing, log_messages_bytes_limit, prioritization_fee_cache, - )?; - batches.clear(); - } - } + ) + }, + )?; } } } process_batches( bank, replay_tx_thread_pool, - &batches, + batches.into_iter(), transaction_status_sender, replay_vote_sender, batch_timing, @@ -703,11 +709,82 @@ fn process_entries( prioritization_fee_cache, )?; for hash in tick_hashes { - bank.register_tick(hash); + bank.register_tick(&hash); } Ok(()) } +/// If an entry can be locked without failure, the transactions are pushed +/// as a batch to `batches`. If the lock fails, the transactions are unlocked +/// and the batches are processed. +/// The locking process is retried, and if it fails again the block is marked +/// as dead. +/// If the lock retry succeeds, then the batch is pushed into `batches`. +fn queue_batches_with_lock_retry( + bank: &Bank, + starting_index: usize, + transactions: Vec, + batches: &mut Vec>, + mut process_batches: impl FnMut( + Drain>, + ) -> Result<()>, +) -> Result<()> { + // try to lock the accounts + let lock_results = bank.try_lock_accounts(&transactions); + let first_lock_err = first_err(&lock_results); + if first_lock_err.is_ok() { + batches.push(LockedTransactionsWithIndexes { + lock_results, + transactions, + starting_index, + }); + return Ok(()); + } + + // We need to unlock the transactions that succeeded to lock before the + // retry. + bank.unlock_accounts(transactions.iter().zip(lock_results.iter())); + + // We failed to lock, there are 2 possible reasons: + // 1. A batch already in `batches` holds the lock. + // 2. The batch is "self-conflicting" (i.e. the batch has account lock conflicts with itself) + + // Use the callback to process batches, and clear them. + // Clearing the batches will `Drop` the batches which will unlock the accounts. + process_batches(batches.drain(..))?; + + // Retry the lock + let lock_results = bank.try_lock_accounts(&transactions); + match first_err(&lock_results) { + Ok(()) => { + batches.push(LockedTransactionsWithIndexes { + lock_results, + transactions, + starting_index, + }); + Ok(()) + } + Err(err) => { + // We still may have succeeded to lock some accounts, unlock them. 
+ bank.unlock_accounts(transactions.iter().zip(lock_results.iter())); + + // An entry has account lock conflicts with *itself*, which should not happen + // if generated by a properly functioning leader + datapoint_error!( + "validator_process_entry_error", + ( + "error", + format!( + "Lock accounts error, entry conflicts with itself, txs: {transactions:?}" + ), + String + ) + ); + Err(err) + } + } +} + #[derive(Error, Debug)] pub enum BlockstoreProcessorError { #[error("failed to load entries, error: {0}")] @@ -1608,7 +1685,7 @@ fn confirm_slot_entries( .expect("Transaction verification generates entries"); let mut replay_timer = Measure::start("replay_elapsed"); - let mut replay_entries: Vec<_> = entries + let replay_entries: Vec<_> = entries .into_iter() .zip(entry_tx_starting_indexes) .map(|(entry, tx_starting_index)| ReplayEntry { @@ -1619,7 +1696,7 @@ fn confirm_slot_entries( let process_result = process_entries( bank, replay_tx_thread_pool, - &mut replay_entries, + replay_entries, transaction_status_sender, replay_vote_sender, batch_execute_timing, @@ -4826,32 +4903,18 @@ pub mod tests { } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let txs = create_test_transactions(&mint_keypair, &genesis_config.hash()); - let batch = bank.prepare_sanitized_batch(&txs); - assert!(batch.needs_unlock()); + let lock_results = bank.try_lock_accounts(&txs); + assert!(lock_results.iter().all(Result::is_ok)); + let transaction_indexes = vec![42, 43, 44]; - let batch2 = rebatch_transactions( - batch.lock_results(), - &bank, - batch.sanitized_transactions(), - 0, - 0, - &transaction_indexes, - ); - assert!(batch.needs_unlock()); - assert!(!batch2.batch.needs_unlock()); - assert_eq!(batch2.transaction_indexes, vec![42]); + let batch = rebatch_transactions(&lock_results, &bank, &txs, 0..1, &transaction_indexes); + assert!(batch.batch.needs_unlock()); + assert_eq!(batch.transaction_indexes, vec![42]); - let batch3 = rebatch_transactions( - batch.lock_results(), - &bank, - batch.sanitized_transactions(), - 1, - 2, - &transaction_indexes, - ); - assert!(!batch3.batch.needs_unlock()); - assert_eq!(batch3.transaction_indexes, vec![43, 44]); + let batch2 = rebatch_transactions(&lock_results, &bank, &txs, 1..3, &transaction_indexes); + assert!(batch2.batch.needs_unlock()); + assert_eq!(batch2.transaction_indexes, vec![43, 44]); } fn do_test_schedule_batches_for_execution(should_succeed: bool) { @@ -4879,14 +4942,14 @@ pub mod tests { mocked_scheduler .expect_schedule_execution() .times(txs.len()) - .returning(|(_, _)| Ok(())); + .returning(|_, _| Ok(())); } else { // mocked_scheduler isn't async; so short-circuiting behavior is quite visible in that // .times(1) is called instead of .times(txs.len()), not like the succeeding case mocked_scheduler .expect_schedule_execution() .times(1) - .returning(|(_, _)| Err(SchedulerAborted)); + .returning(|_, _| Err(SchedulerAborted)); mocked_scheduler .expect_recover_error_after_abort() .times(1) @@ -4911,10 +4974,10 @@ pub mod tests { }); let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler))); - let batch = bank.prepare_sanitized_batch(&txs); - let batch_with_indexes = TransactionBatchWithIndexes { - batch, - transaction_indexes: (0..txs.len()).collect(), + let locked_entry = LockedTransactionsWithIndexes { + lock_results: bank.try_lock_accounts(&txs), + transactions: txs, + starting_index: 0, }; let replay_tx_thread_pool = create_thread_pool(1); @@ -4923,7 +4986,7 @@ pub 
mod tests { let result = process_batches( &bank, &replay_tx_thread_pool, - &[batch_with_indexes], + [locked_entry].into_iter(), None, None, &mut batch_execution_timing, diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 77661a4bc43cf2..9cc0ba9393b410 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -3351,17 +3351,24 @@ impl Bank { )) } + /// Attempt to take locks on the accounts in a transaction batch + pub fn try_lock_accounts(&self, txs: &[SanitizedTransaction]) -> Vec> { + let tx_account_lock_limit = self.get_transaction_account_lock_limit(); + self.rc + .accounts + .lock_accounts(txs.iter(), tx_account_lock_limit) + } + /// Prepare a locked transaction batch from a list of sanitized transactions. pub fn prepare_sanitized_batch<'a, 'b>( &'a self, txs: &'b [SanitizedTransaction], ) -> TransactionBatch<'a, 'b, SanitizedTransaction> { - let tx_account_lock_limit = self.get_transaction_account_lock_limit(); - let lock_results = self - .rc - .accounts - .lock_accounts(txs.iter(), tx_account_lock_limit); - TransactionBatch::new(lock_results, self, OwnedOrBorrowed::Borrowed(txs)) + TransactionBatch::new( + self.try_lock_accounts(txs), + self, + OwnedOrBorrowed::Borrowed(txs), + ) } /// Prepare a locked transaction batch from a list of sanitized transactions, and their cost diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index ee04859e3ea15e..db332228f03175 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -134,11 +134,7 @@ impl Debug for TimeoutListener { #[cfg_attr(feature = "dev-context-only-utils", automock)] // suppress false clippy complaints arising from mockall-derive: // warning: `#[must_use]` has no effect when applied to a struct field -// warning: the following explicit lifetimes could be elided: 'a -#[cfg_attr( - feature = "dev-context-only-utils", - allow(unused_attributes, clippy::needless_lifetimes) -)] +#[cfg_attr(feature = "dev-context-only-utils", allow(unused_attributes))] pub trait InstalledScheduler: Send + Sync + Debug + 'static { fn id(&self) -> SchedulerId; fn context(&self) -> &SchedulingContext; @@ -165,10 +161,8 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static { /// optimize the fast code-path of normal transaction scheduling to be multi-threaded at the /// cost of far slower error code-path while giving implementors increased flexibility by /// having &mut. - fn schedule_execution<'a>( - &'a self, - transaction_with_index: &'a (&'a SanitizedTransaction, usize), - ) -> ScheduleResult; + fn schedule_execution(&self, transaction: SanitizedTransaction, index: usize) + -> ScheduleResult; /// Return the error which caused the scheduler to abort. /// @@ -444,10 +438,9 @@ impl BankWithScheduler { /// /// Calling this will panic if the installed scheduler is Unavailable (the bank is /// wait_for_termination()-ed or the unified scheduler is disabled in the first place). - // 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet... 
- pub fn schedule_transaction_executions<'a>( + pub fn schedule_transaction_executions( &self, - transactions_with_indexes: impl ExactSizeIterator, + transactions_with_indexes: impl ExactSizeIterator, ) -> Result<()> { trace!( "schedule_transaction_executions(): {} txs", @@ -455,8 +448,8 @@ impl BankWithScheduler { ); let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| { - for (sanitized_transaction, &index) in transactions_with_indexes { - scheduler.schedule_execution(&(sanitized_transaction, index))?; + for (sanitized_transaction, index) in transactions_with_indexes { + scheduler.schedule_execution(sanitized_transaction, index)?; } Ok(()) }); @@ -856,12 +849,12 @@ mod tests { mocked .expect_schedule_execution() .times(1) - .returning(|(_, _)| Ok(())); + .returning(|_, _| Ok(())); } else { mocked .expect_schedule_execution() .times(1) - .returning(|(_, _)| Err(SchedulerAborted)); + .returning(|_, _| Err(SchedulerAborted)); mocked .expect_recover_error_after_abort() .times(1) @@ -871,7 +864,7 @@ mod tests { ); let bank = BankWithScheduler::new(bank, Some(mocked_scheduler)); - let result = bank.schedule_transaction_executions([(&tx0, &0)].into_iter()); + let result = bank.schedule_transaction_executions([(tx0, 0)].into_iter()); if should_succeed { assert_matches!(result, Ok(())); } else { diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 8c2745e138fd33..105e0d93326d3e 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -1411,9 +1411,10 @@ impl InstalledScheduler for PooledScheduler { fn schedule_execution( &self, - &(transaction, index): &(&SanitizedTransaction, usize), + transaction: SanitizedTransaction, + index: usize, ) -> ScheduleResult { - let task = SchedulingStateMachine::create_task(transaction.clone(), index, &mut |pubkey| { + let task = SchedulingStateMachine::create_task(transaction, index, &mut |pubkey| { self.inner.usage_queue_loader.load(pubkey) }); self.inner.thread_manager.send_task(task) @@ -1776,25 +1777,25 @@ mod tests { pool.register_timeout_listener(bank.create_timeout_listener()); let tx_before_stale = - &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, genesis_config.hash(), )); - bank.schedule_transaction_executions([(tx_before_stale, &0)].into_iter()) + bank.schedule_transaction_executions([(tx_before_stale, 0)].into_iter()) .unwrap(); sleepless_testing::at(TestCheckPoint::BeforeTimeoutListenerTriggered); sleepless_testing::at(TestCheckPoint::AfterTimeoutListenerTriggered); let tx_after_stale = - &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, genesis_config.hash(), )); - bank.schedule_transaction_executions([(tx_after_stale, &1)].into_iter()) + bank.schedule_transaction_executions([(tx_after_stale, 1)].into_iter()) .unwrap(); // Observe second occurrence of TimeoutListenerTriggered(1), which indicates a new timeout @@ -1896,26 +1897,26 @@ mod tests { pool.register_timeout_listener(bank.create_timeout_listener()); let tx_before_stale = - &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, 
&solana_sdk::pubkey::new_rand(), 2, genesis_config.hash(), )); - bank.schedule_transaction_executions([(tx_before_stale, &0)].into_iter()) + bank.schedule_transaction_executions([(tx_before_stale, 0)].into_iter()) .unwrap(); sleepless_testing::at(TestCheckPoint::BeforeTimeoutListenerTriggered); sleepless_testing::at(TestCheckPoint::AfterSchedulerThreadAborted); sleepless_testing::at(TestCheckPoint::AfterTimeoutListenerTriggered); let tx_after_stale = - &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, genesis_config.hash(), )); - let result = bank.schedule_transaction_executions([(tx_after_stale, &1)].into_iter()); + let result = bank.schedule_transaction_executions([(tx_after_stale, 1)].into_iter()); assert_matches!(result, Err(TransactionError::AccountNotFound)); let (result, _timings) = bank.wait_for_completed_scheduler().unwrap(); @@ -1960,7 +1961,7 @@ mod tests { .. } = create_genesis_config(10_000); - let tx = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + let tx = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, @@ -1979,7 +1980,7 @@ mod tests { ); let context = SchedulingContext::new(bank.clone()); let scheduler = pool.do_take_scheduler(context); - scheduler.schedule_execution(&(tx, 0)).unwrap(); + scheduler.schedule_execution(tx, 0).unwrap(); match abort_case { AbortCase::Unhandled => { @@ -2082,13 +2083,13 @@ mod tests { for i in 0..MAX_TASK_COUNT { let tx = - &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, genesis_config.hash(), )); - scheduler.schedule_execution(&(tx, i)).unwrap(); + scheduler.schedule_execution(tx, i).unwrap(); } // Make sure ThreadManager::drop() is properly short-circuiting for non-aborting scheduler. @@ -2233,7 +2234,7 @@ mod tests { mint_keypair, .. 
} = create_genesis_config(10_000); - let tx0 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + let tx0 = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, @@ -2248,7 +2249,7 @@ mod tests { assert_eq!(bank.transaction_count(), 0); let scheduler = pool.take_scheduler(context); - scheduler.schedule_execution(&(tx0, 0)).unwrap(); + scheduler.schedule_execution(tx0, 0).unwrap(); let bank = BankWithScheduler::new(bank, Some(scheduler)); assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); assert_eq!(bank.transaction_count(), 1); @@ -2294,19 +2295,19 @@ mod tests { let unfunded_keypair = Keypair::new(); let bad_tx = - &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &unfunded_keypair, &solana_sdk::pubkey::new_rand(), 2, genesis_config.hash(), )); assert_eq!(bank.transaction_count(), 0); - scheduler.schedule_execution(&(bad_tx, 0)).unwrap(); + scheduler.schedule_execution(bad_tx, 0).unwrap(); sleepless_testing::at(TestCheckPoint::AfterTaskHandled); assert_eq!(bank.transaction_count(), 0); let good_tx_after_bad_tx = - &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 3, @@ -2314,7 +2315,7 @@ mod tests { )); // make sure this tx is really a good one to execute. assert_matches!( - bank.simulate_transaction_unchecked(good_tx_after_bad_tx, false) + bank.simulate_transaction_unchecked(&good_tx_after_bad_tx, false) .result, Ok(_) ); @@ -2322,7 +2323,7 @@ mod tests { let bank = BankWithScheduler::new(bank, Some(scheduler)); if extra_tx_after_failure { assert_matches!( - bank.schedule_transaction_executions([(good_tx_after_bad_tx, &1)].into_iter()), + bank.schedule_transaction_executions([(good_tx_after_bad_tx, 1)].into_iter()), Err(TransactionError::AccountNotFound) ); } @@ -2425,13 +2426,13 @@ mod tests { for index in 0..TX_COUNT { // Use 2 non-conflicting txes to exercise the channel disconnected case as well. let tx = - &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &Keypair::new(), &solana_sdk::pubkey::new_rand(), 1, genesis_config.hash(), )); - scheduler.schedule_execution(&(tx, index)).unwrap(); + scheduler.schedule_execution(tx, index).unwrap(); } // finally unblock the scheduler thread; otherwise the above schedule_execution could // return SchedulerAborted... @@ -2499,13 +2500,13 @@ mod tests { for i in 0..10 { let tx = - &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, genesis_config.hash(), )); - scheduler.schedule_execution(&(tx, i)).unwrap(); + scheduler.schedule_execution(tx, i).unwrap(); } // finally unblock the scheduler thread; otherwise the above schedule_execution could // return SchedulerAborted... 
@@ -2563,13 +2564,13 @@ mod tests { } = create_genesis_config(10_000); // tx0 and tx1 is definitely conflicting to write-lock the mint address - let tx0 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + let tx0 = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, genesis_config.hash(), )); - let tx1 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + let tx1 = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, @@ -2594,10 +2595,10 @@ mod tests { // Stall handling tx0 and tx1 let lock_to_stall = LOCK_TO_STALL.lock().unwrap(); scheduler - .schedule_execution(&(tx0, STALLED_TRANSACTION_INDEX)) + .schedule_execution(tx0, STALLED_TRANSACTION_INDEX) .unwrap(); scheduler - .schedule_execution(&(tx1, BLOCKED_TRANSACTION_INDEX)) + .schedule_execution(tx1, BLOCKED_TRANSACTION_INDEX) .unwrap(); // Wait a bit for the scheduler thread to decide to block tx1 @@ -2656,7 +2657,7 @@ mod tests { // Create a dummy tx and two contexts let dummy_tx = - &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_sdk::pubkey::new_rand(), 2, @@ -2672,7 +2673,9 @@ mod tests { .take(10000) { let scheduler = pool.take_scheduler(context.clone()); - scheduler.schedule_execution(&(dummy_tx, index)).unwrap(); + scheduler + .schedule_execution(dummy_tx.clone(), index) + .unwrap(); scheduler.wait_for_termination(false).1.return_to_pool(); } } @@ -2714,9 +2717,10 @@ mod tests { fn schedule_execution( &self, - &(transaction, index): &(&SanitizedTransaction, usize), + transaction: SanitizedTransaction, + index: usize, ) -> ScheduleResult { - let transaction_and_index = (transaction.clone(), index); + let transaction_and_index = (transaction, index); let context = self.context().clone(); let pool = self.3.clone(); @@ -2854,7 +2858,7 @@ mod tests { assert_eq!(bank.transaction_count(), 0); // schedule but not immediately execute transaction - bank.schedule_transaction_executions([(&very_old_valid_tx, &0)].into_iter()) + bank.schedule_transaction_executions([(very_old_valid_tx, 0)].into_iter()) .unwrap(); // this calls register_recent_blockhash internally bank.fill_bank_with_ticks_for_tests(); @@ -2917,7 +2921,7 @@ mod tests { ); // mangle the transfer tx to try to lock fee_payer (= mint_keypair) address twice! tx.message.account_keys.push(tx.message.account_keys[0]); - let tx = &SanitizedTransaction::from_transaction_for_tests(tx); + let tx = SanitizedTransaction::from_transaction_for_tests(tx); // this internally should call SanitizedTransaction::get_account_locks(). let result = &mut Ok(()); @@ -2930,7 +2934,7 @@ mod tests { prioritization_fee_cache, }; - DefaultTaskHandler::handle(result, timings, bank, tx, 0, handler_context); + DefaultTaskHandler::handle(result, timings, bank, &tx, 0, handler_context); assert_matches!(result, Err(TransactionError::AccountLoadedTwice)); } }
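
A minimal sketch of the caller-side flow these changes establish, assuming only the signatures introduced in this diff. The helper name `schedule_or_queue_entry` is hypothetical and does not appear in the patch; the real logic lives in `process_entries` and `queue_batches_with_lock_retry`, and a helper like this would have to sit inside `blockstore_processor.rs`, since `LockedTransactionsWithIndexes` has private fields and the sketch relies on that module's existing imports.

// Sketch only, not part of the patch. Assumes the signatures introduced above
// (`try_lock_accounts`, `unlock_accounts`, `schedule_transaction_executions`,
// `LockedTransactionsWithIndexes`) and blockstore_processor.rs's imports.
fn schedule_or_queue_entry(
    bank: &BankWithScheduler,
    transactions: Vec<SanitizedTransaction>,
    starting_index: usize,
    batches: &mut Vec<LockedTransactionsWithIndexes<SanitizedTransaction>>,
) -> Result<()> {
    // Take the account locks up front; `try_lock_accounts` returns one Result per tx.
    let lock_results = bank.try_lock_accounts(&transactions);
    if let Err(err) = first_err(&lock_results) {
        // On a lock conflict, release whatever did get locked before bailing,
        // mirroring the unlock-before-retry in `queue_batches_with_lock_retry`.
        bank.unlock_accounts(transactions.iter().zip(lock_results.iter()));
        return Err(err);
    }

    if bank.has_installed_scheduler() {
        // Unified-scheduler path: the scheduler tracks conflicts itself, so drop the
        // locks and hand over *owned* transactions zipped with plain usize indices.
        bank.unlock_accounts(transactions.iter().zip(lock_results.iter()));
        let indexes = starting_index..starting_index + transactions.len();
        bank.schedule_transaction_executions(transactions.into_iter().zip(indexes))
    } else {
        // Thread-pool path: keep the locks and queue the Drop-free wrapper; the
        // batches built later via `rebatch_transactions` take over unlock-on-drop.
        batches.push(LockedTransactionsWithIndexes {
            lock_results,
            transactions,
            starting_index,
        });
        Ok(())
    }
}

In the patch itself the scheduler/thread-pool split happens later, inside `process_batches`, and `queue_batches_with_lock_retry` always queues the locked entry; the sketch collapses the two steps only to show how ownership of the transactions and responsibility for the account locks move through the new API.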