Skip to content

Commit

Permalink
[JIT-1711] Compare the unprocessed transaction storage BundleStorage…
Browse files Browse the repository at this point in the history
… against a constant instead of VecDeque::capacity() (#587)
  • Loading branch information
esemeniuc authored and jito-infra committed Feb 27, 2024
1 parent ee1246f commit 7af7f84
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 90 deletions.
61 changes: 36 additions & 25 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,15 @@ impl UnprocessedTransactionStorage {
})
}

pub fn new_bundle_storage(
unprocessed_bundle_storage: VecDeque<ImmutableDeserializedBundle>,
cost_model_failed_bundles: VecDeque<ImmutableDeserializedBundle>,
) -> Self {
pub fn new_bundle_storage() -> Self {
Self::BundleStorage(BundleStorage {
last_update_slot: Slot::default(),
unprocessed_bundle_storage,
cost_model_buffered_bundle_storage: cost_model_failed_bundles,
unprocessed_bundle_storage: VecDeque::with_capacity(
BundleStorage::BUNDLE_STORAGE_CAPACITY,
),
cost_model_buffered_bundle_storage: VecDeque::with_capacity(
BundleStorage::BUNDLE_STORAGE_CAPACITY,
),
})
}

Expand Down Expand Up @@ -1131,6 +1132,7 @@ pub struct BundleStorage {
}

impl BundleStorage {
pub const BUNDLE_STORAGE_CAPACITY: usize = 1000;
fn is_empty(&self) -> bool {
self.unprocessed_bundle_storage.is_empty()
}
Expand Down Expand Up @@ -1179,24 +1181,33 @@ impl BundleStorage {
deserialized_bundles: Vec<ImmutableDeserializedBundle>,
push_back: bool,
) -> InsertPacketBundlesSummary {
let mut num_bundles_inserted: usize = 0;
let mut num_packets_inserted: usize = 0;
let mut num_bundles_dropped: usize = 0;
let mut num_packets_dropped: usize = 0;

for bundle in deserialized_bundles {
if deque.capacity() == deque.len() {
saturating_add_assign!(num_bundles_dropped, 1);
saturating_add_assign!(num_packets_dropped, bundle.len());
} else {
saturating_add_assign!(num_bundles_inserted, 1);
saturating_add_assign!(num_packets_inserted, bundle.len());
if push_back {
deque.push_back(bundle);
} else {
deque.push_front(bundle)
}
}
// deque should be initialized with size [Self::BUNDLE_STORAGE_CAPACITY]
let deque_free_space = Self::BUNDLE_STORAGE_CAPACITY
.checked_sub(deque.len())
.unwrap();
let bundles_to_insert_count = std::cmp::min(deque_free_space, deserialized_bundles.len());
let num_bundles_dropped = deserialized_bundles
.len()
.checked_sub(bundles_to_insert_count)
.unwrap();
let num_packets_inserted = deserialized_bundles
.iter()
.take(bundles_to_insert_count)
.map(|b| b.len())
.sum::<usize>();
let num_packets_dropped = deserialized_bundles
.iter()
.skip(bundles_to_insert_count)
.map(|b| b.len())
.sum::<usize>();

let to_insert = deserialized_bundles
.into_iter()
.take(bundles_to_insert_count);
if push_back {
deque.extend(to_insert)
} else {
to_insert.for_each(|b| deque.push_front(b));
}

InsertPacketBundlesSummary {
Expand All @@ -1205,7 +1216,7 @@ impl BundleStorage {
num_dropped_tracer_packets: 0,
}
.into(),
num_bundles_inserted,
num_bundles_inserted: bundles_to_insert_count,
num_packets_inserted,
num_bundles_dropped,
}
Expand Down
6 changes: 1 addition & 5 deletions core/src/bundle_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use {
solana_sdk::timing::AtomicInterval,
solana_vote::vote_sender_types::ReplayVoteSender,
std::{
collections::VecDeque,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock,
Expand Down Expand Up @@ -265,10 +264,7 @@ impl BundleStage {
);
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());

let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(1_000),
VecDeque::with_capacity(1_000),
);
let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage();

let reserved_ticks = poh_recorder
.read()
Expand Down
12 changes: 3 additions & 9 deletions core/src/bundle_stage/bundle_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ mod tests {
transaction_error_metrics::TransactionErrorMetrics,
},
std::{
collections::{HashSet, VecDeque},
collections::HashSet,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -1091,10 +1091,7 @@ mod tests {

let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();

let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage();
let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(1);

let mut packet_bundles = make_random_overlapping_bundles(
Expand Down Expand Up @@ -1244,10 +1241,7 @@ mod tests {

let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();

let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage();
let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(1);
// MAIN LOGIC

Expand Down
96 changes: 45 additions & 51 deletions core/src/bundle_stage/bundle_packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl BundleReceiver {
mod tests {
use {
super::*,
crate::banking_stage::unprocessed_transaction_storage::BundleStorage,
crossbeam_channel::unbounded,
rand::{thread_rng, RngCore},
solana_bundle::{
Expand All @@ -182,7 +183,7 @@ mod tests {
system_transaction::transfer,
transaction::VersionedTransaction,
},
std::collections::{HashSet, VecDeque},
std::collections::HashSet,
};

/// Makes `num_bundles` random bundles with `num_packets_per_bundle` packets per bundle.
Expand Down Expand Up @@ -249,10 +250,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(1_000),
VecDeque::with_capacity(1_000),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -303,15 +301,18 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));

let bundles = make_random_bundles(&mint_keypair, 15, 2, genesis_config.hash());
// send 5 more than capacity
let bundles = make_random_bundles(
&mint_keypair,
BundleStorage::BUNDLE_STORAGE_CAPACITY + 5,
2,
genesis_config.hash(),
);

sender.send(bundles.clone()).unwrap();

Expand All @@ -325,9 +326,9 @@ mod tests {
assert!(result.is_ok());

let bundle_storage = unprocessed_storage.bundle_storage().unwrap();
// 15 bundles were sent, but the capacity is 10
assert_eq!(bundle_storage.unprocessed_bundles_len(), 10);
assert_eq!(bundle_storage.unprocessed_packets_len(), 20);
// 1005 bundles were sent, but the capacity is 1000
assert_eq!(bundle_storage.unprocessed_bundles_len(), 1000);
assert_eq!(bundle_storage.unprocessed_packets_len(), 2000);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 0);
assert_eq!(bundle_storage.cost_model_buffered_packets_len(), 0);

Expand All @@ -336,8 +337,11 @@ mod tests {
&mut bundle_stage_leader_metrics,
&HashSet::default(),
|bundles_to_process, _stats| {
// make sure the first 10 bundles are the ones to process
assert_bundles_same(&bundles[0..10], bundles_to_process);
// make sure the first 1000 bundles are the ones to process
assert_bundles_same(
&bundles[0..BundleStorage::BUNDLE_STORAGE_CAPACITY],
bundles_to_process,
);
(0..bundles_to_process.len()).map(|_| Ok(())).collect()
}
));
Expand All @@ -356,10 +360,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -426,10 +427,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -494,10 +492,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -545,10 +540,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -594,10 +586,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -639,10 +628,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -719,16 +705,18 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));

// send 15 bundles across the queue
let bundles0 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash());
// send 500 bundles across the queue
let bundles0 = make_random_bundles(
&mint_keypair,
BundleStorage::BUNDLE_STORAGE_CAPACITY / 2,
2,
genesis_config.hash(),
);
sender.send(bundles0.clone()).unwrap();

let mut bundle_stage_stats = BundleStageLoopMetrics::default();
Expand All @@ -754,11 +742,16 @@ mod tests {
}
));
assert_eq!(bundle_storage.unprocessed_bundles_len(), 0);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 5);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 500);

let bundles1 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash());
let bundles1 = make_random_bundles(
&mint_keypair,
BundleStorage::BUNDLE_STORAGE_CAPACITY / 2,
2,
genesis_config.hash(),
);
sender.send(bundles1.clone()).unwrap();
// should get 5 more bundles + cost model buffered length should be 10
// should get 500 more bundles, cost model buffered length should be 1000
let result = bundle_receiver.receive_and_buffer_bundles(
&mut unprocessed_storage,
&mut bundle_stage_stats,
Expand All @@ -778,9 +771,10 @@ mod tests {
}
));
assert_eq!(bundle_storage.unprocessed_bundles_len(), 0);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 10);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 1000); // full now

let bundles2 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash());
// send 10 bundles to go over capacity
let bundles2 = make_random_bundles(&mint_keypair, 10, 2, genesis_config.hash());
sender.send(bundles2.clone()).unwrap();

// this set will get dropped from cost model buffered bundles
Expand All @@ -803,7 +797,7 @@ mod tests {
}
));
assert_eq!(bundle_storage.unprocessed_bundles_len(), 0);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 10);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 1000);

// create new bank then call process_bundles again, expect to see [bundles1,bundles2]
let bank = bank_forks.read().unwrap().working_bank();
Expand Down

0 comments on commit 7af7f84

Please sign in to comment.