From 3a1485d810d3a047f8b6e5c8e4177853361a7ea6 Mon Sep 17 00:00:00 2001 From: Eric Semeniuc <3838856+esemeniuc@users.noreply.github.com> Date: Wed, 31 Jan 2024 18:41:23 -0800 Subject: [PATCH 1/6] rebase --- .../unprocessed_transaction_storage.rs | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 4744579f45..d86b9910c1 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -1152,24 +1152,28 @@ impl BundleStorage { deserialized_bundles: Vec, push_back: bool, ) -> InsertPacketBundlesSummary { - let mut num_bundles_inserted: usize = 0; - let mut num_packets_inserted: usize = 0; - let mut num_bundles_dropped: usize = 0; - let mut num_packets_dropped: usize = 0; - - for bundle in deserialized_bundles { - if deque.capacity() == deque.len() { - saturating_add_assign!(num_bundles_dropped, 1); - saturating_add_assign!(num_packets_dropped, bundle.len()); - } else { - saturating_add_assign!(num_bundles_inserted, 1); - saturating_add_assign!(num_packets_inserted, bundle.len()); - if push_back { - deque.push_back(bundle); - } else { - deque.push_front(bundle) - } - } + let deque_bundle_free_space = deque.capacity() - deque.len(); + let bundles_to_insert_count = + std::cmp::min(deque_bundle_free_space, deserialized_bundles.len()); + let num_bundles_dropped = deserialized_bundles.len() - bundles_to_insert_count; + let num_packets_inserted = deserialized_bundles + .iter() + .take(bundles_to_insert_count) + .map(|b| b.len()) + .sum::(); + let num_packets_dropped = deserialized_bundles + .iter() + .skip(bundles_to_insert_count) + .map(|b| b.len()) + .sum::(); + + let to_insert = deserialized_bundles + .into_iter() + .take(bundles_to_insert_count); + if push_back { + deque.extend(to_insert) + } else { + to_insert.for_each(|b| deque.push_front(b)); } InsertPacketBundlesSummary { @@ -1178,7 +1182,7 @@ impl BundleStorage { num_dropped_tracer_packets: 0, } .into(), - num_bundles_inserted, + num_bundles_inserted: bundles_to_insert_count, num_packets_inserted, num_bundles_dropped, } From 3abf901d8673d39c93d3cc7cea4c3ad0383c9e14 Mon Sep 17 00:00:00 2001 From: Eric Semeniuc <3838856+esemeniuc@users.noreply.github.com> Date: Wed, 31 Jan 2024 19:18:49 -0800 Subject: [PATCH 2/6] hard code size --- core/src/banking_stage/unprocessed_transaction_storage.rs | 3 ++- core/src/bundle_stage.rs | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index d86b9910c1..ba4d1695e6 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -1104,6 +1104,7 @@ pub struct BundleStorage { } impl BundleStorage { + pub const BUNDLE_STORAGE_CAPACITY: usize = 1000; fn is_empty(&self) -> bool { self.unprocessed_bundle_storage.is_empty() } @@ -1152,7 +1153,7 @@ impl BundleStorage { deserialized_bundles: Vec, push_back: bool, ) -> InsertPacketBundlesSummary { - let deque_bundle_free_space = deque.capacity() - deque.len(); + let deque_bundle_free_space = Self::BUNDLE_STORAGE_CAPACITY - deque.len(); let bundles_to_insert_count = std::cmp::min(deque_bundle_free_space, deserialized_bundles.len()); let num_bundles_dropped = deserialized_bundles.len() - bundles_to_insert_count; diff --git a/core/src/bundle_stage.rs b/core/src/bundle_stage.rs index 3a4103831b..6d7727656e 100644 --- a/core/src/bundle_stage.rs +++ b/core/src/bundle_stage.rs @@ -5,7 +5,7 @@ use { banking_stage::{ decision_maker::{BufferedPacketsDecision, DecisionMaker}, qos_service::QosService, - unprocessed_transaction_storage::UnprocessedTransactionStorage, + unprocessed_transaction_storage::{BundleStorage, UnprocessedTransactionStorage}, }, bundle_stage::{ bundle_account_locker::BundleAccountLocker, bundle_consumer::BundleConsumer, @@ -266,8 +266,8 @@ impl BundleStage { let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(1_000), - VecDeque::with_capacity(1_000), + VecDeque::with_capacity(BundleStorage::BUNDLE_STORAGE_CAPACITY), + VecDeque::with_capacity(BundleStorage::BUNDLE_STORAGE_CAPACITY), ); let reserved_ticks = poh_recorder From 5716af82005545a12887c52e92e2b77cba51b403 Mon Sep 17 00:00:00 2001 From: Eric Semeniuc <3838856+esemeniuc@users.noreply.github.com> Date: Wed, 31 Jan 2024 19:20:36 -0800 Subject: [PATCH 3/6] tidy --- core/src/banking_stage/unprocessed_transaction_storage.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index ba4d1695e6..4ec225b7c6 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -1153,9 +1153,8 @@ impl BundleStorage { deserialized_bundles: Vec, push_back: bool, ) -> InsertPacketBundlesSummary { - let deque_bundle_free_space = Self::BUNDLE_STORAGE_CAPACITY - deque.len(); - let bundles_to_insert_count = - std::cmp::min(deque_bundle_free_space, deserialized_bundles.len()); + let deque_free_space = Self::BUNDLE_STORAGE_CAPACITY - deque.len(); + let bundles_to_insert_count = std::cmp::min(deque_free_space, deserialized_bundles.len()); let num_bundles_dropped = deserialized_bundles.len() - bundles_to_insert_count; let num_packets_inserted = deserialized_bundles .iter() From 9f9b49ac8d2c2773077a06a5eefa12d48c2f86b0 Mon Sep 17 00:00:00 2001 From: Eric Semeniuc <3838856+esemeniuc@users.noreply.github.com> Date: Fri, 2 Feb 2024 08:39:50 -0800 Subject: [PATCH 4/6] fixed capacity --- .../unprocessed_transaction_storage.rs | 13 +-- core/src/bundle_stage.rs | 8 +- core/src/bundle_stage/bundle_consumer.rs | 12 +-- .../bundle_stage/bundle_packet_receiver.rs | 94 +++++++++---------- 4 files changed, 56 insertions(+), 71 deletions(-) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 4ec225b7c6..c73360bc48 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -286,14 +286,15 @@ impl UnprocessedTransactionStorage { }) } - pub fn new_bundle_storage( - unprocessed_bundle_storage: VecDeque, - cost_model_failed_bundles: VecDeque, - ) -> Self { + pub fn new_bundle_storage() -> Self { Self::BundleStorage(BundleStorage { last_update_slot: Slot::default(), - unprocessed_bundle_storage, - cost_model_buffered_bundle_storage: cost_model_failed_bundles, + unprocessed_bundle_storage: VecDeque::with_capacity( + BundleStorage::BUNDLE_STORAGE_CAPACITY, + ), + cost_model_buffered_bundle_storage: VecDeque::with_capacity( + BundleStorage::BUNDLE_STORAGE_CAPACITY, + ), }) } diff --git a/core/src/bundle_stage.rs b/core/src/bundle_stage.rs index 6d7727656e..08487e74ce 100644 --- a/core/src/bundle_stage.rs +++ b/core/src/bundle_stage.rs @@ -5,7 +5,7 @@ use { banking_stage::{ decision_maker::{BufferedPacketsDecision, DecisionMaker}, qos_service::QosService, - unprocessed_transaction_storage::{BundleStorage, UnprocessedTransactionStorage}, + unprocessed_transaction_storage::UnprocessedTransactionStorage, }, bundle_stage::{ bundle_account_locker::BundleAccountLocker, bundle_consumer::BundleConsumer, @@ -27,7 +27,6 @@ use { solana_sdk::timing::AtomicInterval, solana_vote::vote_sender_types::ReplayVoteSender, std::{ - collections::VecDeque, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, Mutex, RwLock, @@ -265,10 +264,7 @@ impl BundleStage { ); let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); - let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(BundleStorage::BUNDLE_STORAGE_CAPACITY), - VecDeque::with_capacity(BundleStorage::BUNDLE_STORAGE_CAPACITY), - ); + let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(); let reserved_ticks = poh_recorder .read() diff --git a/core/src/bundle_stage/bundle_consumer.rs b/core/src/bundle_stage/bundle_consumer.rs index 7cfaccc4cb..5c10712670 100644 --- a/core/src/bundle_stage/bundle_consumer.rs +++ b/core/src/bundle_stage/bundle_consumer.rs @@ -835,7 +835,7 @@ mod tests { }, solana_streamer::socket::SocketAddrSpace, std::{ - collections::{HashSet, VecDeque}, + collections::HashSet, str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, @@ -1089,10 +1089,7 @@ mod tests { let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(); let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(1); let mut packet_bundles = make_random_overlapping_bundles( @@ -1242,10 +1239,7 @@ mod tests { let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(); let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(1); // MAIN LOGIC diff --git a/core/src/bundle_stage/bundle_packet_receiver.rs b/core/src/bundle_stage/bundle_packet_receiver.rs index 90ae7d3ba2..e352129fa6 100644 --- a/core/src/bundle_stage/bundle_packet_receiver.rs +++ b/core/src/bundle_stage/bundle_packet_receiver.rs @@ -165,6 +165,7 @@ impl BundleReceiver { mod tests { use { super::*, + crate::banking_stage::unprocessed_transaction_storage::BundleStorage, crossbeam_channel::unbounded, rand::{thread_rng, RngCore}, solana_bundle::{ @@ -182,7 +183,7 @@ mod tests { system_transaction::transfer, transaction::VersionedTransaction, }, - std::collections::{HashSet, VecDeque}, + std::collections::HashSet, }; /// Makes `num_bundles` random bundles with `num_packets_per_bundle` packets per bundle. @@ -249,10 +250,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(1_000), - VecDeque::with_capacity(1_000), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -303,15 +301,18 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); - let bundles = make_random_bundles(&mint_keypair, 15, 2, genesis_config.hash()); + // send 5 more than capacity + let bundles = make_random_bundles( + &mint_keypair, + BundleStorage::BUNDLE_STORAGE_CAPACITY + 5, + 2, + genesis_config.hash(), + ); sender.send(bundles.clone()).unwrap(); @@ -325,9 +326,9 @@ mod tests { assert!(result.is_ok()); let bundle_storage = unprocessed_storage.bundle_storage().unwrap(); - // 15 bundles were sent, but the capacity is 10 - assert_eq!(bundle_storage.unprocessed_bundles_len(), 10); - assert_eq!(bundle_storage.unprocessed_packets_len(), 20); + // 1005 bundles were sent, but the capacity is 1000 + assert_eq!(bundle_storage.unprocessed_bundles_len(), 1000); + assert_eq!(bundle_storage.unprocessed_packets_len(), 2000); assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 0); assert_eq!(bundle_storage.cost_model_buffered_packets_len(), 0); @@ -337,7 +338,10 @@ mod tests { &HashSet::default(), |bundles_to_process, _stats| { // make sure the first 10 bundles are the ones to process - assert_bundles_same(&bundles[0..10], bundles_to_process); + assert_bundles_same( + &bundles[0..BundleStorage::BUNDLE_STORAGE_CAPACITY], + bundles_to_process, + ); (0..bundles_to_process.len()).map(|_| Ok(())).collect() } )); @@ -356,10 +360,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -426,10 +427,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -494,10 +492,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -545,10 +540,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -594,10 +586,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -639,10 +628,7 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); @@ -719,16 +705,18 @@ mod tests { } = create_genesis_config(10_000); let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage( - VecDeque::with_capacity(10), - VecDeque::with_capacity(10), - ); + let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(); let (sender, receiver) = unbounded(); let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5)); - // send 15 bundles across the queue - let bundles0 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash()); + // send 500 bundles across the queue + let bundles0 = make_random_bundles( + &mint_keypair, + BundleStorage::BUNDLE_STORAGE_CAPACITY / 2, + 2, + genesis_config.hash(), + ); sender.send(bundles0.clone()).unwrap(); let mut bundle_stage_stats = BundleStageLoopMetrics::default(); @@ -754,11 +742,16 @@ mod tests { } )); assert_eq!(bundle_storage.unprocessed_bundles_len(), 0); - assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 5); + assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 500); - let bundles1 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash()); + let bundles1 = make_random_bundles( + &mint_keypair, + BundleStorage::BUNDLE_STORAGE_CAPACITY / 2, + 2, + genesis_config.hash(), + ); sender.send(bundles1.clone()).unwrap(); - // should get 5 more bundles + cost model buffered length should be 10 + // should get 500 more bundles, cost model buffered length should be 1000 let result = bundle_receiver.receive_and_buffer_bundles( &mut unprocessed_storage, &mut bundle_stage_stats, @@ -778,9 +771,10 @@ mod tests { } )); assert_eq!(bundle_storage.unprocessed_bundles_len(), 0); - assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 10); + assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 1000); // full now - let bundles2 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash()); + // send 10 bundles to go over capacity + let bundles2 = make_random_bundles(&mint_keypair, 10, 2, genesis_config.hash()); sender.send(bundles2.clone()).unwrap(); // this set will get dropped from cost model buffered bundles @@ -803,7 +797,7 @@ mod tests { } )); assert_eq!(bundle_storage.unprocessed_bundles_len(), 0); - assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 10); + assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 1000); // create new bank then call process_bundles again, expect to see [bundles1,bundles2] let bank = bank_forks.read().unwrap().working_bank(); From 0480d7a249549b426bea173d4ca2898b90d091a3 Mon Sep 17 00:00:00 2001 From: Eric Semeniuc <3838856+esemeniuc@users.noreply.github.com> Date: Fri, 2 Feb 2024 08:40:54 -0800 Subject: [PATCH 5/6] comment --- core/src/bundle_stage/bundle_packet_receiver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/bundle_stage/bundle_packet_receiver.rs b/core/src/bundle_stage/bundle_packet_receiver.rs index e352129fa6..0c9f0a4849 100644 --- a/core/src/bundle_stage/bundle_packet_receiver.rs +++ b/core/src/bundle_stage/bundle_packet_receiver.rs @@ -337,7 +337,7 @@ mod tests { &mut bundle_stage_leader_metrics, &HashSet::default(), |bundles_to_process, _stats| { - // make sure the first 10 bundles are the ones to process + // make sure the first 1000 bundles are the ones to process assert_bundles_same( &bundles[0..BundleStorage::BUNDLE_STORAGE_CAPACITY], bundles_to_process, From f6522afd89f0f1a2552ba7de6001f8aeee7e0bb8 Mon Sep 17 00:00:00 2001 From: Eric Semeniuc <3838856+esemeniuc@users.noreply.github.com> Date: Mon, 5 Feb 2024 11:17:18 -0800 Subject: [PATCH 6/6] feedbackg --- .../banking_stage/unprocessed_transaction_storage.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index c73360bc48..9185683b9d 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -1154,9 +1154,15 @@ impl BundleStorage { deserialized_bundles: Vec, push_back: bool, ) -> InsertPacketBundlesSummary { - let deque_free_space = Self::BUNDLE_STORAGE_CAPACITY - deque.len(); + // deque should be initialized with size [Self::BUNDLE_STORAGE_CAPACITY] + let deque_free_space = Self::BUNDLE_STORAGE_CAPACITY + .checked_sub(deque.len()) + .unwrap(); let bundles_to_insert_count = std::cmp::min(deque_free_space, deserialized_bundles.len()); - let num_bundles_dropped = deserialized_bundles.len() - bundles_to_insert_count; + let num_bundles_dropped = deserialized_bundles + .len() + .checked_sub(bundles_to_insert_count) + .unwrap(); let num_packets_inserted = deserialized_bundles .iter() .take(bundles_to_insert_count)