Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JIT-1711] Compare the unprocessed transaction storage BundleStorage against a constant instead of VecDeque::capacity() #587

Merged
merged 6 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 30 additions & 25 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,15 @@ impl UnprocessedTransactionStorage {
})
}

pub fn new_bundle_storage(
unprocessed_bundle_storage: VecDeque<ImmutableDeserializedBundle>,
cost_model_failed_bundles: VecDeque<ImmutableDeserializedBundle>,
) -> Self {
pub fn new_bundle_storage() -> Self {
Self::BundleStorage(BundleStorage {
last_update_slot: Slot::default(),
unprocessed_bundle_storage,
cost_model_buffered_bundle_storage: cost_model_failed_bundles,
unprocessed_bundle_storage: VecDeque::with_capacity(
BundleStorage::BUNDLE_STORAGE_CAPACITY,
),
cost_model_buffered_bundle_storage: VecDeque::with_capacity(
BundleStorage::BUNDLE_STORAGE_CAPACITY,
),
})
}

Expand Down Expand Up @@ -1104,6 +1105,7 @@ pub struct BundleStorage {
}

impl BundleStorage {
pub const BUNDLE_STORAGE_CAPACITY: usize = 1000;
fn is_empty(&self) -> bool {
self.unprocessed_bundle_storage.is_empty()
}
Expand Down Expand Up @@ -1152,24 +1154,27 @@ impl BundleStorage {
deserialized_bundles: Vec<ImmutableDeserializedBundle>,
push_back: bool,
) -> InsertPacketBundlesSummary {
let mut num_bundles_inserted: usize = 0;
let mut num_packets_inserted: usize = 0;
let mut num_bundles_dropped: usize = 0;
let mut num_packets_dropped: usize = 0;

for bundle in deserialized_bundles {
if deque.capacity() == deque.len() {
saturating_add_assign!(num_bundles_dropped, 1);
saturating_add_assign!(num_packets_dropped, bundle.len());
} else {
saturating_add_assign!(num_bundles_inserted, 1);
saturating_add_assign!(num_packets_inserted, bundle.len());
if push_back {
deque.push_back(bundle);
} else {
deque.push_front(bundle)
}
}
let deque_free_space = Self::BUNDLE_STORAGE_CAPACITY - deque.len();
esemeniuc marked this conversation as resolved.
Show resolved Hide resolved
let bundles_to_insert_count = std::cmp::min(deque_free_space, deserialized_bundles.len());
let num_bundles_dropped = deserialized_bundles.len() - bundles_to_insert_count;
esemeniuc marked this conversation as resolved.
Show resolved Hide resolved
let num_packets_inserted = deserialized_bundles
.iter()
.take(bundles_to_insert_count)
.map(|b| b.len())
.sum::<usize>();
let num_packets_dropped = deserialized_bundles
.iter()
.skip(bundles_to_insert_count)
.map(|b| b.len())
.sum::<usize>();

let to_insert = deserialized_bundles
.into_iter()
.take(bundles_to_insert_count);
if push_back {
deque.extend(to_insert)
} else {
to_insert.for_each(|b| deque.push_front(b));
}

InsertPacketBundlesSummary {
Expand All @@ -1178,7 +1183,7 @@ impl BundleStorage {
num_dropped_tracer_packets: 0,
}
.into(),
num_bundles_inserted,
num_bundles_inserted: bundles_to_insert_count,
num_packets_inserted,
num_bundles_dropped,
}
Expand Down
6 changes: 1 addition & 5 deletions core/src/bundle_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use {
solana_sdk::timing::AtomicInterval,
solana_vote::vote_sender_types::ReplayVoteSender,
std::{
collections::VecDeque,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock,
Expand Down Expand Up @@ -265,10 +264,7 @@ impl BundleStage {
);
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());

let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(1_000),
VecDeque::with_capacity(1_000),
);
let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage();

let reserved_ticks = poh_recorder
.read()
Expand Down
12 changes: 3 additions & 9 deletions core/src/bundle_stage/bundle_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ mod tests {
},
solana_streamer::socket::SocketAddrSpace,
std::{
collections::{HashSet, VecDeque},
collections::HashSet,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -1089,10 +1089,7 @@ mod tests {

let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();

let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage();
let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(1);

let mut packet_bundles = make_random_overlapping_bundles(
Expand Down Expand Up @@ -1242,10 +1239,7 @@ mod tests {

let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();

let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut bundle_storage = UnprocessedTransactionStorage::new_bundle_storage();
let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(1);
// MAIN LOGIC

Expand Down
96 changes: 45 additions & 51 deletions core/src/bundle_stage/bundle_packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl BundleReceiver {
mod tests {
use {
super::*,
crate::banking_stage::unprocessed_transaction_storage::BundleStorage,
crossbeam_channel::unbounded,
rand::{thread_rng, RngCore},
solana_bundle::{
Expand All @@ -182,7 +183,7 @@ mod tests {
system_transaction::transfer,
transaction::VersionedTransaction,
},
std::collections::{HashSet, VecDeque},
std::collections::HashSet,
};

/// Makes `num_bundles` random bundles with `num_packets_per_bundle` packets per bundle.
Expand Down Expand Up @@ -249,10 +250,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(1_000),
VecDeque::with_capacity(1_000),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -303,15 +301,18 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));

let bundles = make_random_bundles(&mint_keypair, 15, 2, genesis_config.hash());
// send 5 more than capacity
let bundles = make_random_bundles(
&mint_keypair,
BundleStorage::BUNDLE_STORAGE_CAPACITY + 5,
2,
genesis_config.hash(),
);

sender.send(bundles.clone()).unwrap();

Expand All @@ -325,9 +326,9 @@ mod tests {
assert!(result.is_ok());

let bundle_storage = unprocessed_storage.bundle_storage().unwrap();
// 15 bundles were sent, but the capacity is 10
assert_eq!(bundle_storage.unprocessed_bundles_len(), 10);
assert_eq!(bundle_storage.unprocessed_packets_len(), 20);
// 1005 bundles were sent, but the capacity is 1000
assert_eq!(bundle_storage.unprocessed_bundles_len(), 1000);
assert_eq!(bundle_storage.unprocessed_packets_len(), 2000);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 0);
assert_eq!(bundle_storage.cost_model_buffered_packets_len(), 0);

Expand All @@ -336,8 +337,11 @@ mod tests {
&mut bundle_stage_leader_metrics,
&HashSet::default(),
|bundles_to_process, _stats| {
// make sure the first 10 bundles are the ones to process
assert_bundles_same(&bundles[0..10], bundles_to_process);
// make sure the first 1000 bundles are the ones to process
assert_bundles_same(
&bundles[0..BundleStorage::BUNDLE_STORAGE_CAPACITY],
bundles_to_process,
);
(0..bundles_to_process.len()).map(|_| Ok(())).collect()
}
));
Expand All @@ -356,10 +360,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -426,10 +427,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -494,10 +492,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -545,10 +540,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -594,10 +586,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -639,10 +628,7 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));
Expand Down Expand Up @@ -719,16 +705,18 @@ mod tests {
} = create_genesis_config(10_000);
let (_, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);

let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage(
VecDeque::with_capacity(10),
VecDeque::with_capacity(10),
);
let mut unprocessed_storage = UnprocessedTransactionStorage::new_bundle_storage();

let (sender, receiver) = unbounded();
let mut bundle_receiver = BundleReceiver::new(0, receiver, bank_forks.clone(), Some(5));

// send 15 bundles across the queue
let bundles0 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash());
// send 500 bundles across the queue
let bundles0 = make_random_bundles(
&mint_keypair,
BundleStorage::BUNDLE_STORAGE_CAPACITY / 2,
2,
genesis_config.hash(),
);
sender.send(bundles0.clone()).unwrap();

let mut bundle_stage_stats = BundleStageLoopMetrics::default();
Expand All @@ -754,11 +742,16 @@ mod tests {
}
));
assert_eq!(bundle_storage.unprocessed_bundles_len(), 0);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 5);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 500);

let bundles1 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash());
let bundles1 = make_random_bundles(
&mint_keypair,
BundleStorage::BUNDLE_STORAGE_CAPACITY / 2,
2,
genesis_config.hash(),
);
sender.send(bundles1.clone()).unwrap();
// should get 5 more bundles + cost model buffered length should be 10
// should get 500 more bundles, cost model buffered length should be 1000
let result = bundle_receiver.receive_and_buffer_bundles(
&mut unprocessed_storage,
&mut bundle_stage_stats,
Expand All @@ -778,9 +771,10 @@ mod tests {
}
));
assert_eq!(bundle_storage.unprocessed_bundles_len(), 0);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 10);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 1000); // full now

let bundles2 = make_random_bundles(&mint_keypair, 5, 2, genesis_config.hash());
// send 10 bundles to go over capacity
let bundles2 = make_random_bundles(&mint_keypair, 10, 2, genesis_config.hash());
sender.send(bundles2.clone()).unwrap();

// this set will get dropped from cost model buffered bundles
Expand All @@ -803,7 +797,7 @@ mod tests {
}
));
assert_eq!(bundle_storage.unprocessed_bundles_len(), 0);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 10);
assert_eq!(bundle_storage.cost_model_buffered_bundles_len(), 1000);

// create new bank then call process_bundles again, expect to see [bundles1,bundles2]
let bank = bank_forks.read().unwrap().working_bank();
Expand Down
Loading