diff --git a/ledger/benches/blockstore.rs b/ledger/benches/blockstore.rs index 27296d412d7ab3..853f4f0ff11e31 100644 --- a/ledger/benches/blockstore.rs +++ b/ledger/benches/blockstore.rs @@ -10,7 +10,8 @@ use { blockstore::{entries_to_test_shreds, Blockstore}, get_tmp_ledger_path_auto_delete, }, - solana_sdk::{clock::Slot, hash::Hash}, + solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature}, + solana_transaction_status::TransactionStatusMeta, std::path::Path, test::Bencher, }; @@ -154,3 +155,121 @@ fn bench_insert_data_shred_big(bench: &mut Bencher) { blockstore.insert_shreds(shreds, None, false).unwrap(); }); } + +#[bench] +#[ignore] +fn bench_write_transaction_memos(b: &mut Bencher) { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = + Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..64).map(|_| Signature::new_unique()).collect(); + b.iter(|| { + for (slot, signature) in signatures.iter().enumerate() { + blockstore + .write_transaction_memos( + signature, + slot as u64, + "bench_write_transaction_memos".to_string(), + ) + .unwrap(); + } + }); +} + +#[bench] +#[ignore] +fn bench_add_transaction_memos_to_batch(b: &mut Bencher) { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = + Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..64).map(|_| Signature::new_unique()).collect(); + b.iter(|| { + let mut memos_batch = blockstore.get_write_batch().unwrap(); + + for (slot, signature) in signatures.iter().enumerate() { + blockstore + .add_transaction_memos_to_batch( + signature, + slot as u64, + "bench_write_transaction_memos".to_string(), + &mut memos_batch, + ) + .unwrap(); + } + + blockstore.write_batch(memos_batch).unwrap(); + }); +} + +#[bench] +#[ignore] +fn bench_write_transaction_status(b: &mut Bencher) { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = + Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..64).map(|_| Signature::new_unique()).collect(); + let keys_with_writable: Vec> = (0..64) + .map(|_| { + vec![ + (Pubkey::new_unique(), true), + (Pubkey::new_unique(), false), + (Pubkey::new_unique(), true), + (Pubkey::new_unique(), false), + ] + }) + .collect(); + let slot = 5; + + b.iter(|| { + for (tx_idx, signature) in signatures.iter().enumerate() { + blockstore + .write_transaction_status( + slot, + *signature, + keys_with_writable[tx_idx].iter().map(|(k, v)| (k, *v)), + TransactionStatusMeta::default(), + tx_idx, + ) + .unwrap(); + } + }); +} + +#[bench] +#[ignore] +fn bench_add_transaction_status_to_batch(b: &mut Bencher) { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = + Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..64).map(|_| Signature::new_unique()).collect(); + let keys_with_writable: Vec> = (0..64) + .map(|_| { + vec![ + (Pubkey::new_unique(), true), + (Pubkey::new_unique(), false), + (Pubkey::new_unique(), true), + (Pubkey::new_unique(), false), + ] + }) + .collect(); + let slot = 5; + + b.iter(|| { + let mut status_batch = blockstore.get_write_batch().unwrap(); + + for (tx_idx, signature) in signatures.iter().enumerate() { + blockstore + .add_transaction_status_to_batch( + slot, + *signature, + keys_with_writable[tx_idx].iter().map(|(k, v)| (k, *v)), + TransactionStatusMeta::default(), + tx_idx, + &mut status_batch, + ) + .unwrap(); + } + + blockstore.write_batch(status_batch).unwrap(); + }); +} diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 80ce6b83dce1a3..07d9a6d0cb5404 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -2892,14 +2892,19 @@ impl Blockstore { } } - pub fn write_transaction_status<'a>( + #[inline] + fn write_transaction_status_helper<'a, F>( &self, slot: Slot, signature: Signature, keys_with_writable: impl Iterator, status: TransactionStatusMeta, transaction_index: usize, - ) -> Result<()> { + mut write_fn: F, + ) -> Result<()> + where + F: FnMut(&Pubkey, Slot, u32, Signature, bool) -> Result<()>, + { let status = status.into(); let transaction_index = u32::try_from(transaction_index) .map_err(|_| BlockstoreError::TransactionIndexOverflow)?; @@ -2907,15 +2912,59 @@ impl Blockstore { .put_protobuf((signature, slot), &status)?; for (address, writeable) in keys_with_writable { - self.address_signatures_cf.put( - (*address, slot, transaction_index, signature), - &AddressSignatureMeta { writeable }, - )?; + write_fn(address, slot, transaction_index, signature, writeable)?; } Ok(()) } + pub fn write_transaction_status<'a>( + &self, + slot: Slot, + signature: Signature, + keys_with_writable: impl Iterator, + status: TransactionStatusMeta, + transaction_index: usize, + ) -> Result<()> { + self.write_transaction_status_helper( + slot, + signature, + keys_with_writable, + status, + transaction_index, + |address, slot, tx_index, signature, writeable| { + self.address_signatures_cf.put( + (*address, slot, tx_index, signature), + &AddressSignatureMeta { writeable }, + ) + }, + ) + } + + pub fn add_transaction_status_to_batch<'a>( + &self, + slot: Slot, + signature: Signature, + keys_with_writable: impl Iterator, + status: TransactionStatusMeta, + transaction_index: usize, + db_write_batch: &mut WriteBatch<'_>, + ) -> Result<()> { + self.write_transaction_status_helper( + slot, + signature, + keys_with_writable, + status, + transaction_index, + |address, slot, tx_index, signature, writeable| { + db_write_batch.put::( + (*address, slot, tx_index, signature), + &AddressSignatureMeta { writeable }, + ) + }, + ) + } + pub fn read_transaction_memos( &self, signature: Signature, @@ -2943,6 +2992,16 @@ impl Blockstore { self.transaction_memos_cf.put((*signature, slot), &memos) } + pub fn add_transaction_memos_to_batch( + &self, + signature: &Signature, + slot: Slot, + memos: String, + db_write_batch: &mut WriteBatch<'_>, + ) -> Result<()> { + db_write_batch.put::((*signature, slot), &memos) + } + /// Acquires the `lowest_cleanup_slot` lock and returns a tuple of the held lock /// and lowest available slot. /// @@ -4573,6 +4632,14 @@ impl Blockstore { *index_meta_time_us += total_start.as_us(); res } + + pub fn get_write_batch(&self) -> std::result::Result, BlockstoreError> { + self.db.batch() + } + + pub fn write_batch(&self, write_batch: WriteBatch) -> Result<()> { + self.db.write(write_batch) + } } // Update the `completed_data_indexes` with a new shred `new_shred_index`. If a @@ -12253,4 +12320,145 @@ pub mod tests { ); } } + + #[test] + fn test_write_transaction_memos() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let signature: Signature = Signature::new_unique(); + + blockstore + .write_transaction_memos(&signature, 4, "test_write_transaction_memos".to_string()) + .unwrap(); + + let memo = blockstore + .read_transaction_memos(signature, 4) + .expect("Expected to find memo"); + assert_eq!(memo, Some("test_write_transaction_memos".to_string())); + } + + #[test] + fn test_add_transaction_memos_to_batch() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..2).map(|_| Signature::new_unique()).collect(); + let mut memos_batch = blockstore.get_write_batch().unwrap(); + + blockstore + .add_transaction_memos_to_batch( + &signatures[0], + 4, + "test_write_transaction_memos1".to_string(), + &mut memos_batch, + ) + .unwrap(); + + blockstore + .add_transaction_memos_to_batch( + &signatures[1], + 5, + "test_write_transaction_memos2".to_string(), + &mut memos_batch, + ) + .unwrap(); + + blockstore.write_batch(memos_batch).unwrap(); + + let memo1 = blockstore + .read_transaction_memos(signatures[0], 4) + .expect("Expected to find memo"); + assert_eq!(memo1, Some("test_write_transaction_memos1".to_string())); + + let memo2 = blockstore + .read_transaction_memos(signatures[1], 5) + .expect("Expected to find memo"); + assert_eq!(memo2, Some("test_write_transaction_memos2".to_string())); + } + + #[test] + fn test_write_transaction_status() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..2).map(|_| Signature::new_unique()).collect(); + let keys_with_writable: Vec<(Pubkey, bool)> = + vec![(Pubkey::new_unique(), true), (Pubkey::new_unique(), false)]; + let slot = 5; + + blockstore + .write_transaction_status( + slot, + signatures[0], + keys_with_writable + .iter() + .map(|&(ref pubkey, writable)| (pubkey, writable)), + TransactionStatusMeta { + fee: 4200, + ..TransactionStatusMeta::default() + }, + 0, + ) + .unwrap(); + + let tx_status = blockstore + .read_transaction_status((signatures[0], slot)) + .unwrap() + .unwrap(); + assert_eq!(tx_status.fee, 4200); + } + + #[test] + fn test_add_transaction_status_to_batch() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..2).map(|_| Signature::new_unique()).collect(); + let keys_with_writable: Vec> = (0..2) + .map(|_| vec![(Pubkey::new_unique(), true), (Pubkey::new_unique(), false)]) + .collect(); + let slot = 5; + let mut status_batch = blockstore.get_write_batch().unwrap(); + + for (tx_idx, signature) in signatures.iter().enumerate() { + blockstore + .add_transaction_status_to_batch( + slot, + *signature, + keys_with_writable[tx_idx].iter().map(|(k, v)| (k, *v)), + TransactionStatusMeta { + fee: 5700 + tx_idx as u64, + status: if tx_idx % 2 == 0 { + Ok(()) + } else { + Err(TransactionError::InsufficientFundsForFee) + }, + ..TransactionStatusMeta::default() + }, + tx_idx, + &mut status_batch, + ) + .unwrap(); + } + + blockstore.write_batch(status_batch).unwrap(); + + let tx_status1 = blockstore + .read_transaction_status((signatures[0], slot)) + .unwrap() + .unwrap(); + assert_eq!(tx_status1.fee, 5700); + assert_eq!(tx_status1.status, Ok(())); + + let tx_status2 = blockstore + .read_transaction_status((signatures[1], slot)) + .unwrap() + .unwrap(); + assert_eq!(tx_status2.fee, 5701); + assert_eq!( + tx_status2.status, + Err(TransactionError::InsufficientFundsForFee) + ); + } } diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index 314f8b9a4f5fda..3ae6e2f130d1dd 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -73,6 +73,8 @@ impl TransactionStatusService { token_balances, transaction_indexes, }) => { + let mut status_and_memos_batch = blockstore.get_write_batch().unwrap(); + for ( transaction, commit_result, @@ -159,8 +161,15 @@ impl TransactionStatusService { if enable_rpc_transaction_history { if let Some(memos) = extract_and_fmt_memos(transaction.message()) { blockstore - .write_transaction_memos(transaction.signature(), slot, memos) - .expect("Expect database write to succeed: TransactionMemos"); + .add_transaction_memos_to_batch( + transaction.signature(), + slot, + memos, + &mut status_and_memos_batch, + ) + .expect( + "Expect database batch accumulation to succeed: TransactionMemos", + ); } let message = transaction.message(); @@ -171,16 +180,24 @@ impl TransactionStatusService { .map(|(index, key)| (key, message.is_writable(index))); blockstore - .write_transaction_status( + .add_transaction_status_to_batch( slot, *transaction.signature(), keys_with_writable, transaction_status_meta, transaction_index, + &mut status_and_memos_batch, ) - .expect("Expect database write to succeed: TransactionStatus"); + .expect( + "Expect database batch accumulation to succeed: TransactionStatus", + ); } } + + if enable_rpc_transaction_history { + blockstore.write_batch(status_and_memos_batch) + .expect("Expect database batched writes to succeed: TransactionStatus + TransactionMemos"); + } } TransactionStatusMessage::Freeze(slot) => { max_complete_transaction_status_slot.fetch_max(slot, Ordering::SeqCst); @@ -420,4 +437,132 @@ pub(crate) mod tests { result.transaction.signature() ); } + + #[test] + fn test_batch_transaction_status_and_memos() { + let genesis_config = create_genesis_config(2).genesis_config; + let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + + let (transaction_status_sender, transaction_status_receiver) = unbounded(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let blockstore = Arc::new(blockstore); + + let transaction1 = build_test_transaction_legacy(); + let transaction1 = VersionedTransaction::from(transaction1); + let transaction1 = SanitizedTransaction::try_create( + transaction1, + MessageHash::Compute, + None, + SimpleAddressLoader::Disabled, + &ReservedAccountKeys::empty_key_set(), + ) + .unwrap(); + + let transaction2 = build_test_transaction_legacy(); + let transaction2 = VersionedTransaction::from(transaction2); + let transaction2 = SanitizedTransaction::try_create( + transaction2, + MessageHash::Compute, + None, + SimpleAddressLoader::Disabled, + &ReservedAccountKeys::empty_key_set(), + ) + .unwrap(); + + let expected_transaction1 = transaction1.clone(); + let expected_transaction2 = transaction2.clone(); + + let commit_result = Ok(CommittedTransaction { + status: Ok(()), + log_messages: None, + inner_instructions: None, + return_data: None, + executed_units: 0, + fee_details: FeeDetails::default(), + rent_debits: RentDebits::default(), + loaded_account_stats: TransactionLoadedAccountsStats::default(), + }); + + let balances = TransactionBalancesSet { + pre_balances: vec![vec![123456], vec![234567]], + post_balances: vec![vec![234567], vec![345678]], + }; + + let token_balances = TransactionTokenBalancesSet { + pre_token_balances: vec![vec![], vec![]], + post_token_balances: vec![vec![], vec![]], + }; + + let slot = bank.slot(); + let transaction_index1: usize = bank.transaction_count().try_into().unwrap(); + let transaction_index2: usize = transaction_index1 + 1; + + let transaction_status_batch = TransactionStatusBatch { + slot, + transactions: vec![transaction1, transaction2], + commit_results: vec![commit_result.clone(), commit_result], + balances: balances.clone(), + token_balances, + transaction_indexes: vec![transaction_index1, transaction_index2], + }; + + let test_notifier = Arc::new(TestTransactionNotifier::new()); + + let exit = Arc::new(AtomicBool::new(false)); + let transaction_status_service = TransactionStatusService::new( + transaction_status_receiver, + Arc::new(AtomicU64::default()), + true, + Some(test_notifier.clone()), + blockstore, + false, + exit.clone(), + ); + + transaction_status_sender + .send(TransactionStatusMessage::Batch(transaction_status_batch)) + .unwrap(); + sleep(Duration::from_millis(5000)); + + exit.store(true, Ordering::Relaxed); + transaction_status_service.join().unwrap(); + assert_eq!(test_notifier.notifications.len(), 2); + + let key1 = TestNotifierKey { + slot, + transaction_index: transaction_index1, + signature: *expected_transaction1.signature(), + }; + let key2 = TestNotifierKey { + slot, + transaction_index: transaction_index2, + signature: *expected_transaction2.signature(), + }; + + assert!(test_notifier.notifications.contains_key(&key1)); + assert!(test_notifier.notifications.contains_key(&key2)); + + let result1 = test_notifier.notifications.get(&key1).unwrap(); + let result2 = test_notifier.notifications.get(&key2).unwrap(); + + assert_eq!( + expected_transaction1.signature(), + result1.transaction.signature() + ); + assert_eq!( + expected_transaction1.message_hash(), + result1.transaction.message_hash() + ); + + assert_eq!( + expected_transaction2.signature(), + result2.transaction.signature() + ); + assert_eq!( + expected_transaction2.message_hash(), + result2.transaction.message_hash() + ); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 2a9a955768177a..18287e1a671576 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -346,6 +346,8 @@ pub struct TransactionSimulationResult { pub return_data: Option, pub inner_instructions: Option>, } + +#[derive(Clone)] pub struct TransactionBalancesSet { pub pre_balances: TransactionBalances, pub post_balances: TransactionBalances,