diff --git a/Cargo.lock b/Cargo.lock index 650b369d205c5c..5a22bae6ffcc6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5828,6 +5828,7 @@ dependencies = [ "solana-streamer", "solana-svm", "solana-tpu-client", + "solana-transaction-metrics-tracker", "solana-transaction-status", "solana-turbine", "solana-unified-scheduler-pool", @@ -7190,9 +7191,11 @@ dependencies = [ "rand 0.8.5", "rustls", "solana-logger", + "solana-measure", "solana-metrics", "solana-perf", "solana-sdk", + "solana-transaction-metrics-tracker", "thiserror", "tokio", "x509-parser", @@ -7359,6 +7362,20 @@ dependencies = [ "solana-version", ] +[[package]] +name = "solana-transaction-metrics-tracker" +version = "1.19.0" +dependencies = [ + "Inflector", + "base64 0.21.7", + "bincode", + "lazy_static", + "log", + "rand 0.8.5", + "solana-perf", + "solana-sdk", +] + [[package]] name = "solana-transaction-status" version = "1.19.0" diff --git a/Cargo.toml b/Cargo.toml index 66436c9cfb3fd8..e4979a0841abd5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,6 +106,7 @@ members = [ "tokens", "tpu-client", "transaction-dos", + "transaction-metrics-tracker", "transaction-status", "turbine", "udp-client", @@ -378,6 +379,7 @@ solana-system-program = { path = "programs/system", version = "=1.19.0" } solana-test-validator = { path = "test-validator", version = "=1.19.0" } solana-thin-client = { path = "thin-client", version = "=1.19.0" } solana-tpu-client = { path = "tpu-client", version = "=1.19.0", default-features = false } +solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=1.19.0" } solana-transaction-status = { path = "transaction-status", version = "=1.19.0" } solana-turbine = { path = "turbine", version = "=1.19.0" } solana-udp-client = { path = "udp-client", version = "=1.19.0" } diff --git a/accounts-db/src/accounts.rs b/accounts-db/src/accounts.rs index 371db9eb08c095..4294d8c81e64bf 100644 --- a/accounts-db/src/accounts.rs +++ b/accounts-db/src/accounts.rs @@ -23,7 +23,9 @@ use { nonce_info::{NonceFull, NonceInfo}, pubkey::Pubkey, slot_hashes::SlotHashes, - transaction::{Result, SanitizedTransaction, TransactionAccountLocks, TransactionError}, + transaction::{ + ExtendedSanitizedTransaction, Result, TransactionAccountLocks, TransactionError, + }, transaction_context::TransactionAccount, }, solana_svm::{ @@ -31,10 +33,7 @@ use { }, std::{ cmp::Reverse, - collections::{ - hash_map::{self}, - BinaryHeap, HashMap, HashSet, - }, + collections::{hash_map, BinaryHeap, HashMap, HashSet}, ops::RangeBounds, sync::{ atomic::{AtomicUsize, Ordering}, @@ -568,11 +567,11 @@ impl Accounts { #[allow(clippy::needless_collect)] pub fn lock_accounts<'a>( &self, - txs: impl Iterator, + txs: impl Iterator, tx_account_lock_limit: usize, ) -> Vec> { let tx_account_locks_results: Vec> = txs - .map(|tx| tx.get_account_locks(tx_account_lock_limit)) + .map(|tx| tx.transaction.get_account_locks(tx_account_lock_limit)) .collect(); self.lock_accounts_inner(tx_account_locks_results) } @@ -581,14 +580,14 @@ impl Accounts { #[allow(clippy::needless_collect)] pub fn lock_accounts_with_results<'a>( &self, - txs: impl Iterator, + txs: impl Iterator, results: impl Iterator>, tx_account_lock_limit: usize, ) -> Vec> { let tx_account_locks_results: Vec> = txs .zip(results) .map(|(tx, result)| match result { - Ok(()) => tx.get_account_locks(tx_account_lock_limit), + Ok(()) => tx.transaction.get_account_locks(tx_account_lock_limit), Err(err) => Err(err), }) .collect(); @@ -618,13 +617,13 @@ impl Accounts { #[allow(clippy::needless_collect)] pub fn unlock_accounts<'a>( &self, - txs: impl Iterator, + txs: impl Iterator, results: &[Result<()>], ) { let keys: Vec<_> = txs .zip(results) .filter(|(_, res)| res.is_ok()) - .map(|(tx, _)| tx.get_account_locks_unchecked()) + .map(|(tx, _)| tx.transaction.get_account_locks_unchecked()) .collect(); let mut account_locks = self.account_locks.lock().unwrap(); debug!("bank unlock accounts"); @@ -639,7 +638,7 @@ impl Accounts { pub fn store_cached( &self, slot: Slot, - txs: &[SanitizedTransaction], + txs: &[ExtendedSanitizedTransaction], res: &[TransactionExecutionResult], loaded: &mut [TransactionLoadResult], durable_nonce: &DurableNonce, @@ -666,14 +665,14 @@ impl Accounts { #[allow(clippy::too_many_arguments)] fn collect_accounts_to_store<'a>( &self, - txs: &'a [SanitizedTransaction], + txs: &'a [ExtendedSanitizedTransaction], execution_results: &'a [TransactionExecutionResult], load_results: &'a mut [TransactionLoadResult], durable_nonce: &DurableNonce, lamports_per_signature: u64, ) -> ( Vec<(&'a Pubkey, &'a AccountSharedData)>, - Vec>, + Vec>, ) { let mut accounts = Vec::with_capacity(load_results.len()); let mut transactions = Vec::with_capacity(load_results.len()); @@ -701,7 +700,7 @@ impl Accounts { } }; - let message = tx.message(); + let message = tx.transaction.message(); let loaded_transaction = tx_load_result.as_mut().unwrap(); let mut fee_payer_index = None; for (i, (address, account)) in (0..message.account_keys().len()) diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 1f3c36876f4531..093f1cf53e706a 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -93,7 +93,7 @@ use { rent_collector::RentCollector, saturating_add_assign, timing::AtomicInterval, - transaction::SanitizedTransaction, + transaction::ExtendedSanitizedTransaction, }, std::{ borrow::{Borrow, Cow}, @@ -6621,7 +6621,7 @@ impl AccountsDb { &self, slot: Slot, accounts_and_meta_to_store: &impl StorableAccounts<'b, T>, - txn_iter: Box> + 'a>, + txn_iter: Box> + 'a>, mut write_version_producer: P, ) -> Vec where @@ -6639,7 +6639,7 @@ impl AccountsDb { self.notify_account_at_accounts_update( slot, &account, - txn, + &txn.map(|txn| &txn.transaction), accounts_and_meta_to_store.pubkey(i), &mut write_version_producer, ); @@ -6671,7 +6671,7 @@ impl AccountsDb { hashes: Option>>, mut write_version_producer: P, store_to: &StoreTo, - transactions: Option<&[Option<&'a SanitizedTransaction>]>, + transactions: Option<&[Option<&'a ExtendedSanitizedTransaction>]>, ) -> Vec { let mut calc_stored_meta_time = Measure::start("calc_stored_meta"); let slot = accounts.target_slot(); @@ -6686,14 +6686,15 @@ impl AccountsDb { match store_to { StoreTo::Cache => { - let txn_iter: Box>> = - match transactions { - Some(transactions) => { - assert_eq!(transactions.len(), accounts.len()); - Box::new(transactions.iter()) - } - None => Box::new(std::iter::repeat(&None).take(accounts.len())), - }; + let txn_iter: Box< + dyn std::iter::Iterator>, + > = match transactions { + Some(transactions) => { + assert_eq!(transactions.len(), accounts.len()); + Box::new(transactions.iter()) + } + None => Box::new(std::iter::repeat(&None).take(accounts.len())), + }; self.write_accounts_to_cache(slot, accounts, txn_iter, write_version_producer) } @@ -8377,7 +8378,7 @@ impl AccountsDb { pub fn store_cached<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>( &self, accounts: impl StorableAccounts<'a, T>, - transactions: Option<&'a [Option<&'a SanitizedTransaction>]>, + transactions: Option<&'a [Option<&'a ExtendedSanitizedTransaction>]>, ) { self.store( accounts, @@ -8394,7 +8395,7 @@ impl AccountsDb { >( &self, accounts: impl StorableAccounts<'a, T>, - transactions: Option<&'a [Option<&'a SanitizedTransaction>]>, + transactions: Option<&'a [Option<&'a ExtendedSanitizedTransaction>]>, ) { self.store( accounts, @@ -8422,7 +8423,7 @@ impl AccountsDb { &self, accounts: impl StorableAccounts<'a, T>, store_to: &StoreTo, - transactions: Option<&'a [Option<&'a SanitizedTransaction>]>, + transactions: Option<&'a [Option<&'a ExtendedSanitizedTransaction>]>, reclaim: StoreReclaims, update_index_thread_selection: UpdateIndexThreadSelection, ) { @@ -8613,7 +8614,7 @@ impl AccountsDb { accounts: impl StorableAccounts<'a, T>, hashes: Option>>, store_to: &StoreTo, - transactions: Option<&'a [Option<&'a SanitizedTransaction>]>, + transactions: Option<&'a [Option<&'a ExtendedSanitizedTransaction>]>, reclaim: StoreReclaims, update_index_thread_selection: UpdateIndexThreadSelection, ) { @@ -8668,7 +8669,7 @@ impl AccountsDb { write_version_producer: Option>>, store_to: &StoreTo, reset_accounts: bool, - transactions: Option<&[Option<&SanitizedTransaction>]>, + transactions: Option<&[Option<&ExtendedSanitizedTransaction>]>, reclaim: StoreReclaims, update_index_thread_selection: UpdateIndexThreadSelection, ) -> StoreAccountsTiming { diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index b3028c0132ed48..973c591e622edb 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -23,7 +23,10 @@ use { message::{Message, SanitizedMessage}, pubkey::Pubkey, signature::Signature, - transaction::{self, MessageHash, SanitizedTransaction, VersionedTransaction}, + transaction::{ + self, ExtendedSanitizedTransaction, MessageHash, SanitizedTransaction, + VersionedTransaction, + }, }, solana_send_transaction_service::{ send_transaction_service::{SendTransactionService, TransactionInfo}, @@ -194,7 +197,10 @@ fn simulate_transaction( units_consumed, return_data, inner_instructions, - } = bank.simulate_transaction_unchecked(&sanitized_transaction, false); + } = bank.simulate_transaction_unchecked( + &ExtendedSanitizedTransaction::from(sanitized_transaction), + false, + ); let simulation_details = TransactionSimulationDetails { logs, diff --git a/core/Cargo.toml b/core/Cargo.toml index e2a936cdabc4c1..1fd25ec38a8d3b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -67,6 +67,7 @@ solana-send-transaction-service = { workspace = true } solana-streamer = { workspace = true } solana-svm = { workspace = true } solana-tpu-client = { workspace = true } +solana-transaction-metrics-tracker = { workspace = true } solana-transaction-status = { workspace = true } solana-turbine = { workspace = true } solana-unified-scheduler-pool = { workspace = true } diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index f4ac6c6040eda8..2c31c9fd8c6d88 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -29,7 +29,7 @@ use { message::SanitizedMessage, saturating_add_assign, timing::timestamp, - transaction::{self, AddressLoader, SanitizedTransaction, TransactionError}, + transaction::{self, AddressLoader, ExtendedSanitizedTransaction, TransactionError}, }, solana_svm::{ account_loader::{validate_fee_payer, TransactionCheckResult}, @@ -208,6 +208,32 @@ impl Consumer { .slot_metrics_tracker .increment_retryable_packets_count(retryable_transaction_indexes.len() as u64); + // Now we track the performance for the interested transactions which is not in the retryable_transaction_indexes + // We assume the retryable_transaction_indexes is already sorted. + let mut retryable_idx = 0; + for (index, packet) in packets_to_process.iter().enumerate() { + if packet.original_packet().meta().is_perf_track_packet() { + if let Some(start_time) = packet.start_time() { + if retryable_idx >= retryable_transaction_indexes.len() + || retryable_transaction_indexes[retryable_idx] != index + { + let duration = Instant::now().duration_since(*start_time); + + debug!( + "Banking stage processing took {duration:?} for transaction {:?}", + packet.transaction().get_signatures().first() + ); + payload + .slot_metrics_tracker + .increment_process_sampled_packets_us(duration.as_micros() as u64); + } else { + // This packet is retried, advance the retry index to the next, as the next packet's index will + // certainly be > than this. + retryable_idx += 1; + } + } + } + } Some(retryable_transaction_indexes) } @@ -215,7 +241,7 @@ impl Consumer { &self, bank: &Arc, bank_creation_time: &Instant, - sanitized_transactions: &[SanitizedTransaction], + sanitized_transactions: &[ExtendedSanitizedTransaction], banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> ProcessTransactionsSummary { @@ -271,7 +297,7 @@ impl Consumer { &self, bank: &Arc, bank_creation_time: &Instant, - transactions: &[SanitizedTransaction], + transactions: &[ExtendedSanitizedTransaction], ) -> ProcessTransactionsSummary { let mut chunk_start = 0; let mut all_retryable_tx_indexes = vec![]; @@ -405,7 +431,7 @@ impl Consumer { pub fn process_and_record_transactions( &self, bank: &Arc, - txs: &[SanitizedTransaction], + txs: &[ExtendedSanitizedTransaction], chunk_offset: usize, ) -> ProcessTransactionBatchOutput { let mut error_counters = TransactionErrorMetrics::default(); @@ -433,7 +459,7 @@ impl Consumer { pub fn process_and_record_aged_transactions( &self, bank: &Arc, - txs: &[SanitizedTransaction], + txs: &[ExtendedSanitizedTransaction], max_slot_ages: &[Slot], ) -> ProcessTransactionBatchOutput { // Need to filter out transactions since they were sanitized earlier. @@ -445,7 +471,7 @@ impl Consumer { // Re-sanitized transaction should be equal to the original transaction, // but whether it will pass sanitization needs to be checked. let resanitized_tx = - bank.fully_verify_transaction(tx.to_versioned_transaction())?; + bank.fully_verify_transaction(tx.transaction.to_versioned_transaction())?; if resanitized_tx != *tx { // Sanitization before/after epoch give different transaction data - do not execute. return Err(TransactionError::ResanitizationNeeded); @@ -453,7 +479,7 @@ impl Consumer { } else { // Any transaction executed between sanitization time and now may have closed the lookup table(s). // Above re-sanitization already loads addresses, so don't need to re-check in that case. - let lookup_tables = tx.message().message_address_table_lookups(); + let lookup_tables = tx.transaction.message().message_address_table_lookups(); if !lookup_tables.is_empty() { bank.load_addresses(lookup_tables)?; } @@ -466,7 +492,7 @@ impl Consumer { fn process_and_record_transactions_with_pre_results( &self, bank: &Arc, - txs: &[SanitizedTransaction], + txs: &[ExtendedSanitizedTransaction], chunk_offset: usize, pre_results: impl Iterator>, ) -> ProcessTransactionBatchOutput { @@ -582,6 +608,7 @@ impl Consumer { .filter_map(|transaction| { let round_compute_unit_price_enabled = false; // TODO get from working_bank.feature_set transaction + .transaction .get_compute_budget_details(round_compute_unit_price_enabled) .map(|details| details.compute_unit_price) }) @@ -622,7 +649,7 @@ impl Consumer { .zip(batch.sanitized_transactions()) .filter_map(|(execution_result, tx)| { if execution_result.was_executed() { - Some(tx.to_versioned_transaction()) + Some(tx.transaction.to_versioned_transaction()) } else { None } @@ -786,7 +813,7 @@ impl Consumer { /// * `pending_indexes` - identifies which indexes in the `transactions` list are still pending fn filter_pending_packets_from_pending_txs( bank: &Bank, - transactions: &[SanitizedTransaction], + transactions: &[ExtendedSanitizedTransaction], pending_indexes: &[usize], ) -> Vec { let filter = diff --git a/core/src/banking_stage/forward_packet_batches_by_accounts.rs b/core/src/banking_stage/forward_packet_batches_by_accounts.rs index 54efb27ce56129..ab8f52820e8eac 100644 --- a/core/src/banking_stage/forward_packet_batches_by_accounts.rs +++ b/core/src/banking_stage/forward_packet_batches_by_accounts.rs @@ -6,7 +6,7 @@ use { cost_tracker::{CostTracker, CostTrackerError}, }, solana_perf::packet::Packet, - solana_sdk::{feature_set::FeatureSet, transaction::SanitizedTransaction}, + solana_sdk::{feature_set::FeatureSet, transaction::ExtendedSanitizedTransaction}, std::sync::Arc, }; @@ -58,11 +58,11 @@ impl ForwardBatch { fn try_add( &mut self, - sanitized_transaction: &SanitizedTransaction, + sanitized_transaction: &ExtendedSanitizedTransaction, immutable_packet: Arc, feature_set: &FeatureSet, ) -> Result { - let tx_cost = CostModel::calculate_cost(sanitized_transaction, feature_set); + let tx_cost = CostModel::calculate_cost(&sanitized_transaction.transaction, feature_set); let res = self.cost_tracker.try_add(&tx_cost); if res.is_ok() { self.forwardable_packets.push(immutable_packet); @@ -112,7 +112,7 @@ impl ForwardPacketBatchesByAccounts { /// packets are filled into first available 'batch' that have space to fit it. pub fn try_add_packet( &mut self, - sanitized_transaction: &SanitizedTransaction, + sanitized_transaction: &ExtendedSanitizedTransaction, immutable_packet: Arc, feature_set: &FeatureSet, ) -> bool { diff --git a/core/src/banking_stage/immutable_deserialized_packet.rs b/core/src/banking_stage/immutable_deserialized_packet.rs index 26ede7045d3480..6eb5d68ecaaca5 100644 --- a/core/src/banking_stage/immutable_deserialized_packet.rs +++ b/core/src/banking_stage/immutable_deserialized_packet.rs @@ -13,7 +13,7 @@ use { VersionedTransaction, }, }, - std::{cmp::Ordering, mem::size_of, sync::Arc}, + std::{cmp::Ordering, mem::size_of, sync::Arc, time::Instant}, thiserror::Error, }; @@ -41,10 +41,16 @@ pub struct ImmutableDeserializedPacket { message_hash: Hash, is_simple_vote: bool, compute_budget_details: ComputeBudgetDetails, + banking_stage_start_time: Option, } impl ImmutableDeserializedPacket { pub fn new(packet: Packet) -> Result { + let banking_stage_start_time = packet + .meta() + .is_perf_track_packet() + .then_some(Instant::now()); + let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?; let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?; let message_bytes = packet_message(&packet)?; @@ -67,6 +73,7 @@ impl ImmutableDeserializedPacket { message_hash, is_simple_vote, compute_budget_details, + banking_stage_start_time, }) } @@ -98,6 +105,10 @@ impl ImmutableDeserializedPacket { self.compute_budget_details.clone() } + pub fn start_time(&self) -> &Option { + &self.banking_stage_start_time + } + // This function deserializes packets into transactions, computes the blake3 hash of transaction // messages, and verifies secp256k1 instructions. pub fn build_sanitized_transaction( diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index a62e5bf9b3e455..fecec111bd660c 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -11,6 +11,7 @@ use { clock::{Slot, UnixTimestamp}, program_utils::limited_deserialize, pubkey::Pubkey, + transaction::ExtendedSanitizedTransaction, }, solana_vote_program::vote_instruction::VoteInstruction, std::{ @@ -286,7 +287,7 @@ impl LatestUnprocessedVotes { ) { if forward_packet_batches_by_accounts.try_add_packet( - &sanitized_vote_transaction, + &ExtendedSanitizedTransaction::from(sanitized_vote_transaction), deserialized_vote_packet, &bank.feature_set, ) { diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index 88ea6b5ee340cf..1c255ca019bfe7 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -936,6 +936,17 @@ impl LeaderSlotMetricsTracker { ); } } + + pub(crate) fn increment_process_sampled_packets_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + leader_slot_metrics + .timing_metrics + .process_packets_timings + .process_sampled_packets_us_hist + .increment(us) + .unwrap(); + } + } } #[cfg(test)] diff --git a/core/src/banking_stage/leader_slot_timing_metrics.rs b/core/src/banking_stage/leader_slot_timing_metrics.rs index 7727b6cf6c6563..34ce64b31c34f3 100644 --- a/core/src/banking_stage/leader_slot_timing_metrics.rs +++ b/core/src/banking_stage/leader_slot_timing_metrics.rs @@ -244,6 +244,9 @@ pub(crate) struct ProcessPacketsTimings { // Time spent running the cost model in processing transactions before executing // transactions pub cost_model_us: u64, + + // banking stage processing time histogram for sampled packets + pub process_sampled_packets_us_hist: histogram::Histogram, } impl ProcessPacketsTimings { @@ -264,6 +267,28 @@ impl ProcessPacketsTimings { i64 ), ("cost_model_us", self.cost_model_us, i64), + ( + "process_sampled_packets_us_90pct", + self.process_sampled_packets_us_hist + .percentile(90.0) + .unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_min", + self.process_sampled_packets_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_max", + self.process_sampled_packets_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_mean", + self.process_sampled_packets_us_hist.mean().unwrap_or(0), + i64 + ), ); } } diff --git a/core/src/banking_stage/qos_service.rs b/core/src/banking_stage/qos_service.rs index 77f05c73a3bc12..86d61ee2d19204 100644 --- a/core/src/banking_stage/qos_service.rs +++ b/core/src/banking_stage/qos_service.rs @@ -12,7 +12,7 @@ use { clock::Slot, feature_set::FeatureSet, saturating_add_assign, - transaction::{self, SanitizedTransaction, TransactionError}, + transaction::{self, ExtendedSanitizedTransaction, TransactionError}, }, std::sync::atomic::{AtomicU64, Ordering}, }; @@ -40,7 +40,7 @@ impl QosService { pub fn select_and_accumulate_transaction_costs( &self, bank: &Bank, - transactions: &[SanitizedTransaction], + transactions: &[ExtendedSanitizedTransaction], pre_results: impl Iterator>, ) -> (Vec>, usize) { let transaction_costs = @@ -67,13 +67,15 @@ impl QosService { fn compute_transaction_costs<'a>( &self, feature_set: &FeatureSet, - transactions: impl Iterator, + transactions: impl Iterator, pre_results: impl Iterator>, ) -> Vec> { let mut compute_cost_time = Measure::start("compute_cost_time"); let txs_costs: Vec<_> = transactions .zip(pre_results) - .map(|(tx, pre_result)| pre_result.map(|()| CostModel::calculate_cost(tx, feature_set))) + .map(|(tx, pre_result)| { + pre_result.map(|()| CostModel::calculate_cost(&tx.transaction, feature_set)) + }) .collect(); compute_cost_time.stop(); self.metrics @@ -92,7 +94,7 @@ impl QosService { /// and a count of the number of transactions that would fit in the block fn select_transactions_per_cost<'a>( &self, - transactions: impl Iterator, + transactions: impl Iterator, transactions_costs: impl Iterator>, bank: &Bank, ) -> (Vec>, usize) { diff --git a/core/src/banking_stage/scheduler_messages.rs b/core/src/banking_stage/scheduler_messages.rs index 172087e2cf8e82..359f4cac710b71 100644 --- a/core/src/banking_stage/scheduler_messages.rs +++ b/core/src/banking_stage/scheduler_messages.rs @@ -1,6 +1,6 @@ use { super::immutable_deserialized_packet::ImmutableDeserializedPacket, - solana_sdk::{clock::Slot, transaction::SanitizedTransaction}, + solana_sdk::{clock::Slot, transaction::ExtendedSanitizedTransaction}, std::{fmt::Display, sync::Arc}, }; @@ -41,7 +41,7 @@ impl Display for TransactionId { pub struct ConsumeWork { pub batch_id: TransactionBatchId, pub ids: Vec, - pub transactions: Vec, + pub transactions: Vec, pub max_age_slots: Vec, } diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index d983fcf4d163c3..95408800852b84 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -18,7 +18,7 @@ use { solana_measure::measure_us, solana_sdk::{ pubkey::Pubkey, saturating_add_assign, slot_history::Slot, - transaction::SanitizedTransaction, + transaction::ExtendedSanitizedTransaction, }, }; @@ -64,8 +64,8 @@ impl PrioGraphScheduler { pub(crate) fn schedule( &mut self, container: &mut TransactionStateContainer, - pre_graph_filter: impl Fn(&[&SanitizedTransaction], &mut [bool]), - pre_lock_filter: impl Fn(&SanitizedTransaction) -> bool, + pre_graph_filter: impl Fn(&[&ExtendedSanitizedTransaction], &mut [bool]), + pre_lock_filter: impl Fn(&ExtendedSanitizedTransaction) -> bool, ) -> Result { let num_threads = self.consume_work_senders.len(); let mut batches = Batches::new(num_threads); @@ -161,15 +161,15 @@ impl PrioGraphScheduler { } // Check if this transaction conflicts with any blocked transactions - if !blocking_locks.check_locks(transaction.message()) { - blocking_locks.take_locks(transaction.message()); + if !blocking_locks.check_locks(transaction.transaction.message()) { + blocking_locks.take_locks(transaction.transaction.message()); unschedulable_ids.push(id); saturating_add_assign!(num_unschedulable, 1); continue; } // Schedule the transaction if it can be. - let transaction_locks = transaction.get_account_locks_unchecked(); + let transaction_locks = transaction.transaction.get_account_locks_unchecked(); let Some(thread_id) = self.account_locks.try_lock_accounts( transaction_locks.writable.into_iter(), transaction_locks.readonly.into_iter(), @@ -182,7 +182,7 @@ impl PrioGraphScheduler { ) }, ) else { - blocking_locks.take_locks(transaction.message()); + blocking_locks.take_locks(transaction.transaction.message()); unschedulable_ids.push(id); saturating_add_assign!(num_unschedulable, 1); continue; @@ -329,11 +329,11 @@ impl PrioGraphScheduler { fn complete_batch( &mut self, batch_id: TransactionBatchId, - transactions: &[SanitizedTransaction], + transactions: &[ExtendedSanitizedTransaction], ) { let thread_id = self.in_flight_tracker.complete_batch(batch_id); for transaction in transactions { - let account_locks = transaction.get_account_locks_unchecked(); + let account_locks = transaction.transaction.get_account_locks_unchecked(); self.account_locks.unlock_accounts( account_locks.writable.into_iter(), account_locks.readonly.into_iter(), @@ -392,7 +392,7 @@ impl PrioGraphScheduler { /// on `ThreadAwareAccountLocks::try_lock_accounts`. fn select_thread( thread_set: ThreadSet, - batches_per_thread: &[Vec], + batches_per_thread: &[Vec], in_flight_per_thread: &[usize], ) -> ThreadId { thread_set @@ -412,7 +412,7 @@ impl PrioGraphScheduler { fn get_transaction_account_access( transaction: &SanitizedTransactionTTL, ) -> impl Iterator + '_ { - let message = transaction.transaction.message(); + let message = transaction.transaction.transaction.message(); message .account_keys() .iter() @@ -442,7 +442,7 @@ pub(crate) struct SchedulingSummary { struct Batches { ids: Vec>, - transactions: Vec>, + transactions: Vec>, max_age_slots: Vec>, total_cus: Vec, } @@ -462,7 +462,7 @@ impl Batches { thread_id: ThreadId, ) -> ( Vec, - Vec, + Vec, Vec, u64, ) { diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 12e8f7bf8bf0bf..600c7ef6c3f3a3 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -31,7 +31,7 @@ use { }, fee::FeeBudgetLimits, saturating_add_assign, - transaction::SanitizedTransaction, + transaction::{ExtendedSanitizedTransaction, SanitizedTransaction}, }, solana_svm::transaction_error_metrics::TransactionErrorMetrics, std::{ @@ -189,7 +189,11 @@ impl SchedulerController { Ok(()) } - fn pre_graph_filter(transactions: &[&SanitizedTransaction], results: &mut [bool], bank: &Bank) { + fn pre_graph_filter( + transactions: &[&ExtendedSanitizedTransaction], + results: &mut [bool], + bank: &Bank, + ) { let lock_results = vec![Ok(()); transactions.len()]; let mut error_counters = TransactionErrorMetrics::default(); let check_results = bank.check_transactions( @@ -204,7 +208,11 @@ impl SchedulerController { .zip(transactions) .map(|((result, _nonce, _lamports), tx)| { result?; // if there's already error do nothing - Consumer::check_fee_payer_unlocked(bank, tx.message(), &mut error_counters) + Consumer::check_fee_payer_unlocked( + bank, + tx.transaction.message(), + &mut error_counters, + ) }) .collect(); @@ -387,7 +395,12 @@ impl SchedulerController { }) .filter_map(|tx| { process_compute_budget_instructions(tx.message().program_instructions_iter()) - .map(|compute_budget| (tx, compute_budget.into())) + .map(|compute_budget| { + ( + ExtendedSanitizedTransaction::from(tx), + compute_budget.into(), + ) + }) .ok() }) .unzip(); @@ -478,13 +491,13 @@ impl SchedulerController { /// Any difference in the prioritization is negligible for /// the current transaction costs. fn calculate_priority_and_cost( - transaction: &SanitizedTransaction, + transaction: &ExtendedSanitizedTransaction, fee_budget_limits: &FeeBudgetLimits, bank: &Bank, ) -> (u64, u64) { - let cost = CostModel::calculate_cost(transaction, &bank.feature_set).sum(); + let cost = CostModel::calculate_cost(&transaction.transaction, &bank.feature_set).sum(); let fee = bank.fee_structure.calculate_fee( - transaction.message(), + transaction.transaction.message(), 5_000, // this just needs to be non-zero fee_budget_limits, bank.feature_set diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs index 727140545ab656..7ae094258a9bad 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs @@ -1,8 +1,8 @@ -use solana_sdk::{clock::Slot, transaction::SanitizedTransaction}; +use solana_sdk::{clock::Slot, transaction::ExtendedSanitizedTransaction}; /// Simple wrapper type to tie a sanitized transaction to max age slot. pub(crate) struct SanitizedTransactionTTL { - pub(crate) transaction: SanitizedTransaction, + pub(crate) transaction: ExtendedSanitizedTransaction, pub(crate) max_age_slot: Slot, } diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index fcc68050b72d4c..7c50e045b89f62 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -20,8 +20,11 @@ use { solana_measure::{measure, measure_us}, solana_runtime::bank::Bank, solana_sdk::{ - clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, feature_set::FeatureSet, hash::Hash, - saturating_add_assign, transaction::SanitizedTransaction, + clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, + feature_set::FeatureSet, + hash::Hash, + saturating_add_assign, + transaction::{ExtendedSanitizedTransaction, SanitizedTransaction}, }, solana_svm::transaction_error_metrics::TransactionErrorMetrics, std::{ @@ -135,7 +138,7 @@ fn filter_processed_packets<'a, F>( pub struct ConsumeScannerPayload<'a> { pub reached_end_of_slot: bool, pub account_locks: ReadWriteAccountSet, - pub sanitized_transactions: Vec, + pub sanitized_transactions: Vec, pub slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, pub message_hash_to_transaction: &'a mut HashMap, pub error_counters: TransactionErrorMetrics, @@ -208,7 +211,9 @@ fn consume_scan_should_process_packet( return ProcessingDecision::Later; } - payload.sanitized_transactions.push(sanitized_transaction); + payload + .sanitized_transactions + .push(ExtendedSanitizedTransaction::from(sanitized_transaction)); ProcessingDecision::Now } else { payload @@ -635,7 +640,7 @@ impl ThreadLocalUnprocessedPackets { (sanitized_transactions, transaction_to_packet_indexes), packet_conversion_time, ): ( - (Vec, Vec), + (Vec, Vec), _, ) = measure!( self.sanitize_unforwarded_packets( @@ -762,18 +767,25 @@ impl ThreadLocalUnprocessedPackets { packets_to_process: &[Arc], bank: &Bank, total_dropped_packets: &mut usize, - ) -> (Vec, Vec) { + ) -> (Vec, Vec) { // Get ref of ImmutableDeserializedPacket let deserialized_packets = packets_to_process.iter().map(|p| &**p); - let (transactions, transaction_to_packet_indexes): (Vec, Vec) = - deserialized_packets - .enumerate() - .filter_map(|(packet_index, deserialized_packet)| { - deserialized_packet - .build_sanitized_transaction(&bank.feature_set, bank.vote_only_bank(), bank) - .map(|transaction| (transaction, packet_index)) - }) - .unzip(); + let (transactions, transaction_to_packet_indexes): ( + Vec, + Vec, + ) = deserialized_packets + .enumerate() + .filter_map(|(packet_index, deserialized_packet)| { + deserialized_packet + .build_sanitized_transaction(&bank.feature_set, bank.vote_only_bank(), bank) + .map(|transaction| { + ( + ExtendedSanitizedTransaction::from(transaction), + packet_index, + ) + }) + }) + .unzip(); let filtered_count = packets_to_process.len().saturating_sub(transactions.len()); saturating_add_assign!(*total_dropped_packets, filtered_count); @@ -783,7 +795,7 @@ impl ThreadLocalUnprocessedPackets { /// Checks sanitized transactions against bank, returns valid transaction indexes fn filter_invalid_transactions( - transactions: &[SanitizedTransaction], + transactions: &[ExtendedSanitizedTransaction], bank: &Bank, total_dropped_packets: &mut usize, ) -> Vec { @@ -821,7 +833,7 @@ impl ThreadLocalUnprocessedPackets { fn add_filtered_packets_to_forward_buffer( forward_buffer: &mut ForwardPacketBatchesByAccounts, packets_to_process: &[Arc], - transactions: &[SanitizedTransaction], + transactions: &[ExtendedSanitizedTransaction], transaction_to_packet_indexes: &[usize], forwardable_transaction_indexes: &[usize], total_dropped_packets: &mut usize, @@ -924,6 +936,7 @@ impl ThreadLocalUnprocessedPackets { .iter() .map(|p| (*p).clone()) .collect_vec(); + let retryable_packets = if let Some(retryable_transaction_indexes) = processing_function(&packets_to_process, payload) { diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index e5e06a3bc701c9..f41d2b1d192f16 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -18,8 +18,9 @@ use { count_discarded_packets, count_packets_in_batches, count_valid_packets, shrink_batches, }, }, - solana_sdk::timing, + solana_sdk::{signature::Signature, timing}, solana_streamer::streamer::{self, StreamerError}, + solana_transaction_metrics_tracker::get_signature_from_packet, std::{ thread::{self, Builder, JoinHandle}, time::Instant, @@ -78,8 +79,9 @@ struct SigVerifierStats { verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch discard_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch dedup_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch - batches_hist: histogram::Histogram, // number of packet batches per verify call - packets_hist: histogram::Histogram, // number of packets per verify call + process_sampled_packets_us_hist: histogram::Histogram, // per-packet time do do overall verify for sampled packets + batches_hist: histogram::Histogram, // number of packet batches per verify call + packets_hist: histogram::Histogram, // number of packets per verify call num_deduper_saturations: usize, total_batches: usize, total_packets: usize, @@ -93,6 +95,7 @@ struct SigVerifierStats { total_discard_random_time_us: usize, total_verify_time_us: usize, total_shrink_time_us: usize, + perf_track_overhead_us: usize, } impl SigVerifierStats { @@ -181,6 +184,28 @@ impl SigVerifierStats { self.dedup_packets_pp_us_hist.mean().unwrap_or(0), i64 ), + ( + "process_sampled_packets_us_90pct", + self.process_sampled_packets_us_hist + .percentile(90.0) + .unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_min", + self.process_sampled_packets_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_max", + self.process_sampled_packets_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_mean", + self.process_sampled_packets_us_hist.mean().unwrap_or(0), + i64 + ), ( "batches_90pct", self.batches_hist.percentile(90.0).unwrap_or(0), @@ -214,6 +239,7 @@ impl SigVerifierStats { ), ("total_verify_time_us", self.total_verify_time_us, i64), ("total_shrink_time_us", self.total_shrink_time_us, i64), + ("perf_track_overhead_us", self.perf_track_overhead_us, i64), ); } } @@ -296,8 +322,26 @@ impl SigVerifyStage { verifier: &mut T, stats: &mut SigVerifierStats, ) -> Result<(), T::SendType> { + let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default(); + let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; + let mut start_perf_track_measure = Measure::start("start_perf_track"); + // track sigverify start time for interested packets + for batch in &batches { + for packet in batch.iter() { + if packet.meta().is_perf_track_packet() { + let signature = get_signature_from_packet(packet); + if let Ok(signature) = signature { + packet_perf_measure.push((*signature, Instant::now())); + } + } + } + } + start_perf_track_measure.stop(); + + stats.perf_track_overhead_us = start_perf_track_measure.as_us() as usize; + let batches_len = batches.len(); debug!( "@{:?} verifier: verifying: {}", @@ -370,6 +414,22 @@ impl SigVerifyStage { (num_packets as f32 / verify_time.as_s()) ); + let mut perf_track_end_measure = Measure::start("perf_track_end"); + for (signature, start_time) in packet_perf_measure.iter() { + let duration = Instant::now().duration_since(*start_time); + debug!( + "Sigverify took {duration:?} for transaction {:?}", + Signature::from(*signature) + ); + stats + .process_sampled_packets_us_hist + .increment(duration.as_micros() as u64) + .unwrap(); + } + + perf_track_end_measure.stop(); + stats.perf_track_overhead_us += perf_track_end_measure.as_us() as usize; + stats .recv_batches_us_hist .increment(recv_duration.as_micros() as u64) diff --git a/entry/src/entry.rs b/entry/src/entry.rs index af3fdca9518e83..0029148dcee18a 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -27,7 +27,7 @@ use { packet::Meta, timing, transaction::{ - Result, SanitizedTransaction, Transaction, TransactionError, + ExtendedSanitizedTransaction, Result, Transaction, TransactionError, TransactionVerificationMode, VersionedTransaction, }, }, @@ -163,7 +163,7 @@ impl From<&Entry> for EntrySummary { /// Typed entry to distinguish between transaction and tick entries pub enum EntryType { - Transactions(Vec), + Transactions(Vec), Tick(Hash), } @@ -405,7 +405,7 @@ impl EntryVerificationState { pub fn verify_transactions( entries: Vec, - verify: Arc Result + Send + Sync>, + verify: Arc Result + Send + Sync>, ) -> Result> { PAR_THREAD_POOL.install(|| { entries @@ -432,7 +432,10 @@ pub fn start_verify_transactions( skip_verification: bool, verify_recyclers: VerifyRecyclers, verify: Arc< - dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + dyn Fn( + VersionedTransaction, + TransactionVerificationMode, + ) -> Result + Send + Sync, >, @@ -469,7 +472,10 @@ fn start_verify_transactions_cpu( entries: Vec, skip_verification: bool, verify: Arc< - dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + dyn Fn( + VersionedTransaction, + TransactionVerificationMode, + ) -> Result + Send + Sync, >, @@ -498,13 +504,16 @@ fn start_verify_transactions_gpu( entries: Vec, verify_recyclers: VerifyRecyclers, verify: Arc< - dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + dyn Fn( + VersionedTransaction, + TransactionVerificationMode, + ) -> Result + Send + Sync, >, ) -> Result { let verify_func = { - move |versioned_tx: VersionedTransaction| -> Result { + move |versioned_tx: VersionedTransaction| -> Result { verify( versioned_tx, TransactionVerificationMode::HashAndVerifyPrecompiles, @@ -514,7 +523,7 @@ fn start_verify_transactions_gpu( let entries = verify_transactions(entries, Arc::new(verify_func))?; - let entry_txs: Vec<&SanitizedTransaction> = entries + let entry_txs: Vec<&ExtendedSanitizedTransaction> = entries .iter() .filter_map(|entry_type| match entry_type { EntryType::Tick(_) => None, @@ -552,7 +561,7 @@ fn start_verify_transactions_gpu( } let entry_tx_iter = slice .into_par_iter() - .map(|tx| tx.to_versioned_transaction()); + .map(|tx| tx.transaction.to_versioned_transaction()); let res = packet_batch .par_iter_mut() diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 867761639d95d3..0c3298abc37c76 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -3130,7 +3130,7 @@ impl Blockstore { // Attempt to verify transaction and load addresses from the current bank, // or manually scan the transaction for addresses if the transaction. if let Ok(tx) = bank.fully_verify_transaction(tx.clone()) { - add_to_set(&result, tx.message().account_keys().iter()); + add_to_set(&result, tx.transaction.message().account_keys().iter()); } else { add_to_set(&result, tx.message.static_account_keys()); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 2e172870d6e5f7..9f7a95cd85d825 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -50,7 +50,7 @@ use { signature::{Keypair, Signature}, timing, transaction::{ - Result, SanitizedTransaction, TransactionError, TransactionVerificationMode, + ExtendedSanitizedTransaction, Result, TransactionError, TransactionVerificationMode, VersionedTransaction, }, }, @@ -117,7 +117,7 @@ fn get_first_error( { if let Err(ref err) = result { if first_err.is_none() { - first_err = Some((result.clone(), *transaction.signature())); + first_err = Some((result.clone(), *transaction.transaction.signature())); } warn!( "Unexpected validator error: {:?}, transaction: {:?}", @@ -378,7 +378,7 @@ fn schedule_batches_for_execution( fn rebatch_transactions<'a>( lock_results: &'a [Result<()>], bank: &'a Arc, - sanitized_txs: &'a [SanitizedTransaction], + sanitized_txs: &'a [ExtendedSanitizedTransaction], start: usize, end: usize, transaction_indexes: &'a [usize], @@ -426,7 +426,7 @@ fn rebatch_and_execute_batches( let tx_costs = sanitized_txs .iter() .map(|tx| { - let tx_cost = CostModel::calculate_cost(tx, &bank.feature_set); + let tx_cost = CostModel::calculate_cost(&tx.transaction, &bank.feature_set); let cost = tx_cost.sum(); minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost); total_cost = total_cost.saturating_add(cost); @@ -507,7 +507,7 @@ pub fn process_entries_for_tests( ) -> Result<()> { let verify_transaction = { let bank = bank.clone_with_scheduler(); - move |versioned_tx: VersionedTransaction| -> Result { + move |versioned_tx: VersionedTransaction| -> Result { bank.verify_transaction(versioned_tx, TransactionVerificationMode::FullVerification) } }; @@ -1286,7 +1286,7 @@ fn confirm_slot_entries( let bank = bank.clone_with_scheduler(); move |versioned_tx: VersionedTransaction, verification_mode: TransactionVerificationMode| - -> Result { + -> Result { bank.verify_transaction(versioned_tx, verification_mode) } }; @@ -1833,7 +1833,7 @@ pub enum TransactionStatusMessage { pub struct TransactionStatusBatch { pub bank: Arc, - pub transactions: Vec, + pub transactions: Vec, pub execution_results: Vec>, pub balances: TransactionBalancesSet, pub token_balances: TransactionTokenBalancesSet, @@ -1850,7 +1850,7 @@ impl TransactionStatusSender { pub fn send_transaction_status_batch( &self, bank: Arc, - transactions: Vec, + transactions: Vec, execution_results: Vec, balances: TransactionBalancesSet, token_balances: TransactionTokenBalancesSet, diff --git a/ledger/src/token_balances.rs b/ledger/src/token_balances.rs index 204bd4335972aa..ff133127a14b4a 100644 --- a/ledger/src/token_balances.rs +++ b/ledger/src/token_balances.rs @@ -43,13 +43,15 @@ pub fn collect_token_balances( let mut collect_time = Measure::start("collect_token_balances"); for transaction in batch.sanitized_transactions() { - let account_keys = transaction.message().account_keys(); + let account_keys = transaction.transaction.message().account_keys(); let has_token_program = account_keys.iter().any(is_known_spl_token_id); let mut transaction_balances: Vec = vec![]; if has_token_program { for (index, account_id) in account_keys.iter().enumerate() { - if transaction.message().is_invoked(index) || is_known_spl_token_id(account_id) { + if transaction.transaction.message().is_invoked(index) + || is_known_spl_token_id(account_id) + { continue; } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 2829cf27b6da6f..3b7807c777ba80 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4908,6 +4908,7 @@ dependencies = [ "solana-streamer", "solana-svm", "solana-tpu-client", + "solana-transaction-metrics-tracker", "solana-transaction-status", "solana-turbine", "solana-unified-scheduler-pool", @@ -6264,9 +6265,11 @@ dependencies = [ "quinn-proto", "rand 0.8.5", "rustls", + "solana-measure", "solana-metrics", "solana-perf", "solana-sdk", + "solana-transaction-metrics-tracker", "thiserror", "tokio", "x509-parser", @@ -6369,6 +6372,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-transaction-metrics-tracker" +version = "1.19.0" +dependencies = [ + "Inflector", + "base64 0.21.7", + "bincode", + "lazy_static", + "log", + "rand 0.8.5", + "solana-perf", + "solana-sdk", +] + [[package]] name = "solana-transaction-status" version = "1.19.0" diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index caeb0953109fbb..7f2fb064dafc5e 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -3280,7 +3280,10 @@ pub mod rpc_accounts_scan { pub mod rpc_full { use { super::*, - solana_sdk::message::{SanitizedVersionedMessage, VersionedMessage}, + solana_sdk::{ + message::{SanitizedVersionedMessage, VersionedMessage}, + transaction::ExtendedSanitizedTransaction, + }, solana_transaction_status::UiInnerInstructions, }; #[rpc] @@ -3693,7 +3696,8 @@ pub mod rpc_full { units_consumed, return_data, inner_instructions: _, // Always `None` due to `enable_cpi_recording = false` - } = preflight_bank.simulate_transaction(&transaction, false) + } = preflight_bank + .simulate_transaction(&ExtendedSanitizedTransaction::from(transaction), false) { match err { TransactionError::BlockhashNotFound => { @@ -3769,8 +3773,9 @@ pub mod rpc_full { } let transaction = sanitize_transaction(unsanitized_tx, bank)?; + let transaction = ExtendedSanitizedTransaction::from(transaction); if sig_verify { - verify_transaction(&transaction, &bank.feature_set)?; + verify_transaction(&transaction.transaction, &bank.feature_set)?; } let TransactionSimulationResult { @@ -3782,7 +3787,7 @@ pub mod rpc_full { inner_instructions, } = bank.simulate_transaction(&transaction, enable_cpi_recording); - let account_keys = transaction.message().account_keys(); + let account_keys = transaction.transaction.message().account_keys(); let number_of_accounts = account_keys.len(); let accounts = if let Some(config_accounts) = config_accounts { diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index 8730fb2ed0f3d8..29a20542379ba3 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -110,15 +110,16 @@ impl TransactionStatusService { } Some(DurableNonceFee::Invalid) => None, None => bank.get_lamports_per_signature_for_blockhash( - transaction.message().recent_blockhash(), + transaction.transaction.message().recent_blockhash(), ), } .expect("lamports_per_signature must be available"); let fee = bank.get_fee_for_message_with_lamports_per_signature( - transaction.message(), + transaction.transaction.message(), lamports_per_signature, ); - let tx_account_locks = transaction.get_account_locks_unchecked(); + let tx_account_locks = + transaction.transaction.get_account_locks_unchecked(); let inner_instructions = inner_instructions.map(|inner_instructions| { map_inner_instructions(inner_instructions).collect() @@ -138,7 +139,7 @@ impl TransactionStatusService { }) .collect(), ); - let loaded_addresses = transaction.get_loaded_addresses(); + let loaded_addresses = transaction.transaction.get_loaded_addresses(); let mut transaction_status_meta = TransactionStatusMeta { status, fee, @@ -158,9 +159,9 @@ impl TransactionStatusService { transaction_notifier.notify_transaction( slot, transaction_index, - transaction.signature(), + transaction.transaction.signature(), &transaction_status_meta, - &transaction, + &transaction.transaction, ); } @@ -172,16 +173,22 @@ impl TransactionStatusService { } if enable_rpc_transaction_history { - if let Some(memos) = extract_and_fmt_memos(transaction.message()) { + if let Some(memos) = + extract_and_fmt_memos(transaction.transaction.message()) + { blockstore - .write_transaction_memos(transaction.signature(), slot, memos) + .write_transaction_memos( + transaction.transaction.signature(), + slot, + memos, + ) .expect("Expect database write to succeed: TransactionMemos"); } blockstore .write_transaction_status( slot, - *transaction.signature(), + *transaction.transaction.signature(), tx_account_locks.writable, tx_account_locks.readonly, transaction_status_meta, diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 3ea316f857a2bc..684cdb67daaeae 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -151,8 +151,9 @@ use { sysvar::{self, last_restart_slot::LastRestartSlot, Sysvar, SysvarId}, timing::years_as_slots, transaction::{ - self, MessageHash, Result, SanitizedTransaction, Transaction, TransactionError, - TransactionVerificationMode, VersionedTransaction, MAX_TX_ACCOUNT_LOCKS, + self, ExtendedSanitizedTransaction, MessageHash, Result, SanitizedTransaction, + Transaction, TransactionError, TransactionVerificationMode, VersionedTransaction, + MAX_TX_ACCOUNT_LOCKS, }, transaction_context::{TransactionAccount, TransactionReturnData}, }, @@ -4077,7 +4078,7 @@ impl Bank { fn update_transaction_statuses( &self, - sanitized_txs: &[SanitizedTransaction], + sanitized_txs: &[ExtendedSanitizedTransaction], execution_results: &[TransactionExecutionResult], ) { let mut status_cache = self.status_cache.write().unwrap(); @@ -4087,8 +4088,8 @@ impl Bank { // Add the message hash to the status cache to ensure that this message // won't be processed again with a different signature. status_cache.insert( - tx.message().recent_blockhash(), - tx.message_hash(), + tx.transaction.message().recent_blockhash(), + tx.transaction.message_hash(), self.slot(), details.status.clone(), ); @@ -4096,8 +4097,8 @@ impl Bank { // can be queried by transaction signature over RPC. In the future, this should // only be added for API nodes because voting validators don't need to do this. status_cache.insert( - tx.message().recent_blockhash(), - tx.signature(), + tx.transaction.message().recent_blockhash(), + tx.transaction.signature(), self.slot(), details.status.clone(), ); @@ -4198,7 +4199,14 @@ impl Bank { pub fn prepare_entry_batch(&self, txs: Vec) -> Result { let sanitized_txs = txs .into_iter() - .map(|tx| SanitizedTransaction::try_create(tx, MessageHash::Compute, None, self)) + .map(|tx| { + SanitizedTransaction::try_create(tx, MessageHash::Compute, None, self).map(|txn| { + ExtendedSanitizedTransaction { + transaction: txn, + start_time: None, + } + }) + }) .collect::>>()?; let tx_account_lock_limit = self.get_transaction_account_lock_limit(); let lock_results = self @@ -4215,7 +4223,7 @@ impl Bank { /// Prepare a locked transaction batch from a list of sanitized transactions. pub fn prepare_sanitized_batch<'a, 'b>( &'a self, - txs: &'b [SanitizedTransaction], + txs: &'b [ExtendedSanitizedTransaction], ) -> TransactionBatch<'a, 'b> { let tx_account_lock_limit = self.get_transaction_account_lock_limit(); let lock_results = self @@ -4229,7 +4237,7 @@ impl Bank { /// limited packing status pub fn prepare_sanitized_batch_with_results<'a, 'b>( &'a self, - transactions: &'b [SanitizedTransaction], + transactions: &'b [ExtendedSanitizedTransaction], transaction_results: impl Iterator>, ) -> TransactionBatch<'a, 'b> { // this lock_results could be: Ok, AccountInUse, WouldExceedBlockMaxLimit or WouldExceedAccountMaxLimit @@ -4245,10 +4253,11 @@ impl Bank { /// Prepare a transaction batch from a single transaction without locking accounts pub fn prepare_unlocked_batch_from_single_tx<'a>( &'a self, - transaction: &'a SanitizedTransaction, + transaction: &'a ExtendedSanitizedTransaction, ) -> TransactionBatch<'_, '_> { let tx_account_lock_limit = self.get_transaction_account_lock_limit(); let lock_result = transaction + .transaction .get_account_locks(tx_account_lock_limit) .map(|_| ()); let mut batch = TransactionBatch::new( @@ -4263,7 +4272,7 @@ impl Bank { /// Run transactions against a frozen bank without committing the results pub fn simulate_transaction( &self, - transaction: &SanitizedTransaction, + transaction: &ExtendedSanitizedTransaction, enable_cpi_recording: bool, ) -> TransactionSimulationResult { assert!(self.is_frozen(), "simulation bank must be frozen"); @@ -4275,10 +4284,10 @@ impl Bank { /// is frozen, enabling use in single-Bank test frameworks pub fn simulate_transaction_unchecked( &self, - transaction: &SanitizedTransaction, + transaction: &ExtendedSanitizedTransaction, enable_cpi_recording: bool, ) -> TransactionSimulationResult { - let account_keys = transaction.message().account_keys(); + let account_keys = transaction.transaction.message().account_keys(); let number_of_accounts = account_keys.len(); let account_overrides = self.get_account_overrides_for_simulation(&account_keys); let batch = self.prepare_unlocked_batch_from_single_tx(transaction); @@ -4391,7 +4400,7 @@ impl Bank { fn check_age( &self, - sanitized_txs: &[impl core::borrow::Borrow], + sanitized_txs: &[impl core::borrow::Borrow], lock_results: &[Result<()>], max_age: usize, error_counters: &mut TransactionErrorMetrics, @@ -4405,7 +4414,7 @@ impl Bank { .zip(lock_results) .map(|(tx, lock_res)| match lock_res { Ok(()) => self.check_transaction_age( - tx.borrow(), + &tx.borrow().transaction, max_age, &next_durable_nonce, &hash_queue, @@ -4457,7 +4466,7 @@ impl Bank { fn check_status_cache( &self, - sanitized_txs: &[impl core::borrow::Borrow], + sanitized_txs: &[impl core::borrow::Borrow], lock_results: Vec, error_counters: &mut TransactionErrorMetrics, ) -> Vec { @@ -4468,7 +4477,7 @@ impl Bank { .map(|(sanitized_tx, (lock_result, nonce, lamports))| { let sanitized_tx = sanitized_tx.borrow(); if lock_result.is_ok() - && self.is_transaction_already_processed(sanitized_tx, &rcache) + && self.is_transaction_already_processed(&sanitized_tx.transaction, &rcache) { error_counters.already_processed += 1; return (Err(TransactionError::AlreadyProcessed), None, None); @@ -4521,7 +4530,7 @@ impl Bank { pub fn check_transactions( &self, - sanitized_txs: &[impl core::borrow::Borrow], + sanitized_txs: &[impl core::borrow::Borrow], lock_results: &[Result<()>], max_age: usize, error_counters: &mut TransactionErrorMetrics, @@ -4534,7 +4543,7 @@ impl Bank { let mut balances: TransactionBalances = vec![]; for transaction in batch.sanitized_transactions() { let mut transaction_balances: Vec = vec![]; - for account_key in transaction.message().account_keys().iter() { + for account_key in transaction.transaction.message().account_keys().iter() { transaction_balances.push(self.get_balance(account_key)); } balances.push(transaction_balances); @@ -4635,7 +4644,7 @@ impl Bank { let mut collect_logs_time = Measure::start("collect_logs_time"); for (execution_result, tx) in sanitized_output.execution_results.iter().zip(sanitized_txs) { if let Some(debug_keys) = &self.transaction_debug_keys { - for key in tx.message().account_keys().iter() { + for key in tx.transaction.message().account_keys().iter() { if debug_keys.contains(key) { let result = execution_result.flattened_result(); info!("slot: {} result: {:?} tx: {:?}", self.slot, result, tx); @@ -4644,7 +4653,7 @@ impl Bank { } } - let is_vote = tx.is_simple_vote_transaction(); + let is_vote = tx.transaction.is_simple_vote_transaction(); if execution_result.was_executed() // Skip log collection for unprocessed transactions && transaction_log_collector_config.filter != TransactionLogCollectorFilter::None @@ -4654,7 +4663,7 @@ impl Bank { .mentioned_addresses .is_empty() { - for key in tx.message().account_keys().iter() { + for key in tx.transaction.message().account_keys().iter() { if transaction_log_collector_config .mentioned_addresses .contains(key) @@ -4687,7 +4696,7 @@ impl Bank { let transaction_log_index = transaction_log_collector.logs.len(); transaction_log_collector.logs.push(TransactionLogInfo { - signature: *tx.signature(), + signature: *tx.transaction.signature(), result: status.clone(), is_vote, log_messages: log_messages.clone(), @@ -4707,7 +4716,8 @@ impl Bank { // Signature count must be accumulated only if the transaction // is executed, otherwise a mismatched count between banking and // replay could occur - signature_count += u64::from(tx.message().header().num_required_signatures); + signature_count += + u64::from(tx.transaction.message().header().num_required_signatures); executed_transactions_count += 1; } @@ -4817,7 +4827,7 @@ impl Bank { fn filter_program_errors_and_collect_fee( &self, - txs: &[SanitizedTransaction], + txs: &[ExtendedSanitizedTransaction], execution_results: &[TransactionExecutionResult], ) -> Vec> { let hash_queue = self.blockhash_queue.read().unwrap(); @@ -4839,7 +4849,9 @@ impl Bank { .map(|maybe_lamports_per_signature| (maybe_lamports_per_signature, true)) .unwrap_or_else(|| { ( - hash_queue.get_lamports_per_signature(tx.message().recent_blockhash()), + hash_queue.get_lamports_per_signature( + tx.transaction.message().recent_blockhash(), + ), false, ) }); @@ -4847,7 +4859,7 @@ impl Bank { let lamports_per_signature = lamports_per_signature.ok_or(TransactionError::BlockhashNotFound)?; let fee = self.get_fee_for_message_with_lamports_per_signature( - tx.message(), + tx.transaction.message(), lamports_per_signature, ); @@ -4859,7 +4871,7 @@ impl Bank { // post-load, fee deducted, pre-execute account state // stored if execution_status.is_err() && !is_nonce { - self.withdraw(tx.message().fee_payer(), fee)?; + self.withdraw(tx.transaction.message().fee_payer(), fee)?; } fees += fee; @@ -4877,7 +4889,7 @@ impl Bank { /// a failure result. pub fn commit_transactions( &self, - sanitized_txs: &[SanitizedTransaction], + sanitized_txs: &[ExtendedSanitizedTransaction], loaded_txs: &mut [TransactionLoadResult], execution_results: Vec, last_blockhash: Hash, @@ -6550,7 +6562,7 @@ impl Bank { &self, tx: VersionedTransaction, verification_mode: TransactionVerificationMode, - ) -> Result { + ) -> Result { let sanitized_tx = { let size = bincode::serialized_size(&tx).map_err(|_| TransactionError::SanitizeFailure)?; @@ -6573,13 +6585,13 @@ impl Bank { sanitized_tx.verify_precompiles(&self.feature_set)?; } - Ok(sanitized_tx) + Ok(ExtendedSanitizedTransaction::from(sanitized_tx)) } pub fn fully_verify_transaction( &self, tx: VersionedTransaction, - ) -> Result { + ) -> Result { self.verify_transaction(tx, TransactionVerificationMode::FullVerification) } @@ -6921,7 +6933,7 @@ impl Bank { /// a bank-level cache of vote accounts and stake delegation info fn update_stakes_cache( &self, - txs: &[SanitizedTransaction], + txs: &[ExtendedSanitizedTransaction], execution_results: &[TransactionExecutionResult], loaded_txs: &[TransactionLoadResult], ) { @@ -6932,7 +6944,7 @@ impl Bank { .filter(|(_, execution_result, _)| execution_result.was_executed_successfully()) .flat_map(|(tx, _, (load_result, _))| { load_result.iter().flat_map(|loaded_transaction| { - let num_account_keys = tx.message().account_keys().len(); + let num_account_keys = tx.transaction.message().account_keys().len(); loaded_transaction.accounts.iter().take(num_account_keys) }) }) @@ -7454,7 +7466,7 @@ impl Bank { /// Checks a batch of sanitized transactions again bank for age and status pub fn check_transactions_with_forwarding_delay( &self, - transactions: &[SanitizedTransaction], + transactions: &[ExtendedSanitizedTransaction], filter: &[transaction::Result<()>], forward_transactions_to_leader_at_slot_offset: u64, ) -> Vec { @@ -7677,7 +7689,10 @@ impl Bank { let transaction_account_lock_limit = self.get_transaction_account_lock_limit(); let sanitized_txs = txs .into_iter() - .map(SanitizedTransaction::from_transaction_for_tests) + .map(|txn| ExtendedSanitizedTransaction { + transaction: SanitizedTransaction::from_transaction_for_tests(txn), + start_time: None, + }) .collect::>(); let lock_results = self .rc diff --git a/runtime/src/bank_utils.rs b/runtime/src/bank_utils.rs index 10835afb82dc49..645630a89a938e 100644 --- a/runtime/src/bank_utils.rs +++ b/runtime/src/bank_utils.rs @@ -7,7 +7,7 @@ use { solana_sdk::{pubkey::Pubkey, signature::Signer}, }; use { - solana_sdk::transaction::SanitizedTransaction, + solana_sdk::transaction::ExtendedSanitizedTransaction, solana_svm::transaction_results::TransactionResults, solana_vote::{vote_parser, vote_sender_types::ReplayVoteSender}, }; @@ -37,7 +37,7 @@ pub fn setup_bank_and_vote_pubkeys_for_tests( } pub fn find_and_send_votes( - sanitized_txs: &[SanitizedTransaction], + sanitized_txs: &[ExtendedSanitizedTransaction], tx_results: &TransactionResults, vote_sender: Option<&ReplayVoteSender>, ) { @@ -49,8 +49,11 @@ pub fn find_and_send_votes( .iter() .zip(execution_results.iter()) .for_each(|(tx, result)| { - if tx.is_simple_vote_transaction() && result.was_executed_successfully() { - if let Some(parsed_vote) = vote_parser::parse_sanitized_vote_transaction(tx) { + if tx.transaction.is_simple_vote_transaction() && result.was_executed_successfully() + { + if let Some(parsed_vote) = + vote_parser::parse_sanitized_vote_transaction(&tx.transaction) + { if parsed_vote.1.last_voted_slot().is_some() { let _ = vote_sender.send(parsed_vote); } diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index d39a18d567232a..ff62b69a8d87d9 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -27,7 +27,7 @@ use { solana_sdk::{ hash::Hash, slot_history::Slot, - transaction::{Result, SanitizedTransaction}, + transaction::{ExtendedSanitizedTransaction, Result}, }, std::{ fmt::Debug, @@ -104,7 +104,7 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static { // Calling this is illegal as soon as wait_for_termination is called. fn schedule_execution<'a>( &'a self, - transaction_with_index: &'a (&'a SanitizedTransaction, usize), + transaction_with_index: &'a (&'a ExtendedSanitizedTransaction, usize), ); /// Wait for a scheduler to terminate after processing. @@ -289,7 +289,9 @@ impl BankWithScheduler { // 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet... pub fn schedule_transaction_executions<'a>( &self, - transactions_with_indexes: impl ExactSizeIterator, + transactions_with_indexes: impl ExactSizeIterator< + Item = (&'a ExtendedSanitizedTransaction, &'a usize), + >, ) { trace!( "schedule_transaction_executions(): {} txs", diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index 0490f594451b9c..363379178174bd 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -8,7 +8,7 @@ use { solana_sdk::{ clock::{BankId, Slot}, pubkey::Pubkey, - transaction::SanitizedTransaction, + transaction::ExtendedSanitizedTransaction, }, std::{ collections::HashMap, @@ -208,20 +208,29 @@ impl PrioritizationFeeCache { /// Update with a list of non-vote transactions' compute_budget_details and account_locks; Only /// transactions have both valid compute_budget_details and account_locks will be used to update /// fee_cache asynchronously. - pub fn update<'a>(&self, bank: &Bank, txs: impl Iterator) { + pub fn update<'a>( + &self, + bank: &Bank, + txs: impl Iterator, + ) { let (_, send_updates_time) = measure!( { for sanitized_transaction in txs { // Vote transactions are not prioritized, therefore they are excluded from // updating fee_cache. - if sanitized_transaction.is_simple_vote_transaction() { + if sanitized_transaction + .transaction + .is_simple_vote_transaction() + { continue; } let round_compute_unit_price_enabled = false; // TODO: bank.feture_set.is_active(round_compute_unit_price) let compute_budget_details = sanitized_transaction + .transaction .get_compute_budget_details(round_compute_unit_price_enabled); let account_locks = sanitized_transaction + .transaction .get_account_locks(bank.get_transaction_account_lock_limit()); if compute_budget_details.is_none() || account_locks.is_err() { diff --git a/runtime/src/transaction_batch.rs b/runtime/src/transaction_batch.rs index 66711fd5a1acd5..01f8a46582a285 100644 --- a/runtime/src/transaction_batch.rs +++ b/runtime/src/transaction_batch.rs @@ -1,6 +1,6 @@ use { crate::bank::Bank, - solana_sdk::transaction::{Result, SanitizedTransaction}, + solana_sdk::transaction::{ExtendedSanitizedTransaction, Result}, std::borrow::Cow, }; @@ -8,7 +8,7 @@ use { pub struct TransactionBatch<'a, 'b> { lock_results: Vec>, bank: &'a Bank, - sanitized_txs: Cow<'b, [SanitizedTransaction]>, + sanitized_txs: Cow<'b, [ExtendedSanitizedTransaction]>, needs_unlock: bool, } @@ -16,7 +16,7 @@ impl<'a, 'b> TransactionBatch<'a, 'b> { pub fn new( lock_results: Vec>, bank: &'a Bank, - sanitized_txs: Cow<'b, [SanitizedTransaction]>, + sanitized_txs: Cow<'b, [ExtendedSanitizedTransaction]>, ) -> Self { assert_eq!(lock_results.len(), sanitized_txs.len()); Self { @@ -31,7 +31,7 @@ impl<'a, 'b> TransactionBatch<'a, 'b> { &self.lock_results } - pub fn sanitized_transactions(&self) -> &[SanitizedTransaction] { + pub fn sanitized_transactions(&self) -> &[ExtendedSanitizedTransaction] { &self.sanitized_txs } diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index faea9ab4753c67..8300b57218c696 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -33,6 +33,8 @@ bitflags! { /// the packet is built. /// This field can be removed when the above feature gate is adopted by mainnet-beta. const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000; + /// For tracking performance + const PERF_TRACK_PACKET = 0b0100_0000; } } @@ -228,6 +230,12 @@ impl Meta { self.flags.set(PacketFlags::TRACER_PACKET, is_tracer); } + #[inline] + pub fn set_track_performance(&mut self, is_performance_track: bool) { + self.flags + .set(PacketFlags::PERF_TRACK_PACKET, is_performance_track); + } + #[inline] pub fn set_simple_vote(&mut self, is_simple_vote: bool) { self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote); @@ -261,6 +269,11 @@ impl Meta { self.flags.contains(PacketFlags::TRACER_PACKET) } + #[inline] + pub fn is_perf_track_packet(&self) -> bool { + self.flags.contains(PacketFlags::PERF_TRACK_PACKET) + } + #[inline] pub fn round_compute_unit_price(&self) -> bool { self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE) diff --git a/sdk/src/transaction/sanitized.rs b/sdk/src/transaction/sanitized.rs index b7383b4a0a454c..2d4e2bd21018dc 100644 --- a/sdk/src/transaction/sanitized.rs +++ b/sdk/src/transaction/sanitized.rs @@ -19,6 +19,7 @@ use { transaction::{Result, Transaction, TransactionError, VersionedTransaction}, }, solana_program::message::SanitizedVersionedMessage, + std::time::Instant, }; /// Maximum number of accounts that a transaction may lock. @@ -35,6 +36,22 @@ pub struct SanitizedTransaction { signatures: Vec, } +/// Sanitized transaction with option start_time +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct ExtendedSanitizedTransaction { + pub transaction: SanitizedTransaction, + pub start_time: Option, +} + +impl From for ExtendedSanitizedTransaction { + fn from(value: SanitizedTransaction) -> Self { + Self { + transaction: value, + start_time: None, + } + } +} + /// Set of accounts that must be locked for safe transaction processing #[derive(Debug, Clone, Default, Eq, PartialEq)] pub struct TransactionAccountLocks<'a> { diff --git a/sdk/src/transaction/versioned/sanitized.rs b/sdk/src/transaction/versioned/sanitized.rs index 61ecdfea56bb2a..b6311d5886b0e3 100644 --- a/sdk/src/transaction/versioned/sanitized.rs +++ b/sdk/src/transaction/versioned/sanitized.rs @@ -33,6 +33,10 @@ impl SanitizedVersionedTransaction { &self.message } + pub fn get_signatures(&self) -> &Vec { + &self.signatures + } + /// Consumes the SanitizedVersionedTransaction, returning the fields individually. pub fn destruct(self) -> (Vec, SanitizedVersionedMessage) { (self.signatures, self.message) diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 8e1eb12dff1d42..55d0030e734607 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -26,9 +26,11 @@ quinn = { workspace = true } quinn-proto = { workspace = true } rand = { workspace = true } rustls = { workspace = true, features = ["dangerous_configuration"] } +solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-perf = { workspace = true } solana-sdk = { workspace = true } +solana-transaction-metrics-tracker = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } x509-parser = { workspace = true } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 225412dd08b315..3485e4fe585d06 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -17,6 +17,7 @@ use { quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, + solana_measure::measure::Measure, solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH}, solana_sdk::{ packet::{Meta, PACKET_DATA_SIZE}, @@ -27,9 +28,10 @@ use { QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, }, - signature::Keypair, + signature::{Keypair, Signature}, timing, }, + solana_transaction_metrics_tracker::signature_if_should_track_packet, std::{ iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, @@ -81,6 +83,7 @@ struct PacketChunk { struct PacketAccumulator { pub meta: Meta, pub chunks: Vec, + pub start_time: Instant, } #[derive(Copy, Clone, Debug)] @@ -628,6 +631,7 @@ async fn packet_batch_sender( trace!("enter packet_batch_sender"); let mut batch_start_time = Instant::now(); loop { + let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default(); let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); let mut total_bytes: usize = 0; @@ -647,6 +651,8 @@ async fn packet_batch_sender( || (!packet_batch.is_empty() && elapsed >= coalesce) { let len = packet_batch.len(); + track_streamer_fetch_packet_performance(&mut packet_perf_measure, &stats); + if let Err(e) = packet_sender.send(packet_batch) { stats .total_packet_batch_send_err @@ -692,6 +698,14 @@ async fn packet_batch_sender( total_bytes += packet_batch[i].meta().size; + if let Some(signature) = signature_if_should_track_packet(&packet_batch[i]) + .ok() + .flatten() + { + packet_perf_measure.push((*signature, packet_accumulator.start_time)); + // we set the PERF_TRACK_PACKET on + packet_batch[i].meta_mut().set_track_performance(true); + } stats .total_chunks_processed_by_batcher .fetch_add(num_chunks, Ordering::Relaxed); @@ -700,6 +714,32 @@ async fn packet_batch_sender( } } +fn track_streamer_fetch_packet_performance( + packet_perf_measure: &mut [([u8; 64], Instant)], + stats: &Arc, +) { + if packet_perf_measure.is_empty() { + return; + } + let mut measure = Measure::start("track_perf"); + let mut process_sampled_packets_us_hist = stats.process_sampled_packets_us_hist.lock().unwrap(); + + for (signature, start_time) in packet_perf_measure.iter() { + let duration = Instant::now().duration_since(*start_time); + debug!( + "QUIC streamer fetch stage took {duration:?} for transaction {:?}", + Signature::from(*signature) + ); + process_sampled_packets_us_hist + .increment(duration.as_micros() as u64) + .unwrap(); + } + measure.stop(); + stats + .perf_track_overhead_us + .fetch_add(measure.as_us(), Ordering::Relaxed); +} + async fn handle_connection( connection: Connection, remote_addr: SocketAddr, @@ -854,6 +894,7 @@ async fn handle_chunk( *packet_accum = Some(PacketAccumulator { meta, chunks: Vec::new(), + start_time: Instant::now(), }); } @@ -1453,6 +1494,7 @@ pub mod test { offset, end_of_chunk: size, }], + start_time: Instant::now(), }; ptk_sender.send(packet_accum).await.unwrap(); } diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 69a75532b8ca68..3c9d95b2333c42 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -16,8 +16,8 @@ use { std::{ net::UdpSocket, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, RwLock, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, + Arc, Mutex, RwLock, }, thread, time::{Duration, SystemTime}, @@ -175,10 +175,13 @@ pub struct StreamStats { pub(crate) stream_load_ema: AtomicUsize, pub(crate) stream_load_ema_overflow: AtomicUsize, pub(crate) stream_load_capacity_overflow: AtomicUsize, + pub(crate) process_sampled_packets_us_hist: Mutex, + pub(crate) perf_track_overhead_us: AtomicU64, } impl StreamStats { pub fn report(&self, name: &'static str) { + let process_sampled_packets_us_hist = self.process_sampled_packets_us_hist.lock().unwrap(); datapoint_info!( name, ( @@ -425,6 +428,33 @@ impl StreamStats { self.stream_load_capacity_overflow.load(Ordering::Relaxed), i64 ), + ( + "process_sampled_packets_us_90pct", + process_sampled_packets_us_hist + .percentile(90.0) + .unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_min", + process_sampled_packets_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_max", + process_sampled_packets_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_mean", + process_sampled_packets_us_hist.mean().unwrap_or(0), + i64 + ), + ( + "perf_track_overhead_us", + self.perf_track_overhead_us.swap(0, Ordering::Relaxed), + i64 + ), ); } } diff --git a/svm/src/account_loader.rs b/svm/src/account_loader.rs index 1c02ded24665ff..f60ac957b3592b 100644 --- a/svm/src/account_loader.rs +++ b/svm/src/account_loader.rs @@ -30,7 +30,7 @@ use { rent_debits::RentDebits, saturating_add_assign, sysvar::{self, instructions::construct_instructions_data}, - transaction::{self, Result, SanitizedTransaction, TransactionError}, + transaction::{self, ExtendedSanitizedTransaction, Result, TransactionError}, transaction_context::{IndexOfAccount, TransactionAccount}, }, solana_system_program::{get_system_account_kind, SystemAccountKind}, @@ -53,7 +53,7 @@ pub type TransactionCheckResult = (transaction::Result<()>, Option pub fn load_accounts( callbacks: &CB, - txs: &[SanitizedTransaction], + txs: &[ExtendedSanitizedTransaction], lock_results: &[TransactionCheckResult], error_counters: &mut TransactionErrorMetrics, fee_structure: &FeeStructure, @@ -66,7 +66,7 @@ pub fn load_accounts( .zip(lock_results) .map(|etx| match etx { (tx, (Ok(()), nonce, lamports_per_signature)) => { - let message = tx.message(); + let message = tx.transaction.message(); let fee = if let Some(lamports_per_signature) = lamports_per_signature { fee_structure.calculate_fee( message, diff --git a/svm/src/transaction_processor.rs b/svm/src/transaction_processor.rs index 38c5c23affd4de..53136462a2cb64 100644 --- a/svm/src/transaction_processor.rs +++ b/svm/src/transaction_processor.rs @@ -43,7 +43,7 @@ use { pubkey::Pubkey, rent_collector::RentCollector, saturating_add_assign, - transaction::{self, SanitizedTransaction, TransactionError}, + transaction::{self, ExtendedSanitizedTransaction, SanitizedTransaction, TransactionError}, transaction_context::{ExecutionRecord, TransactionContext}, }, std::{ @@ -180,7 +180,7 @@ impl TransactionBatchProcessor { pub fn load_and_execute_sanitized_transactions<'a, CB: TransactionProcessingCallback>( &self, callbacks: &CB, - sanitized_txs: &[SanitizedTransaction], + sanitized_txs: &[ExtendedSanitizedTransaction], check_results: &mut [TransactionCheckResult], error_counters: &mut TransactionErrorMetrics, enable_cpi_recording: bool, @@ -244,7 +244,7 @@ impl TransactionBatchProcessor { let mut compute_budget_process_transaction_time = Measure::start("compute_budget_process_transaction_time"); let maybe_compute_budget = ComputeBudget::try_from_instructions( - tx.message().program_instructions_iter(), + tx.transaction.message().program_instructions_iter(), ); compute_budget_process_transaction_time.stop(); saturating_add_assign!( @@ -261,7 +261,7 @@ impl TransactionBatchProcessor { let result = self.execute_loaded_transaction( callbacks, - tx, + &tx.transaction, loaded_transaction, compute_budget, nonce.as_ref().map(DurableNonceFee::from), @@ -325,7 +325,7 @@ impl TransactionBatchProcessor { /// blockhash or nonce. pub fn filter_executable_program_accounts<'a, CB: TransactionProcessingCallback>( callbacks: &CB, - txs: &[SanitizedTransaction], + txs: &[ExtendedSanitizedTransaction], lock_results: &mut [TransactionCheckResult], program_owners: &'a [Pubkey], ) -> HashMap { @@ -333,7 +333,8 @@ impl TransactionBatchProcessor { lock_results.iter_mut().zip(txs).for_each(|etx| { if let ((Ok(()), _nonce, lamports_per_signature), tx) = etx { if lamports_per_signature.is_some() { - tx.message() + tx.transaction + .message() .account_keys() .iter() .for_each(|key| match result.entry(*key) { diff --git a/transaction-metrics-tracker/Cargo.toml b/transaction-metrics-tracker/Cargo.toml new file mode 100644 index 00000000000000..9bd82702a3ebb4 --- /dev/null +++ b/transaction-metrics-tracker/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "solana-transaction-metrics-tracker" +description = "Solana transaction metrics tracker" +documentation = "https://docs.rs/solana-transaction-metrics-tracker" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } +publish = false + +[dependencies] +Inflector = { workspace = true } +base64 = { workspace = true } +bincode = { workspace = true } +# Update this borsh dependency to the workspace version once +lazy_static = { workspace = true } +log = { workspace = true } +rand = { workspace = true } +solana-perf = { workspace = true } +solana-sdk = { workspace = true } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs new file mode 100644 index 00000000000000..2baec195de9b84 --- /dev/null +++ b/transaction-metrics-tracker/src/lib.rs @@ -0,0 +1,157 @@ +use { + lazy_static::lazy_static, + log::*, + rand::Rng, + solana_perf::sigverify::PacketError, + solana_sdk::{packet::Packet, short_vec::decode_shortu16_len, signature::SIGNATURE_BYTES}, +}; + +// The mask is 12 bits long (1<<12 = 4096), it means the probability of matching +// the transaction is 1/4096 assuming the portion being matched is random. +lazy_static! { + static ref TXN_MASK: u16 = rand::thread_rng().gen_range(0..4096); +} + +/// Check if a transaction given its signature matches the randomly selected mask. +/// The signaure should be from the reference of Signature +pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool { + // We do not use the highest signature byte as it is not really random + let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4; + trace!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK); + *TXN_MASK == match_portion +} + +/// Check if a transaction packet's signature matches the mask. +/// This does a rudimentry verification to make sure the packet at least +/// contains the signature data and it returns the reference to the signature. +pub fn signature_if_should_track_packet( + packet: &Packet, +) -> Result, PacketError> { + let signature = get_signature_from_packet(packet)?; + Ok(should_track_transaction(signature).then_some(signature)) +} + +/// Get the signature of the transaction packet +/// This does a rudimentry verification to make sure the packet at least +/// contains the signature data and it returns the reference to the signature. +pub fn get_signature_from_packet(packet: &Packet) -> Result<&[u8; SIGNATURE_BYTES], PacketError> { + let (sig_len_untrusted, sig_start) = packet + .data(..) + .and_then(|bytes| decode_shortu16_len(bytes).ok()) + .ok_or(PacketError::InvalidShortVec)?; + + if sig_len_untrusted < 1 { + return Err(PacketError::InvalidSignatureLen); + } + + let signature = packet + .data(sig_start..sig_start.saturating_add(SIGNATURE_BYTES)) + .ok_or(PacketError::InvalidSignatureLen)?; + let signature = signature + .try_into() + .map_err(|_| PacketError::InvalidSignatureLen)?; + Ok(signature) +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_sdk::{ + hash::Hash, + signature::{Keypair, Signature}, + system_transaction, + }, + }; + + #[test] + fn test_get_signature_from_packet() { + // Default invalid txn packet + let packet = Packet::default(); + let sig = get_signature_from_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidShortVec)); + + // Use a valid transaction, it should succeed + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut packet = Packet::from_data(None, tx).unwrap(); + + let sig = get_signature_from_packet(&packet); + assert!(sig.is_ok()); + + // Invalid signature length + packet.buffer_mut()[0] = 0x0; + let sig = get_signature_from_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidSignatureLen)); + } + + #[test] + fn test_should_track_transaction() { + let mut sig = [0x0; SIGNATURE_BYTES]; + let track = should_track_transaction(&sig); + assert!(!track); + + // Intentionally matching the randomly generated mask + // The lower four bits are ignored as only 12 highest bits from + // signature's 61 and 62 u8 are used for matching. + // We generate a random one + let mut rng = rand::thread_rng(); + let random_number: u8 = rng.gen_range(0..=15); + sig[61] = ((*TXN_MASK & 0xf_u16) << 4) as u8 | random_number; + sig[62] = (*TXN_MASK >> 4) as u8; + + let track = should_track_transaction(&sig); + assert!(track); + } + + #[test] + fn test_signature_if_should_track_packet() { + // Default invalid txn packet + let packet = Packet::default(); + let sig = signature_if_should_track_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidShortVec)); + + // Use a valid transaction which is not matched + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, tx).unwrap(); + let sig = signature_if_should_track_packet(&packet); + assert_eq!(Ok(None), sig); + + // Now simulate a txn matching the signature mask + let mut tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut sig = [0x0; SIGNATURE_BYTES]; + sig[61] = ((*TXN_MASK & 0xf_u16) << 4) as u8; + sig[62] = (*TXN_MASK >> 4) as u8; + + let sig = Signature::from(sig); + tx.signatures[0] = sig; + let mut packet = Packet::from_data(None, tx).unwrap(); + let sig2 = signature_if_should_track_packet(&packet); + + match sig2 { + Ok(sig) => { + assert!(sig.is_some()); + } + Err(_) => panic!("Expected to get a matching signature!"), + } + + // Invalid signature length + packet.buffer_mut()[0] = 0x0; + let sig = signature_if_should_track_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidSignatureLen)); + } +} diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 997c6c1745a7c9..e587c3ba06f38d 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -1,12 +1,12 @@ -use solana_sdk::transaction::SanitizedTransaction; +use solana_sdk::transaction::ExtendedSanitizedTransaction; pub struct Task { - transaction: SanitizedTransaction, + transaction: ExtendedSanitizedTransaction, index: usize, } impl Task { - pub fn create_task(transaction: SanitizedTransaction, index: usize) -> Self { + pub fn create_task(transaction: ExtendedSanitizedTransaction, index: usize) -> Self { Task { transaction, index } } @@ -14,7 +14,7 @@ impl Task { self.index } - pub fn transaction(&self) -> &SanitizedTransaction { + pub fn transaction(&self) -> &ExtendedSanitizedTransaction { &self.transaction } } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 09ded82ee88e7d..4d8780c278a3c9 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -26,7 +26,7 @@ use { }, prioritization_fee_cache::PrioritizationFeeCache, }, - solana_sdk::transaction::{Result, SanitizedTransaction}, + solana_sdk::transaction::{ExtendedSanitizedTransaction, Result}, solana_unified_scheduler_logic::Task, solana_vote::vote_sender_types::ReplayVoteSender, std::{ @@ -203,7 +203,7 @@ pub trait TaskHandler: Send + Sync + Debug + Sized + 'static { result: &mut Result<()>, timings: &mut ExecuteTimings, bank: &Arc, - transaction: &SanitizedTransaction, + transaction: &ExtendedSanitizedTransaction, index: usize, handler_context: &HandlerContext, ); @@ -217,7 +217,7 @@ impl TaskHandler for DefaultTaskHandler { result: &mut Result<()>, timings: &mut ExecuteTimings, bank: &Arc, - transaction: &SanitizedTransaction, + transaction: &ExtendedSanitizedTransaction, index: usize, handler_context: &HandlerContext, ) { @@ -740,7 +740,7 @@ impl InstalledScheduler for PooledScheduler { &self.context } - fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { + fn schedule_execution(&self, &(transaction, index): &(&ExtendedSanitizedTransaction, usize)) { let task = Task::create_task(transaction.clone(), index); self.inner.thread_manager.send_task(task); }