diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 4744579f45..9185683b9d 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -286,14 +286,15 @@ impl UnprocessedTransactionStorage { }) } - pub fn new_bundle_storage( - unprocessed_bundle_storage: VecDeque, - cost_model_failed_bundles: VecDeque, - ) -> Self { + pub fn new_bundle_storage() -> Self { Self::BundleStorage(BundleStorage { last_update_slot: Slot::default(), - unprocessed_bundle_storage, - cost_model_buffered_bundle_storage: cost_model_failed_bundles, + unprocessed_bundle_storage: VecDeque::with_capacity( + BundleStorage::BUNDLE_STORAGE_CAPACITY, + ), + cost_model_buffered_bundle_storage: VecDeque::with_capacity( + BundleStorage::BUNDLE_STORAGE_CAPACITY, + ), }) } @@ -1104,6 +1105,7 @@ pub struct BundleStorage { } impl BundleStorage { + pub const BUNDLE_STORAGE_CAPACITY: usize = 1000; fn is_empty(&self) -> bool { self.unprocessed_bundle_storage.is_empty() } @@ -1152,24 +1154,33 @@ impl BundleStorage { deserialized_bundles: Vec, push_back: bool, ) -> InsertPacketBundlesSummary { - let mut num_bundles_inserted: usize = 0; - let mut num_packets_inserted: usize = 0; - let mut num_bundles_dropped: usize = 0; - let mut num_packets_dropped: usize = 0; - - for bundle in deserialized_bundles { - if deque.capacity() == deque.len() { - saturating_add_assign!(num_bundles_dropped, 1); - saturating_add_assign!(num_packets_dropped, bundle.len()); - } else { - saturating_add_assign!(num_bundles_inserted, 1); - saturating_add_assign!(num_packets_inserted, bundle.len()); - if push_back { - deque.push_back(bundle); - } else { - deque.push_front(bundle) - } - } + // deque should be initialized with size [Self::BUNDLE_STORAGE_CAPACITY] + let deque_free_space = Self::BUNDLE_STORAGE_CAPACITY + .checked_sub(deque.len()) + .unwrap(); + let bundles_to_insert_count = std::cmp::min(deque_free_space, deserialized_bundles.len()); + let num_bundles_dropped = deserialized_bundles + .len() + .checked_sub(bundles_to_insert_count) + .unwrap(); + let num_packets_inserted = deserialized_bundles + .iter() + .take(bundles_to_insert_count) + .map(|b| b.len()) + .sum::(); + let num_packets_dropped = deserialized_bundles + .iter() + .skip(bundles_to_insert_count) + .map(|b| b.len()) + .sum::(); + + let to_insert = deserialized_bundles + .into_iter() + .take(bundles_to_insert_count); + if push_back { + deque.extend(to_insert) + } else { + to_insert.for_each(|b| deque.push_front(b)); } InsertPacketBundlesSummary { @@ -1178,7 +1189,7 @@ impl BundleStorage { num_dropped_tracer_packets: 0, } .into(), - num_bundles_inserted, + num_bundles_inserted: bundles_to_insert_count, num_packets_inserted, num_bundles_dropped, } diff --git a/core/src/bundle_stage.rs b/core/src/bundle_stage.rs index 3a4103831b..08487e74ce 100644 --- a/core/src/bundle_stage.rs +++ b/core/src/bundle_stage.rs @@ -27,7 +27,6 @@ use { solana_sdk::timing::AtomicInterval, solana_vote::vote_sender_types::ReplayVoteSender, std::{ - collections::VecDeque, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, Mutex, RwLock, @@ -265,10 +264,7 @@ impl BundleStage { ); let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); - let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(1_000), - VecDeque::with_capacity(1_000), - ); + let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(); let reserved_ticks = poh_recorder .read() diff --git a/core/src/bundle_stage/bundle_consumer.rs b/core/src/bundle_stage/bundle_consumer.rs index 7cfaccc4cb..5c10712670 100644 --- a/core/src/bundle_stage/bundle_consumer.rs +++ b/core/src/bundle_stage/bundle_consumer.rs @@ -835,7 +835,7 @@ mod tests { }, solana_streamer::socket::SocketAddrSpace, std::{ - collections::{HashSet, VecDeque}, + collections::HashSet, str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, @@ -1089,10 +1089,7 @@ mod tests { let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(); let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(1); let mut packet_bundles = make_random_overlapping_bundles( @@ -1242,10 +1239,7 @@ mod tests { let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(); let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(1); // MAIN LOGIC diff --git a/core/src/bundle_stage/bundle_packet_receiver.rs b/core/src/bundle_stage/bundle_packet_receiver.rs index 90ae7d3ba2..0c9f0a4849 100644 --- a/core/src/bundle_stage/bundle_packet_receiver.rs +++ b/core/src/bundle_stage/bundle_packet_receiver.rs @@ -165,6 +165,7 @@ impl BundleReceiver { mod tests { use { super::*, + crate::banking_stage::unprocessed_transaction_storage::BundleStorage, crossbeam_channel::unbounded, rand::{thread_rng, RngCore}, solana_bundle::{ @@ -182,7 +183,7 @@ mod tests { system_transaction::transfer, transaction::VersionedTransaction, }, - std::collections::{HashSet, VecDeque}, + std::collections::HashSet, }; /// Makes `num_bundles` random bundles with `num_packets_per_bundle` packets per bundle. @@ -249,10 +250,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(1_000), - VecDeque::with_capacity(1_000), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -303,15 +301,18 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); - let bundles = make_random_bundles(&mint_keypair, 15, 2, genesis_config.hash()); + // send 5 more than capacity + let bundles = make_random_bundles( + &mint_keypair, + BundleStorage::BUNDLE_STORAGE_CAPACITY + 5, + 2, + genesis_config.hash(), + ); sender.send(bundles.clone()).unwrap(); @@ -325,9 +326,9 @@ mod tests { assert!(result.is_ok()); let bundle_storage = unprocessed_storage.bundle_storage().unwrap(); - // 15 bundles were sent, but the capacity is 10 - assert_eq!(bundle_storage.unprocessed_bundles_len(), 10); - assert_eq!(bundle_storage.unprocessed_packets_len(), 20); + // 1005 bundles were sent, but the capacity is 1000 + assert_eq!(bundle_storage.unprocessed_bundles_len(), 1000); + assert_eq!(bundle_storage.unprocessed_packets_len(), 2000); assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 0); assert_eq!(bundle_storage.cost_model_buffered_packets_len(), 0); @@ -336,8 +337,11 @@ mod tests { &mut bundle_stage_leader_metrics, &HashSet::default(), |bundles_to_process, _stats| { - // make sure the first 10 bundles are the ones to process - assert_bundles_same(&bundles[0..10], bundles_to_process); + // make sure the first 1000 bundles are the ones to process + assert_bundles_same( + &bundles[0..BundleStorage::BUNDLE_STORAGE_CAPACITY], + bundles_to_process, + ); (0..bundles_to_process.len()).map(|_| Ok(())).collect() } )); @@ -356,10 +360,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -426,10 +427,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -494,10 +492,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -545,10 +540,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -594,10 +586,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -639,10 +628,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -719,16 +705,18 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); - // send 15 bundles across the queue - let bundles0 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash()); + // send 500 bundles across the queue + let bundles0 = make_random_bundles( + &mint_keypair, + BundleStorage::BUNDLE_STORAGE_CAPACITY / 2, + 2, + genesis_config.hash(), + ); sender.send(bundles0.clone()).unwrap(); let mut bundle_stage_stats = BundleStageLoopMetrics::default(); @@ -754,11 +742,16 @@ mod tests { } )); assert_eq!(bundle_storage.unprocessed_bundles_len(), 0); - assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 5); + assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 500); - let bundles1 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash()); + let bundles1 = make_random_bundles( + &mint_keypair, + BundleStorage::BUNDLE_STORAGE_CAPACITY / 2, + 2, + genesis_config.hash(), + ); sender.send(bundles1.clone()).unwrap(); - // should get 5 more bundles + cost model buffered length should be 10 + // should get 500 more bundles, cost model buffered length should be 1000 let result = bundle_receiver.receive_and_buffer_bundles( &mut unprocessed_storage, &mut bundle_stage_stats, @@ -778,9 +771,10 @@ mod tests { } )); assert_eq!(bundle_storage.unprocessed_bundles_len(), 0); - assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 10); + assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 1000); // full now - let bundles2 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash()); + // send 10 bundles to go over capacity + let bundles2 = make_random_bundles(&mint_keypair, 10, 2, genesis_config.hash()); sender.send(bundles2.clone()).unwrap(); // this set will get dropped from cost model buffered bundles @@ -803,7 +797,7 @@ mod tests { } )); assert_eq!(bundle_storage.unprocessed_bundles_len(), 0); - assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 10); + assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 1000); // create new bank then call process_bundles again, expect to see [bundles1,bundles2] let bank = bank_forks.read().unwrap().working_bank();