diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index b68b5e9734aa3b..04ba0d117cf4ec 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -145,16 +145,10 @@ impl BankingStage { buffered_packets: &[(Packets, Vec)], ) -> Result { let mut unprocessed_packets = vec![]; - let mut bank_shutdown = false; let mut rebuffered_packets = 0; let mut new_tx_count = 0; - for (msgs, unprocessed_indexes) in buffered_packets { - if bank_shutdown { - rebuffered_packets += unprocessed_indexes.len(); - unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes.to_owned())); - continue; - } - + let mut buffered_packets_iter = buffered_packets.iter(); + while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { let bank = poh_recorder.lock().unwrap().bank(); if bank.is_none() { rebuffered_packets += unprocessed_indexes.len(); @@ -173,14 +167,22 @@ impl BankingStage { new_tx_count += processed; - if processed < verified_txs_len { - bank_shutdown = true; - } // Collect any unprocessed transactions in this batch for forwarding if !new_unprocessed_indexes.is_empty() { rebuffered_packets += new_unprocessed_indexes.len(); unprocessed_packets.push((msgs.to_owned(), new_unprocessed_indexes)); } + + if processed < verified_txs_len { + // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones + while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { + let unprocessed_indexes = + Self::filter_unprocessed_packets(&bank, &msgs, &unprocessed_indexes); + if !unprocessed_indexes.is_empty() { + unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes)); + } + } + } } inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets); @@ -540,12 +542,11 @@ impl BankingStage { .collect() } - fn process_received_packets( - bank: &Arc, - poh: &Arc>, + // This function deserializes packets into transactions and returns non-None transactions + fn transactions_from_packets( msgs: &Packets, - transaction_indexes: Vec, - ) -> Result<(usize, usize, Vec)> { + transaction_indexes: &[usize], + ) -> (Vec, Vec) { let packets = Packets::new( transaction_indexes .iter() @@ -555,15 +556,37 @@ impl BankingStage { let transactions = Self::deserialize_transactions(&packets); - debug!( - "banking-stage-tx bank: {} transactions received {}", - bank.slot(), - transactions.len() + Self::filter_transaction_indexes(transactions, &transaction_indexes) + } + + // This function filters pending transactions that are still valid + fn filter_pending_transactions( + bank: &Arc, + transactions: &[Transaction], + transaction_indexes: &[usize], + pending_indexes: &[usize], + ) -> Vec { + let filter = Self::prepare_filter_for_pending_transactions(transactions, pending_indexes); + + let mut error_counters = ErrorCounters::default(); + let result = bank.check_transactions( + transactions, + &filter, + (MAX_RECENT_BLOCKHASHES - MAX_TRANSACTION_FORWARDING_DELAY) / 2, + &mut error_counters, ); - let (transactions, transaction_indexes) = - Self::filter_transaction_indexes(transactions, &transaction_indexes); + Self::filter_valid_transaction_indexes(&result, transaction_indexes) + } + fn process_received_packets( + bank: &Arc, + poh: &Arc>, + msgs: &Packets, + transaction_indexes: Vec, + ) -> Result<(usize, usize, Vec)> { + let (transactions, transaction_indexes) = + Self::transactions_from_packets(msgs, &transaction_indexes); debug!( "bank: {} filtered transactions {}", bank.slot(), @@ -575,22 +598,46 @@ impl BankingStage { let (processed, unprocessed_tx_indexes) = Self::process_transactions(bank, &transactions, poh)?; - let filter = - Self::prepare_filter_for_pending_transactions(&transactions, &unprocessed_tx_indexes); + let unprocessed_tx_count = unprocessed_tx_indexes.len(); - let mut error_counters = ErrorCounters::default(); - let result = bank.check_transactions( + let filtered_unprocessed_tx_indexes = Self::filter_pending_transactions( + bank, &transactions, - &filter, - (MAX_RECENT_BLOCKHASHES - MAX_TRANSACTION_FORWARDING_DELAY) / 2, - &mut error_counters, + &transaction_indexes, + &unprocessed_tx_indexes, + ); + inc_new_counter_info!( + "banking_stage-dropped_tx_before_forwarding", + unprocessed_tx_count.saturating_sub(filtered_unprocessed_tx_indexes.len()) ); - Ok(( - processed, - tx_len, - Self::filter_valid_transaction_indexes(&result, &transaction_indexes), - )) + Ok((processed, tx_len, filtered_unprocessed_tx_indexes)) + } + + fn filter_unprocessed_packets( + bank: &Arc, + msgs: &Packets, + transaction_indexes: &[usize], + ) -> Vec { + let (transactions, transaction_indexes) = + Self::transactions_from_packets(msgs, &transaction_indexes); + + let tx_count = transaction_indexes.len(); + + let unprocessed_tx_indexes = (0..transactions.len()).collect_vec(); + let filtered_unprocessed_tx_indexes = Self::filter_pending_transactions( + bank, + &transactions, + &transaction_indexes, + &unprocessed_tx_indexes, + ); + + inc_new_counter_info!( + "banking_stage-dropped_tx_before_forwarding", + tx_count.saturating_sub(filtered_unprocessed_tx_indexes.len()) + ); + + filtered_unprocessed_tx_indexes } /// Process the incoming packets @@ -616,22 +663,14 @@ impl BankingStage { inc_new_counter_info!("banking_stage-transactions_received", count); let proc_start = Instant::now(); let mut new_tx_count = 0; - + let mut mms_iter = mms.into_iter(); let mut unprocessed_packets = vec![]; - let mut bank_shutdown = false; - for (msgs, vers) in mms { + while let Some((msgs, vers)) = mms_iter.next() { let packet_indexes: Vec = vers .iter() .enumerate() .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None }) .collect(); - if bank_shutdown { - if !packet_indexes.is_empty() { - unprocessed_packets.push((msgs, packet_indexes)); - } - continue; - } - let bank = poh.lock().unwrap().bank(); if bank.is_none() { if !packet_indexes.is_empty() { @@ -644,15 +683,28 @@ impl BankingStage { let (processed, verified_txs_len, unprocessed_indexes) = Self::process_received_packets(&bank, &poh, &msgs, packet_indexes)?; - if processed < verified_txs_len { - bank_shutdown = true; - } + new_tx_count += processed; + // Collect any unprocessed transactions in this batch for forwarding if !unprocessed_indexes.is_empty() { unprocessed_packets.push((msgs, unprocessed_indexes)); } - new_tx_count += processed; + if processed < verified_txs_len { + // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones + while let Some((msgs, vers)) = mms_iter.next() { + let packet_indexes: Vec = vers + .iter() + .enumerate() + .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None }) + .collect(); + let unprocessed_indexes = + Self::filter_unprocessed_packets(&bank, &msgs, &packet_indexes); + if !unprocessed_indexes.is_empty() { + unprocessed_packets.push((msgs, unprocessed_indexes)); + } + } + } } inc_new_counter_info!( diff --git a/sdk/src/timing.rs b/sdk/src/timing.rs index 9858b94c17e5be..a05c8ca11ef9f2 100644 --- a/sdk/src/timing.rs +++ b/sdk/src/timing.rs @@ -26,7 +26,12 @@ pub const MAX_RECENT_BLOCKHASHES: usize = MAX_HASH_AGE_IN_SECONDS; /// This is maximum time consumed in forwarding a transaction from one node to next, before /// it can be processed in the target node -pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 3; +#[cfg(feature = "cuda")] +pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 4; + +/// More delay is expected if CUDA is not enabled (as signature verification takes longer) +#[cfg(not(feature = "cuda"))] +pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 12; pub fn duration_as_ns(d: &Duration) -> u64 { d.as_secs() * 1_000_000_000 + u64::from(d.subsec_nanos())