diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 78455fea538ee4..602684695ca7eb 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -651,7 +651,7 @@ impl BankingStage { // Clear payload for next iteration payload.sanitized_transactions.clear(); - payload.write_accounts.clear(); + payload.account_locks.clear(); let ProcessTransactionsSummary { reached_max_poh_height, diff --git a/core/src/lib.rs b/core/src/lib.rs index f8b375c96d62b9..72beb7fde75991 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -47,6 +47,7 @@ pub mod poh_timing_report_service; pub mod poh_timing_reporter; pub mod progress_map; pub mod qos_service; +pub mod read_write_account_set; pub mod repair_generic_traversal; pub mod repair_response; pub mod repair_service; diff --git a/core/src/read_write_account_set.rs b/core/src/read_write_account_set.rs new file mode 100644 index 00000000000000..a0a8b41ca30819 --- /dev/null +++ b/core/src/read_write_account_set.rs @@ -0,0 +1,354 @@ +use { + solana_sdk::{ + message::{SanitizedMessage, VersionedMessage}, + pubkey::Pubkey, + }, + std::collections::HashSet, +}; + +/// Wrapper struct to check account locks for a batch of transactions. +#[derive(Debug, Default)] +pub struct ReadWriteAccountSet { + /// Set of accounts that are locked for read + read_set: HashSet, + /// Set of accounts that are locked for write + write_set: HashSet, +} + +impl ReadWriteAccountSet { + /// Check static account locks for a transaction message. + pub fn check_static_account_locks(&self, message: &VersionedMessage) -> bool { + !message + .static_account_keys() + .iter() + .enumerate() + .any(|(index, pubkey)| { + if message.is_maybe_writable(index) { + !self.can_write(pubkey) + } else { + !self.can_read(pubkey) + } + }) + } + + /// Check all account locks and if they are available, lock them. + /// Returns true if all account locks are available and false otherwise. + pub fn try_locking(&mut self, message: &SanitizedMessage) -> bool { + if self.check_sanitized_message_account_locks(message) { + self.add_sanitized_message_account_locks(message); + true + } else { + false + } + } + + /// Clears the read and write sets + pub fn clear(&mut self) { + self.read_set.clear(); + self.write_set.clear(); + } + + /// Check if a sanitized message's account locks are available. + fn check_sanitized_message_account_locks(&self, message: &SanitizedMessage) -> bool { + !message + .account_keys() + .iter() + .enumerate() + .any(|(index, pubkey)| { + if message.is_writable(index) { + !self.can_write(pubkey) + } else { + !self.can_read(pubkey) + } + }) + } + + /// Insert the read and write locks for a sanitized message. + fn add_sanitized_message_account_locks(&mut self, message: &SanitizedMessage) { + message + .account_keys() + .iter() + .enumerate() + .for_each(|(index, pubkey)| { + if message.is_writable(index) { + self.add_write(pubkey); + } else { + self.add_read(pubkey); + } + }); + } + + /// Check if an account can be read-locked + fn can_read(&self, pubkey: &Pubkey) -> bool { + !self.write_set.contains(pubkey) + } + + /// Check if an account can be write-locked + fn can_write(&self, pubkey: &Pubkey) -> bool { + !self.write_set.contains(pubkey) && !self.read_set.contains(pubkey) + } + + /// Add an account to the read-set. + /// Should only be called after `can_read()` returns true + fn add_read(&mut self, pubkey: &Pubkey) { + self.read_set.insert(*pubkey); + } + + /// Add an account to the write-set. + /// Should only be called after `can_write()` returns true + fn add_write(&mut self, pubkey: &Pubkey) { + self.write_set.insert(*pubkey); + } +} + +#[cfg(test)] +mod tests { + use { + super::ReadWriteAccountSet, + solana_address_lookup_table_program::state::{AddressLookupTable, LookupTableMeta}, + solana_ledger::genesis_utils::GenesisConfigInfo, + solana_runtime::{bank::Bank, genesis_utils::create_genesis_config}, + solana_sdk::{ + account::AccountSharedData, + hash::Hash, + message::{ + v0::{self, MessageAddressTableLookup}, + MessageHeader, VersionedMessage, + }, + pubkey::Pubkey, + signature::Keypair, + signer::Signer, + transaction::{MessageHash, SanitizedTransaction, VersionedTransaction}, + }, + std::{borrow::Cow, sync::Arc}, + }; + + fn create_test_versioned_message( + write_keys: &[Pubkey], + read_keys: &[Pubkey], + address_table_lookups: Vec, + ) -> VersionedMessage { + VersionedMessage::V0(v0::Message { + header: MessageHeader { + num_required_signatures: write_keys.len() as u8, + num_readonly_signed_accounts: 0, + num_readonly_unsigned_accounts: read_keys.len() as u8, + }, + recent_blockhash: Hash::default(), + account_keys: write_keys.iter().chain(read_keys.iter()).copied().collect(), + address_table_lookups, + instructions: vec![], + }) + } + + fn create_test_sanitized_transaction( + write_keypair: &Keypair, + read_keys: &[Pubkey], + address_table_lookups: Vec, + bank: &Bank, + ) -> SanitizedTransaction { + let message = create_test_versioned_message( + &[write_keypair.pubkey()], + read_keys, + address_table_lookups, + ); + SanitizedTransaction::try_create( + VersionedTransaction::try_new(message, &[write_keypair]).unwrap(), + MessageHash::Compute, + Some(false), + bank, + true, // require_static_program_ids + ) + .unwrap() + } + + fn create_test_address_lookup_table( + bank: Arc, + num_addresses: usize, + ) -> (Arc, Pubkey) { + let mut addresses = Vec::with_capacity(num_addresses); + addresses.resize_with(num_addresses, Pubkey::new_unique); + let address_lookup_table = AddressLookupTable { + meta: LookupTableMeta { + authority: None, + ..LookupTableMeta::default() + }, + addresses: Cow::Owned(addresses), + }; + + let address_table_key = Pubkey::new_unique(); + let data = address_lookup_table.serialize_for_tests().unwrap(); + let mut account = + AccountSharedData::new(1, data.len(), &solana_address_lookup_table_program::id()); + account.set_data(data); + bank.store_account(&address_table_key, &account); + + ( + Arc::new(Bank::new_from_parent( + &bank, + &Pubkey::new_unique(), + bank.slot() + 1, + )), + address_table_key, + ) + } + + fn create_test_bank() -> Arc { + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)) + } + + // Helper function (could potentially use test_case in future). + // conflict_index = 0 means write lock conflict + // conflict_index = 1 means read lock conflict + fn test_check_static_account_locks(conflict_index: usize, add_write: bool, expectation: bool) { + let message = + create_test_versioned_message(&[Pubkey::new_unique()], &[Pubkey::new_unique()], vec![]); + + let mut account_locks = ReadWriteAccountSet::default(); + assert!(account_locks.check_static_account_locks(&message)); + + let conflict_key = message.static_account_keys().get(conflict_index).unwrap(); + if add_write { + account_locks.add_write(conflict_key); + } else { + account_locks.add_read(conflict_key); + } + assert_eq!( + expectation, + account_locks.check_static_account_locks(&message) + ); + } + + #[test] + fn test_check_static_account_locks_write_write_conflict() { + test_check_static_account_locks(0, true, false); + } + + #[test] + fn test_check_static_account_locks_read_write_conflict() { + test_check_static_account_locks(0, false, false); + } + + #[test] + fn test_check_static_account_locks_write_read_conflict() { + test_check_static_account_locks(1, true, false); + } + + #[test] + fn test_check_static_account_locks_read_read_non_conflict() { + test_check_static_account_locks(1, false, true); + } + + // Helper function (could potentially use test_case in future). + // conflict_index = 0 means write lock conflict with static key + // conflict_index = 1 means read lock conflict with static key + // conflict_index = 2 means write lock conflict with address table key + // conflict_index = 3 means read lock conflict with address table key + fn test_check_sanitized_message_account_locks( + conflict_index: usize, + add_write: bool, + expectation: bool, + ) { + let bank = create_test_bank(); + let (bank, table_address) = create_test_address_lookup_table(bank, 2); + let tx = create_test_sanitized_transaction( + &Keypair::new(), + &[Pubkey::new_unique()], + vec![MessageAddressTableLookup { + account_key: table_address, + writable_indexes: vec![0], + readonly_indexes: vec![1], + }], + &bank, + ); + let message = tx.message(); + + let mut account_locks = ReadWriteAccountSet::default(); + assert!(account_locks.check_sanitized_message_account_locks(message)); + + let conflict_key = message.account_keys().get(conflict_index).unwrap(); + if add_write { + account_locks.add_write(conflict_key); + } else { + account_locks.add_read(conflict_key); + } + assert_eq!( + expectation, + account_locks.check_sanitized_message_account_locks(message) + ); + } + + #[test] + fn test_check_sanitized_message_account_locks_write_write_conflict() { + test_check_sanitized_message_account_locks(0, true, false); // static key conflict + test_check_sanitized_message_account_locks(2, true, false); // lookup key conflict + } + + #[test] + fn test_check_sanitized_message_account_locks_read_write_conflict() { + test_check_sanitized_message_account_locks(0, false, false); // static key conflict + test_check_sanitized_message_account_locks(2, false, false); // lookup key conflict + } + + #[test] + fn test_check_sanitized_message_account_locks_write_read_conflict() { + test_check_sanitized_message_account_locks(1, true, false); // static key conflict + test_check_sanitized_message_account_locks(3, true, false); // lookup key conflict + } + + #[test] + fn test_check_sanitized_message_account_locks_read_read_non_conflict() { + test_check_sanitized_message_account_locks(1, false, true); // static key conflict + test_check_sanitized_message_account_locks(3, false, true); // lookup key conflict + } + + #[test] + pub fn test_write_write_conflict() { + let mut account_locks = ReadWriteAccountSet::default(); + let account = Pubkey::new_unique(); + assert!(account_locks.can_write(&account)); + account_locks.add_write(&account); + assert!(!account_locks.can_write(&account)); + } + + #[test] + pub fn test_read_write_conflict() { + let mut account_locks = ReadWriteAccountSet::default(); + let account = Pubkey::new_unique(); + assert!(account_locks.can_read(&account)); + account_locks.add_read(&account); + assert!(!account_locks.can_write(&account)); + assert!(account_locks.can_read(&account)); + } + + #[test] + pub fn test_write_read_conflict() { + let mut account_locks = ReadWriteAccountSet::default(); + let account = Pubkey::new_unique(); + assert!(account_locks.can_write(&account)); + account_locks.add_write(&account); + assert!(!account_locks.can_write(&account)); + assert!(!account_locks.can_read(&account)); + } + + #[test] + pub fn test_read_read_non_conflict() { + let mut account_locks = ReadWriteAccountSet::default(); + let account = Pubkey::new_unique(); + assert!(account_locks.can_read(&account)); + account_locks.add_read(&account); + assert!(account_locks.can_read(&account)); + } + + #[test] + pub fn test_write_write_different_keys() { + let mut account_locks = ReadWriteAccountSet::default(); + let account1 = Pubkey::new_unique(); + let account2 = Pubkey::new_unique(); + assert!(account_locks.can_write(&account1)); + account_locks.add_write(&account1); + assert!(account_locks.can_write(&account2)); + assert!(account_locks.can_read(&account2)); + } +} diff --git a/core/src/unprocessed_transaction_storage.rs b/core/src/unprocessed_transaction_storage.rs index 2e136f8d06f4f8..87258faf2fe9a6 100644 --- a/core/src/unprocessed_transaction_storage.rs +++ b/core/src/unprocessed_transaction_storage.rs @@ -9,6 +9,7 @@ use { }, leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, multi_iterator_scanner::{MultiIteratorScanner, ProcessingDecision}, + read_write_account_set::ReadWriteAccountSet, unprocessed_packet_batches::{ DeserializedPacket, PacketBatchInsertionMetrics, UnprocessedPacketBatches, }, @@ -18,13 +19,10 @@ use { solana_measure::measure, solana_runtime::bank::Bank, solana_sdk::{ - clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, pubkey::Pubkey, - saturating_add_assign, transaction::SanitizedTransaction, - }, - std::{ - collections::HashSet, - sync::{atomic::Ordering, Arc}, + clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, saturating_add_assign, + transaction::SanitizedTransaction, }, + std::sync::{atomic::Ordering, Arc}, }; // Step-size set to be 64, equal to the maximum batch/entry size. With the @@ -131,7 +129,7 @@ fn filter_processed_packets<'a, F>( /// multi-iterator checking function. pub struct ConsumeScannerPayload<'a> { pub reached_end_of_slot: bool, - pub write_accounts: HashSet, + pub account_locks: ReadWriteAccountSet, pub sanitized_transactions: Vec, pub slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, } @@ -149,17 +147,8 @@ fn consume_scan_should_process_packet( // Before sanitization, let's quickly check the static keys (performance optimization) let message = &packet.transaction().get_message().message; - let static_keys = message.static_account_keys(); - for key in static_keys.iter().enumerate().filter_map(|(idx, key)| { - if message.is_maybe_writable(idx) { - Some(key) - } else { - None - } - }) { - if payload.write_accounts.contains(key) { - return ProcessingDecision::Later; - } + if !payload.account_locks.check_static_account_locks(message) { + return ProcessingDecision::Later; } // Try to deserialize the packet @@ -178,29 +167,19 @@ fn consume_scan_should_process_packet( if let Some(sanitized_transaction) = maybe_sanitized_transaction { let message = sanitized_transaction.message(); - let conflicts_with_batch = message.account_keys().iter().enumerate().any(|(idx, key)| { - if message.is_writable(idx) { - payload.write_accounts.contains(key) - } else { - false - } - }); - - if conflicts_with_batch { - ProcessingDecision::Later - } else { - message - .account_keys() - .iter() - .enumerate() - .for_each(|(idx, key)| { - if message.is_writable(idx) { - payload.write_accounts.insert(*key); - } - }); - + // Check the number of locks and whether there are duplicates + if SanitizedTransaction::validate_account_locks( + message, + bank.get_transaction_account_lock_limit(), + ) + .is_err() + { + ProcessingDecision::Never + } else if payload.account_locks.try_locking(message) { payload.sanitized_transactions.push(sanitized_transaction); ProcessingDecision::Now + } else { + ProcessingDecision::Later } } else { ProcessingDecision::Never @@ -221,7 +200,7 @@ where { let payload = ConsumeScannerPayload { reached_end_of_slot: false, - write_accounts: HashSet::new(), + account_locks: ReadWriteAccountSet::default(), sanitized_transactions: Vec::with_capacity(UNPROCESSED_BUFFER_STEP_SIZE), slot_metrics_tracker, }; diff --git a/sdk/src/transaction/sanitized.rs b/sdk/src/transaction/sanitized.rs index eaad070937ab41..cb2dfe838f146b 100644 --- a/sdk/src/transaction/sanitized.rs +++ b/sdk/src/transaction/sanitized.rs @@ -277,7 +277,7 @@ impl SanitizedTransaction { } /// Validate a transaction message against locked accounts - fn validate_account_locks( + pub fn validate_account_locks( message: &SanitizedMessage, tx_account_lock_limit: usize, ) -> Result<()> {