diff --git a/Cargo.lock b/Cargo.lock index 1cc73618c2747e..5d675c836da4f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2510,6 +2510,12 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "min-max-heap" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2687e6cf9c00f48e9284cf9fd15f2ef341d03cc7743abf9df4c5f07fdee50b18" + [[package]] name = "minimal-lexical" version = "0.1.4" @@ -4753,6 +4759,7 @@ dependencies = [ "log", "lru", "matches", + "min-max-heap", "rand 0.7.3", "rand_chacha 0.2.2", "raptorq", diff --git a/core/Cargo.toml b/core/Cargo.toml index 423bfb87792948..35d9b5e33b04ce 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -27,6 +27,7 @@ histogram = "0.6.9" itertools = "0.10.3" log = "0.4.14" lru = "0.7.5" +min-max-heap = "1.3.0" rand = "0.7.0" rand_chacha = "0.2.2" rayon = "1.5.1" @@ -96,5 +97,8 @@ name = "sigverify_stage" [[bench]] name = "retransmit_stage" +[[bench]] +name = "unprocessed_packet_batches" + [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/core/benches/unprocessed_packet_batches.rs b/core/benches/unprocessed_packet_batches.rs new file mode 100644 index 00000000000000..05c0b2458c2592 --- /dev/null +++ b/core/benches/unprocessed_packet_batches.rs @@ -0,0 +1,195 @@ +#![allow(clippy::integer_arithmetic)] +#![feature(test)] + +extern crate test; + +use { + min_max_heap::MinMaxHeap, + rand::{ + distributions::{Distribution, Uniform}, + seq::SliceRandom, + }, + solana_core::{ + unprocessed_packet_batches::*, + }, + solana_measure::measure::Measure, + solana_perf::packet::{Packet, PacketBatch}, + solana_sdk::{ + hash::Hash, + signature::Keypair, + system_transaction, + }, + test::Bencher, +}; + +fn build_packet_batch( + packet_per_batch_count: usize, + ) -> (PacketBatch, Vec) { + let packet_batch = PacketBatch::new( + (0..packet_per_batch_count) + .map(|sender_stake| { + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut packet = Packet::from_data(None, &tx).unwrap(); + packet.meta.sender_stake = sender_stake as u64; + packet + }) + .collect() + ); + let packet_indexes: Vec = (0..packet_per_batch_count).collect(); + + (packet_batch, packet_indexes) +} + +fn build_randomized_packet_batch( + packet_per_batch_count: usize, + ) -> (PacketBatch, Vec) { + let mut rng = rand::thread_rng(); + let distribution = Uniform::from(0..200_000); + + let packet_batch = PacketBatch::new( + (0..packet_per_batch_count) + .map(|_| { + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut packet = Packet::from_data(None, &tx).unwrap(); + let sender_stake = distribution.sample(&mut rng); + packet.meta.sender_stake = sender_stake as u64; + packet + }) + .collect() + ); + let packet_indexes: Vec = (0..packet_per_batch_count).collect(); + + (packet_batch, packet_indexes) +} + +fn insert_packet_batches( + buffer_max_size: usize, + batch_count: usize, + packet_per_batch_count: usize, + randomize: bool, +){ + solana_logger::setup(); + let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(buffer_max_size); + let mut index = PacketIndex::with_capacity(buffer_max_size * packet_per_batch_count); + + let mut timer = Measure::start("insert_batch"); + (0..batch_count).for_each(|_| { + let (packet_batch, 
packet_indexes) = if randomize { + build_randomized_packet_batch(packet_per_batch_count) + } else { + build_packet_batch(packet_per_batch_count) + }; + let batch = DeserializedPacketBatch::new(&mut index, packet_batch, packet_indexes, false); + unprocessed_packet_batches.insert_batch( + &mut index, + batch, + buffer_max_size, + ); + }); + timer.stop(); + log::info!("inserted {} batch, elapsed {}", buffer_max_size, timer.as_us()); +} + +//* +// v1, bench: 5,600,038,163 ns/iter (+/- 940,818,988) +// v2, bench: 5,265,382,750 ns/iter (+/- 153,623,264) +#[bench] +fn bench_unprocessed_packet_batches_within_limit(bencher: &mut Bencher) { + let buffer_capacity = 1_000; + let batch_count = 1_000; + let packet_per_batch_count = 128; + + bencher.iter(|| { + insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, false); + }); +} + +// v1, bench: 6,607,014,940 ns/iter (+/- 768,191,361) +// v2, bench: 5,692,753,323 ns/iter (+/- 548,959,624) +#[bench] +fn bench_unprocessed_packet_batches_beyond_limit(bencher: &mut Bencher) { + let buffer_capacity = 1_000; + let batch_count = 1_100; + let packet_per_batch_count = 128; + + // this is the worst scenario testing: all batches are uniformly populated with packets from + // priority 100..228, so in order to drop a batch, algo will have to drop all packets that has + // priority < 228, plus one 228. That's 2000 batch * 127 packets + 1 + // Also, since all batches have same stake distribution, the new one is always the one got + // dropped. Tho it does not change algo complexity. + bencher.iter(|| { + insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, false); + }); +} +// */ + +// v1, bench: 5,843,307,086 ns/iter (+/- 844,249,298) +// v2, bench: 5,139,525,951 ns/iter (+/- 48,005,521) +#[bench] +fn bench_unprocessed_packet_batches_randomized_within_limit(bencher: &mut Bencher) { + let buffer_capacity = 1_000; + let batch_count = 1_000; + let packet_per_batch_count = 128; + + bencher.iter(|| { + insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true); + }); +} + +// v1, bench: 6,497,623,849 ns/iter (+/- 3,206,382,212) +// v2, bench: 5,762,071,682 ns/iter (+/- 168,244,418) +#[bench] +fn bench_unprocessed_packet_batches_randomized_beyond_limit(bencher: &mut Bencher) { + let buffer_capacity = 1_000; + let batch_count = 1_100; + let packet_per_batch_count = 128; + + bencher.iter(|| { + insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true); + }); +} + +/* +// bench: 125,923 ns/iter (+/- 12,539) +#[bench] +fn bench_unprocessed_packet_batches_vector(bencher: &mut Bencher) { + let buffer_max_size = 100_000; + let mut unprocessed_packet_batches = vec![]; + + bencher.iter(|| { + for i in 0..buffer_max_size { + unprocessed_packet_batches.push(i); + } + + unprocessed_packet_batches.clear(); + }); +} + +// bench: 2,264,168 ns/iter (+/- 61,353) +#[bench] +fn bench_unprocessed_packet_batches_min_max_heap(bencher: &mut Bencher) { + let buffer_max_size = 100_000; + let mut unprocessed_packet_batches = MinMaxHeap::default(); + let mut weights: Vec = (0..buffer_max_size).collect(); + weights.shuffle(&mut rand::thread_rng()); + + bencher.iter(|| { + // min_max_heap has O(log n) for insertion + for weight in &weights { + unprocessed_packet_batches.push(weight); + } + + unprocessed_packet_batches.clear(); + }); +} +// */ diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 14aabf33da5a78..8ca892cd8e61f1 100644 --- a/core/src/banking_stage.rs +++ 
b/core/src/banking_stage.rs @@ -58,6 +58,9 @@ use { collect_token_balances, TransactionTokenBalancesSet, }, std::{ + cell::RefCell, + rc::{Rc, Weak}, + cmp, collections::HashMap, env, @@ -467,9 +470,12 @@ impl BankingStage { } fn filter_valid_packets_for_forwarding<'a>( - packet_batches: impl Iterator, + packet_batches: impl Iterator>>, ) -> Vec<&'a Packet> { + vec![] + /* TODO - re-impl packet_batches + .map(|deserialized_packet_batch| deserialized_packet_batch.borrow() ) .filter(|deserialized_packet_batch| !deserialized_packet_batch.forwarded) .flat_map(|deserialized_packet_batch| { deserialized_packet_batch @@ -478,6 +484,7 @@ impl BankingStage { .map(|(index, _)| &deserialized_packet_batch.packet_batch.packets[*index]) }) .collect() + // */ } /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns @@ -941,10 +948,10 @@ impl BankingStage { if hold { buffered_packet_batches.retain(|deserialized_packet_batch| { - !deserialized_packet_batch.unprocessed_packets.is_empty() + !deserialized_packet_batch.borrow().unprocessed_packets.is_empty() }); for deserialized_packet_batch in buffered_packet_batches.iter_mut() { - deserialized_packet_batch.forwarded = true; + deserialized_packet_batch.borrow_mut().forwarded = true; } } else { slot_metrics_tracker @@ -969,6 +976,7 @@ impl BankingStage { ) { let recorder = poh_recorder.lock().unwrap().recorder(); let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); + let mut buffered_packet_index = PacketIndex::with_capacity(batch_limit * MAX_NUM_TRANSACTIONS_PER_BATCH); let mut banking_stage_stats = BankingStageStats::new(id); let qos_service = QosService::new(cost_model, id); let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); @@ -1041,6 +1049,7 @@ impl BankingStage { id, batch_limit, &mut buffered_packet_batches, + &mut buffered_packet_index, &mut banking_stage_stats, &mut slot_metrics_tracker, ) @@ -1927,6 +1936,7 @@ impl BankingStage { id: u32, batch_limit: usize, buffered_packet_batches: &mut UnprocessedPacketBatches, + buffered_packet_index: &mut PacketIndex, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> Result<(), RecvTimeoutError> { @@ -1962,6 +1972,7 @@ impl BankingStage { Self::push_unprocessed( buffered_packet_batches, + buffered_packet_index, packet_batch, packet_indexes, &mut dropped_packet_batches_count, @@ -2004,7 +2015,7 @@ impl BankingStage { buffered_packet_batches .iter() .map(|deserialized_packet_batch| { - deserialized_packet_batch.unprocessed_packets.len() + deserialized_packet_batch.borrow().unprocessed_packets.len() }) .sum(), Ordering::Relaxed, @@ -2015,6 +2026,7 @@ impl BankingStage { fn push_unprocessed( unprocessed_packet_batches: &mut UnprocessedPacketBatches, + buffered_packet_index: &mut PacketIndex, packet_batch: PacketBatch, packet_indexes: Vec, dropped_packet_batches_count: &mut usize, @@ -2025,15 +2037,6 @@ impl BankingStage { slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { if !packet_indexes.is_empty() { - if unprocessed_packet_batches.len() >= batch_limit { - *dropped_packet_batches_count += 1; - if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() { - *dropped_packets_count += dropped_batch.unprocessed_packets.len(); - slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count( - dropped_batch.unprocessed_packets.len() as u64, - ); - } - } let _ = banking_stage_stats .batch_packet_indexes_len .increment(packet_indexes.len() as u64); @@ -2042,11 
+2045,23 @@ impl BankingStage { slot_metrics_tracker .increment_newly_buffered_packets_count(packet_indexes.len() as u64); - unprocessed_packet_batches.push_back(DeserializedPacketBatch::new( - packet_batch, - packet_indexes, - false, - )); + let deserialized_packet_batch = DeserializedPacketBatch::new(buffered_packet_index, packet_batch, packet_indexes, false); + let (number_of_dropped_batches, number_of_dropped_packets) = unprocessed_packet_batches + .insert_batch( + buffered_packet_index, + deserialized_packet_batch, + batch_limit, + ); + + if let Some(number_of_dropped_batches) = number_of_dropped_batches { + saturating_add_assign!(*dropped_packet_batches_count, number_of_dropped_batches); + } + if let Some(number_of_dropped_packets) = number_of_dropped_packets { + saturating_add_assign!(*dropped_packets_count, number_of_dropped_packets); + slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count( + number_of_dropped_packets as u64, + ); + } } } @@ -4148,8 +4163,9 @@ mod tests { 1, Hash::new_unique(), ); - let packet = Packet::from_data(None, &tx).unwrap(); - let new_packet_batch = PacketBatch::new(vec![packet; 2]); + let mut heavy_packet = Packet::from_data(None, &tx).unwrap(); + heavy_packet.meta.sender_stake = 1_000_000u64; + let new_packet_batch = PacketBatch::new(vec![heavy_packet; 2]); let mut unprocessed_packets: UnprocessedPacketBatches = vec![DeserializedPacketBatch::new( new_packet_batch, vec![0, 1], @@ -4163,7 +4179,10 @@ mod tests { // Set the limit to 2 let batch_limit = 2; // Create new unprocessed packets and add to a batch - let new_packet_batch = PacketBatch::new(vec![Packet::default()]); + let mut light_packet = + Packet::from_data(Some(&SocketAddr::from(([10, 10, 10, 1], 9001))), &tx).unwrap(); + light_packet.meta.sender_stake = 1u64; + let new_packet_batch = PacketBatch::new(vec![light_packet; 3]); let packet_indexes = vec![]; let mut dropped_packet_batches_count = 0; @@ -4190,11 +4209,11 @@ mod tests { // Because the set of unprocessed `packet_indexes` is non-empty, the // packets are added to the unprocessed queue - let packet_indexes = vec![0]; + let packet_indexes = vec![0, 1, 2]; BankingStage::push_unprocessed( &mut unprocessed_packets, new_packet_batch, - packet_indexes.clone(), + packet_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, &mut newly_buffered_packets_count, @@ -4205,15 +4224,15 @@ mod tests { assert_eq!(unprocessed_packets.len(), 2); assert_eq!(dropped_packet_batches_count, 0); assert_eq!(dropped_packets_count, 0); - assert_eq!(newly_buffered_packets_count, 1); + assert_eq!(newly_buffered_packets_count, 3); - // Because we've reached the batch limit, old unprocessed packets are + // Because we've reached the batch limit, old light_packets are // dropped and the new one is appended to the end - let new_packet_batch = PacketBatch::new(vec![Packet::from_data( - Some(&SocketAddr::from(([127, 0, 0, 1], 8001))), - 42, - ) - .unwrap()]); + let mut also_heavy_packet = + Packet::from_data(Some(&SocketAddr::from(([127, 0, 0, 1], 8001))), &tx).unwrap(); + also_heavy_packet.meta.sender_stake = 100_000_000u64; + let new_packet_batch = PacketBatch::new(vec![also_heavy_packet]); + let packet_indexes = vec![0]; assert_eq!(unprocessed_packets.len(), batch_limit); BankingStage::push_unprocessed( &mut unprocessed_packets, @@ -4232,8 +4251,8 @@ mod tests { new_packet_batch.packets[0] ); assert_eq!(dropped_packet_batches_count, 1); - assert_eq!(dropped_packets_count, 2); - assert_eq!(newly_buffered_packets_count, 2); + 
assert_eq!(dropped_packets_count, 3); + assert_eq!(newly_buffered_packets_count, 4); } #[cfg(test)] diff --git a/core/src/lib.rs b/core/src/lib.rs index abc843bbd90b4c..fa231fb85a6a7b 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -40,6 +40,7 @@ pub mod packet_threshold; pub mod poh_timing_report_service; pub mod poh_timing_reporter; pub mod progress_map; +pub mod priority_flat_index; pub mod qos_service; pub mod repair_generic_traversal; pub mod repair_response; diff --git a/core/src/priority_flat_index.rs b/core/src/priority_flat_index.rs new file mode 100644 index 00000000000000..c4e31772d99f1a --- /dev/null +++ b/core/src/priority_flat_index.rs @@ -0,0 +1,259 @@ +use { + min_max_heap::MinMaxHeap, + std::{ + cell::RefCell, + collections::{HashMap, VecDeque}, + cmp::Ordering, + rc::{Rc, Weak}, + }, + rand::{ + distributions::{Distribution, Uniform}, + }, +}; + +/// storage is a nested struct, priority_flat_index flats out the underlying object, index by its +/// priority +/// +/// 1. Buffer is operated at Batch level, eg insert_batch, remove_batch ... +/// 2. Prioritization is operated on packet level, by packet.priority +#[derive(Default)] +pub struct Buffer(VecDeque>>); + +/// index lives outside of buffer for now +pub type Index = MinMaxHeap>; + +/// Batch is essentially a collection of Packet +#[derive(Debug, Default)] +pub struct Batch { + packets: HashMap>, // batch owns packet strongly +} + +/// Packet has week ref to its owner +#[derive(Debug, Default)] +pub struct Packet { + priority: u64, + index: usize, // same usize used in HashMap key in batch + owner: Weak>, // packet ref to batch weakly +} + +/// MinMaxHeap needs Ord for Packet +impl Ord for Packet { + fn cmp(&self, other: &Self) -> Ordering { + self.priority.cmp(&other.priority) + } +} + +impl PartialOrd for Packet { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for Packet { + fn eq(&self, other: &Self) -> bool { + self.priority == other.priority + } +} + +impl Eq for Packet {} + +impl std::ops::Deref for Buffer { + type Target = VecDeque>>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for Buffer { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Buffer { + pub fn with_capacity(capacity: usize) -> Self { + Buffer(VecDeque::with_capacity(capacity)) + } + + /// Pushing batch into buffer then drop excessive batches if needed; + /// This sequence allows new batch being evaluated together with existing + /// batches when decide which one to drop, ensures all remaining packets + /// are have equal or higher priority than those dropped. + pub fn insert_batch( + &mut self, + index: &mut Index, + batch_limit: usize, + batch: Rc>, + ) { + if batch.borrow().packets.is_empty() { + return; + } + + self.push_back(batch); + + let num_batches_to_remove = self.len().saturating_sub(batch_limit); + if num_batches_to_remove > 0 { + self.remove_batches_by_priority(index, num_batches_to_remove); + } + + // NOTE: push_back() plus remove() are more expensive than swap_remove_back() + // However, VecDeque now hold `Rc` instead of `Batch` itself, it shouldn't too + // bad. + } + + /// TODO this should be Batch's associate function. + /// make_batch implements the inner relationship between batch <--> packets. 
+ pub fn make_batch( + index: &mut Index, + // raw inputs, would be PacketBatch in real life + packet_per_batch_count: usize, + random_priority: bool, + ) -> Rc<RefCell<Batch>> { + let mut rng = rand::thread_rng(); + let distribution = Uniform::from(0..200_000); + + let batch = Rc::new(RefCell::new(Batch::default())); + (*batch.borrow_mut()).packets = + (0..packet_per_batch_count).map(|m| { + let priority = if random_priority { + distribution.sample(&mut rng) + } + else { + m as u64 + }; + let packet = Rc::new(Packet { + index: m, + priority, + owner: Rc::downgrade(&batch.clone()), + }); + // update index on insertion + index.push(Rc::clone(&packet)); + (packet.index, packet) + }) + .collect(); + batch + } + + /// Utilizes the existing priority packet index to efficiently drop low-priority packets. + /// Compared to other approaches, at the time a batch is dropped it does not need to: + /// 1. Scan and index buffer -- it is eagerly prepared at batch insertion; + /// 2. Lookup batch to remove low priority packet from its unprocessed list. + /// 3. Also adds an option to drop multiple batches at a time to further improve efficiency. + fn remove_batches_by_priority( + &mut self, + index: &mut Index, + num_batches_to_remove: usize, + ) { + let mut removed_batch_count = 0; + while let Some(pkt) = index.pop_min() { + debug!("popped min from index: {:?}", pkt); + + // index yields a ref to the min-priority packet; use packet.owner to reference the + // batch, then remove the packet from the batch's unprocessed list + let batch = pkt.owner.upgrade().unwrap(); + let _popped_packet = batch.borrow_mut().packets.remove(&pkt.index).unwrap(); + // it is more efficient to remove multiple batches in one go + if batch.borrow().packets.is_empty() { + removed_batch_count += 1; + if removed_batch_count >= num_batches_to_remove { + break; + } + } + } + // still need to iterate through the VecDeque buffer to remove empty batches + self.retain(|batch| { + !batch.borrow().packets.is_empty() + }); + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + }; + + #[test] + fn test_priority_flat_index_make_batch() { + solana_logger::setup(); + + // create one index per buffer + let mut index = Index::default(); + + let num_packets = 10; + // batch needs to be referenced by many of its packets, so it needs to be Rc<> + // batch needs to be mutable after deref from packet, so Rc<RefCell<Batch>> + let batch = Buffer::make_batch(&mut index, num_packets, true); + + assert_eq!(num_packets, batch.borrow().packets.len()); + assert_eq!(num_packets, index.len()); + + let mut expected_pkt_count = num_packets; + // arbitrary order + for pkt in index.iter() { + debug!("checking {:?}", pkt); + + // assert getting owner from child + let batch = pkt.owner.upgrade().unwrap(); + assert_eq!(expected_pkt_count, batch.borrow().packets.len()); + // assert parent/child relationship + assert!(batch.borrow().packets.contains_key(&pkt.index)); + // assert can do mut op on owner + { + // directly removing the packet from the batch saves one batch [index] op, plus a packet O(n) + // lookup.
+ let popped_packet = batch.borrow_mut().packets.remove(&pkt.index).unwrap(); + assert_eq!(2, Rc::strong_count(&popped_packet)); + } + assert_eq!(1, Rc::strong_count(&pkt)); + expected_pkt_count -= 1; + assert_eq!(expected_pkt_count, batch.borrow().packets.len()); + } + assert!(batch.borrow().packets.is_empty()); + } + + #[test] + fn test_priority_flat_index_insert_batch() { + solana_logger::setup(); + let buffer_capacity = 4; + let batch_count = 7; + let packet_per_batch_count = 3; + + // initialize buffer and index + let mut buffer = Buffer::with_capacity(buffer_capacity); + let mut index = Index::with_capacity(buffer_capacity * packet_per_batch_count); + + // build Batch from provided input data, update index, then insert batch to buffer; + // if batch_count > buffer_capacity, low priority packets will be dropped until + // batch(es) are removed. + (0..batch_count).for_each(|_| { + let batch = Buffer::make_batch(&mut index, packet_per_batch_count, false); + buffer.insert_batch( + &mut index, + buffer_capacity, + batch, + ); + }); + + // assert that buffer is full, has `buffer_capacity` packets in buffer and index. + // The reason is since each batch as priority {0, 1, 2}, when the first batch is dropped, + // all `0` and `1` packets would have been dropped first. + let expected_packets_count = buffer_capacity; + assert_eq!(expected_packets_count, index.len()); + assert_eq!(buffer_capacity, buffer.len()); + let packet_count: usize = buffer.iter().map(|x| x.borrow().packets.len()).sum(); + assert_eq!(expected_packets_count, packet_count); + + // assert what's left in buffer are abiding the priority rule. Since batch in + // buffer has packet priority as (0, 1, 2), after buffer is saturated, only packets + // left in buffer should be priority `2`. + let expected_priority = 2; + buffer.iter().for_each(|batch| { + let packets = &batch.borrow().packets; + assert_eq!(1, packets.len()); + assert!(packets.contains_key(&expected_priority)); + }); + } +} + + diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 87af6f04450ce5..a71c6964a936d1 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -1,16 +1,68 @@ use { + min_max_heap::MinMaxHeap, retain_mut::RetainMut, + solana_gossip::weighted_shuffle::WeightedShuffle, solana_perf::packet::{limited_deserialize, Packet, PacketBatch}, + solana_runtime::bank::Bank, solana_sdk::{ - hash::Hash, message::Message, short_vec::decode_shortu16_len, signature::Signature, - transaction::VersionedTransaction, + clock::Slot, + feature_set::tx_wide_compute_cap, + hash::Hash, + message::{ + v0::{self}, + Message, SanitizedMessage, VersionedMessage, + }, + sanitize::Sanitize, + saturating_add_assign, + short_vec::decode_shortu16_len, + signature::Signature, + transaction::{AddressLoader, VersionedTransaction}, }, std::{ - collections::{HashMap, VecDeque}, + cell::RefCell, + cmp::Ordering, + collections::{BTreeMap, HashMap, VecDeque}, + rc::{Rc, Weak}, mem::size_of, + sync::Arc, }, }; +/// FeePerCu is valid by up to X slots +#[derive(Debug, Default)] +struct FeePerCu { + fee_per_cu: u64, + slot: Slot, +} + +impl FeePerCu { + fn too_old(&self, slot: &Slot) -> bool { + const MAX_SLOT_AGE: Slot = 1; + slot - self.slot >= MAX_SLOT_AGE + } +} + +impl Ord for FeePerCu { + fn cmp(&self, other: &Self) -> Ordering { + self.fee_per_cu.cmp(&other.fee_per_cu) + } +} + +impl PartialOrd for FeePerCu { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl 
PartialEq for FeePerCu { + fn eq(&self, other: &Self) -> bool { + self.fee_per_cu == other.fee_per_cu + } +} + +impl Eq for FeePerCu {} + + /// Holds deserialized messages, as well as computed message_hash and other things needed to create /// SanitizedTransaction #[derive(Debug, Default)] @@ -23,8 +75,66 @@ pub struct DeserializedPacket { #[allow(dead_code)] is_simple_vote: bool, + + // priority by fee/CU and sender_stake + fee_per_cu: Option<FeePerCu>, + sender_stake: u64, + + // index of packet in PacketBatch + packet_index: usize, + + // ref to batch that has this packet + owner: Weak<RefCell<DeserializedPacketBatch>>, } +/// Implements Ord for DeserializedPacket so it can be stored in a MinMaxHeap +impl Ord for DeserializedPacket { + fn cmp(&self, other: &Self) -> Ordering { + let self_has_fee = self.fee_per_cu.is_some(); + let other_has_fee = other.fee_per_cu.is_some(); + + if self_has_fee && !other_has_fee { + Ordering::Greater + } else if !self_has_fee && other_has_fee { + Ordering::Less + } else if self_has_fee && other_has_fee { + self.fee_per_cu.as_ref().unwrap().cmp(&other.fee_per_cu.as_ref().unwrap()) + .then(self.sender_stake.cmp(&other.sender_stake)) + } else { + // !self_has_fee && !other_has_fee + self.sender_stake.cmp(&other.sender_stake) + } + } +} + +impl PartialOrd for DeserializedPacket { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl PartialEq for DeserializedPacket { + fn eq(&self, other: &Self) -> bool { + let self_has_fee = self.fee_per_cu.is_some(); + let other_has_fee = other.fee_per_cu.is_some(); + + if self_has_fee && !other_has_fee { + false + } else if !self_has_fee && other_has_fee { + false + } else if self_has_fee && other_has_fee { + self.fee_per_cu.as_ref().unwrap() == other.fee_per_cu.as_ref().unwrap() && + self.sender_stake == other.sender_stake + } else { + // !self_has_fee && !other_has_fee + self.sender_stake == other.sender_stake + } + } +} + +impl Eq for DeserializedPacket {} + + /// Defines the type of entry in `UnprocessedPacketBatches`, it holds original packet_batch /// for forwarding, as well as `forwarded` flag; /// Each packet in packet_batch are deserialized upon receiving, the result are stored in @@ -34,13 +144,14 @@ pub struct DeserializedPacketBatch { pub packet_batch: PacketBatch, pub forwarded: bool, // indexes of valid packets in batch, and their corresponding deserialized_packet - pub unprocessed_packets: HashMap<usize, DeserializedPacket>, + pub unprocessed_packets: HashMap<usize, Rc<DeserializedPacket>>, } +// TODO TAO - may not need Locator anymore /// References to a packet in `UnprocessedPacketBatches`, where /// - batch_index references to `DeserializedPacketBatch`, /// - packet_index references to `packet` within `DeserializedPacketBatch.packet_batch` -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq)] pub struct PacketLocator { #[allow(dead_code)] batch_index: usize, @@ -49,13 +160,17 @@ pub struct PacketLocator { } /// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store -/// PacketBatch's received from sigverify. Banking thread continuously scans the buffer -/// to pick proper packets to add to the block. +/// PacketBatch's received from sigverify. When a PacketBatch is added, the PacketIndex +/// that sorts each packet by priority is updated.
#[derive(Default)] -pub struct UnprocessedPacketBatches(VecDeque); +pub struct UnprocessedPacketBatches(VecDeque>>); + +// TODO TAO - index lives outside `UnprocessedPacketBatches` for now, can make as field +/// PacketIndex sorts individual packet by its priority +pub type PacketIndex = MinMaxHeap>; impl std::ops::Deref for UnprocessedPacketBatches { - type Target = VecDeque; + type Target = VecDeque>>; fn deref(&self) -> &Self::Target { &self.0 } @@ -68,16 +183,16 @@ impl std::ops::DerefMut for UnprocessedPacketBatches { } impl RetainMut for UnprocessedPacketBatches { - fn retain_mut(&mut self, f: F) + fn retain_mut(&mut self, mut f: F) where F: FnMut(&mut DeserializedPacketBatch) -> bool, { - RetainMut::retain_mut(&mut self.0, f); + RetainMut::retain_mut(&mut self.0, |entry| f(&mut entry.borrow_mut())); } } -impl FromIterator for UnprocessedPacketBatches { - fn from_iter>(iter: I) -> Self { +impl FromIterator>> for UnprocessedPacketBatches { + fn from_iter>>>(iter: I) -> Self { Self(iter.into_iter().collect()) } } @@ -91,11 +206,97 @@ impl UnprocessedPacketBatches { UnprocessedPacketBatches(VecDeque::with_capacity(capacity)) } + /// Insert new `deserizlized_packet_batch` into inner `VecDeque`, + /// If buffer is at Max limit, packets (not packet_batch) will be dropped, starting from + /// the lowest priority until an empty batch is located, then swap it with new batch; + /// Otherwise, new_batch will be pushed into the end of VecDeque; + /// returns tuple of (number of batch dropped, number of packets dropped) + pub fn insert_batch( + &mut self, + packet_index: &mut PacketIndex, + deserialized_packet_batch: Rc>, + batch_limit: usize, + ) -> (Option, Option) { + if deserialized_packet_batch.borrow().unprocessed_packets.is_empty() { + return (None, None); + } + + self.push_back(deserialized_packet_batch); + let num_batches_to_remove = self.len().saturating_sub(batch_limit); + if num_batches_to_remove > 0 { + self.remove_batches_by_priority(packet_index, num_batches_to_remove) + } + else { + (None, None) + } +/* + if self.len() >= batch_limit { + let (_, dropped_batches_count, dropped_packets_count) = + self.replace_packet_by_priority(deserialized_packet_batch); + (dropped_batches_count, dropped_packets_count) + } else { + self.push_back(deserialized_packet_batch); + (None, None) + } +// */ + } + + /// Utilizing existing priority packet index to efficiently drop low priority packets. + /// Compare to other approach, at the time of drop batch, it does not need to do: + /// 1. Scan and index buffer -- it is eagerly prepared at batch insertion; + /// 2. Lookup batch to remove low priority packet from its unprocessed list. + /// 3. Also added a option to drop multiple batches at a time to further improve efficiency. 
+ fn remove_batches_by_priority( + &mut self, + index: &mut PacketIndex, + num_batches_to_remove: usize, + ) -> (Option, Option) { + let mut removed_packet_count = 0usize; + let mut removed_batch_count = 0usize; + while let Some(pkt) = index.pop_min() { + debug!("popped min from index: {:?}", pkt); + + // index yields ref to min priority packet, using packet.owner to reference to + // batch, then remove the packet from batch's unprocessed list + let batch = pkt.owner.upgrade().unwrap(); + let _popped_packet = batch.borrow_mut().unprocessed_packets.remove(&pkt.packet_index).unwrap(); + saturating_add_assign!(removed_packet_count, 1); + + // be more efficient to remove multiple batches at one go + if batch.borrow().unprocessed_packets.is_empty() { + saturating_add_assign!(removed_batch_count, 1); + if removed_batch_count >= num_batches_to_remove { + break; + } + } + } + // still need to iterate through VecDeque buffer to remove empty batches + self.retain(|batch| { + !batch.borrow().unprocessed_packets.is_empty() + }); + + (Some(removed_packet_count), Some(removed_batch_count)) + } + +/* + /// prioritize unprocessed packets by their fee/CU then by sender's stakes + pub fn prioritize_by_fee_then_stakes( + &mut self, + working_bank: Option>, + ) -> Vec { + let (stakes, locators) = self.get_stakes_and_locators(); + + let shuffled_packet_locators = Self::weighted_shuffle(&stakes, &locators); + + self.prioritize_by_fee_per_cu(&shuffled_packet_locators, working_bank) + } +// */ + /// Returns total number of all packets (including unprocessed and processed) in buffer #[allow(dead_code)] fn get_packets_count(&self) -> usize { self.iter() - .map(|deserialized_packet_batch| deserialized_packet_batch.packet_batch.packets.len()) + .map(|deserialized_packet_batch| deserialized_packet_batch.borrow().packet_batch.packets.len()) .sum() } @@ -103,10 +304,11 @@ impl UnprocessedPacketBatches { #[allow(dead_code)] fn get_unprocessed_packets_count(&self) -> usize { self.iter() - .map(|deserialized_packet_batch| deserialized_packet_batch.unprocessed_packets.len()) + .map(|deserialized_packet_batch| deserialized_packet_batch.borrow().unprocessed_packets.len()) .sum() } +/* /// Iterates the inner `Vec`. /// Returns the flattened result of mapping each /// `DeserializedPacketBatch` to a list the batch's inner @@ -117,8 +319,9 @@ impl UnprocessedPacketBatches { self.iter() .enumerate() .flat_map(|(batch_index, deserialized_packet_batch)| { - let packet_batch = &deserialized_packet_batch.packet_batch; + let packet_batch = &deserialized_packet_batch.borrow().packet_batch; deserialized_packet_batch + .borrow() .unprocessed_packets .keys() .map(move |packet_index| { @@ -134,9 +337,235 @@ impl UnprocessedPacketBatches { }) .unzip() } +// */ + + fn weighted_shuffle(stakes: &[u64], locators: &[PacketLocator]) -> Vec { + let mut rng = rand::thread_rng(); + WeightedShuffle::new("leader_qos", stakes) + .shuffle(&mut rng) + .map(|i| locators[i].clone()) + .collect() + } + +/* + /// Index `locators` by their transaction's fee-per-cu value; For transactions + /// have same fee-per-cu, their relative order remains same (eg. in sender_stake order). 
+ fn prioritize_by_fee_per_cu( + &mut self, + locators: &Vec, + bank: Option>, + ) -> Vec { + let mut fee_buckets = BTreeMap::>::new(); + for locator in locators { + // if unable to compute fee-per-cu for the packet, put it to the `0` bucket + let fee_per_cu = self.get_computed_fee_per_cu(locator, &bank).unwrap_or(0); + + let bucket = fee_buckets + .entry(fee_per_cu) + .or_insert(Vec::::new()); + bucket.push(locator.clone()); + } + fee_buckets + .iter() + .rev() + .flat_map(|(_key, bucket)| bucket.iter().cloned()) + .collect() + } + + /// get cached fee_per_cu for transaction referenced by `locator`, if cached value is + /// too old for current `bank`, or no cached value, then (re)compute and cache. + fn get_computed_fee_per_cu( + &mut self, + locator: &PacketLocator, + bank: &Option>, + ) -> Option { + if bank.is_none() { + return None; + } + let bank = bank.as_ref().unwrap(); + let deserialized_packet = self.locate_packet_mut(locator)?; + if let Some(cached_fee_per_cu) = + Self::get_cached_fee_per_cu(deserialized_packet, &bank.slot()) + { + Some(cached_fee_per_cu) + } else { + let computed_fee_per_cu = Self::compute_fee_per_cu(deserialized_packet, bank); + if let Some(computed_fee_per_cu) = computed_fee_per_cu { + deserialized_packet.fee_per_cu = Some(FeePerCu { + fee_per_cu: computed_fee_per_cu, + slot: bank.slot(), + }); + } + computed_fee_per_cu + } + } + + #[allow(dead_code)] + fn locate_packet(&self, locator: &PacketLocator) -> Option<&Rc> { + let deserialized_packet_batch = self.get(locator.batch_index)?; + deserialized_packet_batch + .borrow() + .unprocessed_packets + .get(&locator.packet_index) + } + + fn locate_packet_mut(&mut self, locator: &PacketLocator) -> Option<&mut Rc> { + let deserialized_packet_batch = self.get_mut(locator.batch_index)?; + deserialized_packet_batch + .borrow() + .unprocessed_packets + .get_mut(&locator.packet_index) + } +// */ + + /// Computes `(addition_fee + base_fee / requested_cu)` for packet referenced by `PacketLocator` + fn compute_fee_per_cu(deserialized_packet: &DeserializedPacket, bank: &Bank) -> Option { + let sanitized_message = + Self::sanitize_message(&deserialized_packet.versioned_transaction.message, bank)?; + let (total_fee, max_units) = Bank::calculate_fee( + &sanitized_message, + bank.get_lamports_per_signature(), + &bank.fee_structure, + bank.feature_set.is_active(&tx_wide_compute_cap::id()), + ); + Some(total_fee / max_units) + } + + fn get_cached_fee_per_cu(deserialized_packet: &DeserializedPacket, slot: &Slot) -> Option { + let cached_fee_per_cu = deserialized_packet.fee_per_cu.as_ref()?; + if cached_fee_per_cu.too_old(slot) { + None + } else { + Some(cached_fee_per_cu.fee_per_cu) + } + } + + fn sanitize_message( + versioned_message: &VersionedMessage, + address_loader: impl AddressLoader, + ) -> Option { + versioned_message.sanitize().ok()?; + + match versioned_message { + VersionedMessage::Legacy(message) => Some(SanitizedMessage::Legacy(message.clone())), + VersionedMessage::V0(message) => { + let loaded_addresses = address_loader + .load_addresses(&message.address_table_lookups) + .ok()?; + Some(SanitizedMessage::V0(v0::LoadedMessage::new( + message.clone(), + loaded_addresses, + ))) + } + } + } + + /* + /// This function is called to put new deserialized_packet_batch into buffer when it is + /// at Max capacity. + /// It tries to drop lower prioritized transactions in order to find an empty batch to swap + /// with new deserialized_packet_batch. 
+ /// Returns the dropped deserialized_packet_batch, dropped batch count and dropped packets count. + fn replace_packet_by_priority( + &mut self, + deserialized_packet_batch: DeserializedPacketBatch, + ) -> ( + Option, + Option, + Option, + ) { + // push new batch to the end of Vec to join existing batches for prioritizing and selecting + self.push_back(deserialized_packet_batch); + let new_batch_index = self.len() - 1; + + // Right now, it doesn't have a bank that can be used to calculate fee/cu at + // point of packet receiving. + let bank_to_compute_fee: Option> = None; + // Get locators ordered by fee then sender's stake, the highest priority packet's locator + // at the top. + let ordered_locators_for_eviction = self.prioritize_by_fee_then_stakes(bank_to_compute_fee); + + // Start from the lowest priority to collect packets as candidates to be dropped, until + // find a Batch that would no longer have unprocessed packets. + let mut eviction_batch_index: Option = None; + let mut evicting_packets = HashMap::>::new(); + for locator in ordered_locators_for_eviction.iter().rev() { + if let Some(batch) = self.get(locator.batch_index) { + if batch + .borrow() + .unprocessed_packets + .contains_key(&locator.packet_index) + { + let packet_indexes = evicting_packets + .entry(locator.batch_index) + .or_insert(vec![]); + packet_indexes.push(locator.packet_index); + + if Self::would_be_empty_batch(batch, packet_indexes) { + // found an empty batch can be swapped with new batch + eviction_batch_index = Some(locator.batch_index); + break; + } + } + } + } + // remove those evicted packets by removing them from `unprocessed` list + let mut dropped_packets_count: Option = None; + evicting_packets + .iter() + .for_each(|(batch_index, evicted_packet_indexes)| { + if let Some(batch) = self.get_mut(*batch_index) { + batch + .unprocessed_packets + .retain(|&k, _| !evicted_packet_indexes.contains(&k)); + dropped_packets_count = Some( + dropped_packets_count + .unwrap_or(0usize) + .saturating_add(evicted_packet_indexes.len()), + ); + } + }); + + if let Some(eviction_batch_index) = eviction_batch_index { + if eviction_batch_index == new_batch_index { + // the new batch is identified to be the one for eviction, just pop it out; + (self.pop_back(), None, dropped_packets_count) + } else { + // we have a spot in the queue, swap it with the new batch at end of queue; + ( + self.swap_remove_back(eviction_batch_index), + Some(1usize), + dropped_packets_count, + ) + } + } else { + warn!("Cannot find eviction candidate from buffer in single iteration. 
New packet batch is dropped."); + (self.pop_back(), None, dropped_packets_count) + } + } +// */ + + /// Returns True if all unprocessed packets in the batch are in eviction list + fn would_be_empty_batch( + deserialized_packet_batch: &DeserializedPacketBatch, + eviction_list: &[usize], + ) -> bool { + if deserialized_packet_batch.unprocessed_packets.len() != eviction_list.len() { + return false; + } + + for (k, _) in deserialized_packet_batch.unprocessed_packets.iter() { + if !eviction_list.contains(k) { + return false; + } + } + + true + } } impl DeserializedPacketBatch { +/* pub fn new(packet_batch: PacketBatch, packet_indexes: Vec, forwarded: bool) -> Self { let unprocessed_packets = Self::deserialize_packets(&packet_batch, &packet_indexes); Self { @@ -145,22 +574,38 @@ impl DeserializedPacketBatch { forwarded, } } - - fn deserialize_packets( - packet_batch: &PacketBatch, - packet_indexes: &[usize], - ) -> HashMap { - packet_indexes - .iter() - .filter_map(|packet_index| { - let deserialized_packet = - Self::deserialize_packet(&packet_batch.packets[*packet_index])?; - Some((*packet_index, deserialized_packet)) - }) - .collect() +// */ + /// New self, update packet_index which implements the inner relationship between batch <--> packets. + pub fn new( + index: &mut PacketIndex, + packet_batch: PacketBatch, + packet_indexes: Vec, + forwarded: bool + ) -> Rc> { + let batch = Rc::new(RefCell::new(Self::default())); + (*batch.borrow_mut()).unprocessed_packets = + packet_indexes + .iter() + .filter_map(|packet_index| { + let deserialized_packet = + Self::deserialize_packet(&packet_batch.packets[*packet_index], *packet_index, Rc::downgrade(&batch.clone()))?; +// deserialized_packet.packet_index = packet_index; +// deserialized_packet.owner = Rc::downgrade(&batch.clone()); + let packet = Rc::new(deserialized_packet); + // update index on insertion + index.push(Rc::clone(&packet)); + Some((*packet_index, packet)) + }) + .collect(); + (*batch.borrow_mut()).packet_batch = packet_batch; + (*batch.borrow_mut()).forwarded = forwarded; + batch } - fn deserialize_packet(packet: &Packet) -> Option { + fn deserialize_packet(packet: &Packet, packet_index: usize, + + owner: Weak>, + ) -> Option { let versioned_transaction: VersionedTransaction = match limited_deserialize(&packet.data[0..packet.meta.size]) { Ok(tx) => tx, @@ -174,6 +619,10 @@ impl DeserializedPacketBatch { versioned_transaction, message_hash, is_simple_vote, + fee_per_cu: None, + sender_stake: 0, + packet_index, + owner, }) } else { None @@ -209,11 +658,22 @@ impl DeserializedPacketBatch { } } +/* #[cfg(test)] mod tests { use { super::*, - solana_sdk::{signature::Keypair, system_transaction}, + solana_runtime::{ +// bank::goto_end_of_slot, + genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, + }, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + signature::{Keypair, Signer}, + system_instruction::{self}, + system_transaction, + transaction::Transaction, + }, std::net::IpAddr, }; @@ -277,6 +737,7 @@ mod tests { ); } +/* #[test] fn test_get_stakes_and_locators_from_empty_buffer() { let unprocessed_packet_batches = UnprocessedPacketBatches::default(); @@ -334,4 +795,251 @@ mod tests { ); }); } +// */ +/* + #[test] + fn test_replace_packet_by_priority() { + solana_logger::setup(); + + let batch = DeserializedPacketBatch::new( + PacketBatch::new(vec![ + packet_with_sender_stake(200, None), + packet_with_sender_stake(210, None), + ]), + vec![0, 1], + false, + ); + let mut unprocessed_packets: 
UnprocessedPacketBatches = vec![batch].into_iter().collect(); + + // try to insert one with weight lesser than anything in buffer. + // the new one should be rejected, and buffer should be unchanged + { + let sender_stake = 0u64; + let new_batch = DeserializedPacketBatch::new( + PacketBatch::new(vec![packet_with_sender_stake(sender_stake, None)]), + vec![0], + false, + ); + let (dropped_batch, _, _) = unprocessed_packets.replace_packet_by_priority(new_batch); + // dropped batch should be the one made from new packet: + let dropped_packets = dropped_batch.unwrap(); + assert_eq!(1, dropped_packets.packet_batch.packets.len()); + assert_eq!( + sender_stake, + dropped_packets.packet_batch.packets[0].meta.sender_stake + ); + // buffer should be unchanged + assert_eq!(1, unprocessed_packets.len()); + } + + // try to insert one with sender_stake higher than anything in buffer. + // the lest sender_stake batch should be dropped, new one will take its palce. + { + let sender_stake = 50_000u64; + let new_batch = DeserializedPacketBatch::new( + PacketBatch::new(vec![packet_with_sender_stake(sender_stake, None)]), + vec![0], + false, + ); + let (dropped_batch, _, _) = unprocessed_packets.replace_packet_by_priority(new_batch); + // dropped batch should be the one with lest sender_stake in buffer (the 3rd batch): + let dropped_packets = dropped_batch.unwrap(); + assert_eq!(2, dropped_packets.packet_batch.packets.len()); + assert_eq!( + 200, + dropped_packets.packet_batch.packets[0].meta.sender_stake + ); + assert_eq!( + 210, + dropped_packets.packet_batch.packets[1].meta.sender_stake + ); + // buffer should still have 1 batches + assert_eq!(1, unprocessed_packets.len()); + // ... which should be the new batch with one packet + assert_eq!(1, unprocessed_packets[0].packet_batch.packets.len()); + assert_eq!( + sender_stake, + unprocessed_packets[0].packet_batch.packets[0] + .meta + .sender_stake + ); + assert_eq!(1, unprocessed_packets[0].unprocessed_packets.len()); + } + } +// */ + + // build a buffer of four batches, each contains packet with following stake: + // 0: [ 10, 300] + // 1: [100, 200, 300] + // 2: [ 20, 30, 40] + // 3: [500, 30, 200] + fn build_unprocessed_packets_buffer() -> UnprocessedPacketBatches { + vec![ + DeserializedPacketBatch::new( + PacketBatch::new(vec![ + packet_with_sender_stake(10, None), + packet_with_sender_stake(300, None), + packet_with_sender_stake(200, None), + ]), + vec![0, 1], + false, + ), + DeserializedPacketBatch::new( + PacketBatch::new(vec![ + packet_with_sender_stake(100, None), + packet_with_sender_stake(200, None), + packet_with_sender_stake(300, None), + ]), + vec![0, 1, 2], + false, + ), + DeserializedPacketBatch::new( + PacketBatch::new(vec![ + packet_with_sender_stake(20, None), + packet_with_sender_stake(30, None), + packet_with_sender_stake(40, None), + ]), + vec![0, 1, 2], + false, + ), + DeserializedPacketBatch::new( + PacketBatch::new(vec![ + packet_with_sender_stake(500, None), + packet_with_sender_stake(30, None), + packet_with_sender_stake(200, None), + ]), + vec![0, 1, 2], + false, + ), + ] + .into_iter() + .collect() + } + +/* + #[test] + fn test_prioritize_by_fee_per_cu() { + solana_logger::setup(); + + let leader = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + mut genesis_config, .. 
+ } = create_genesis_config_with_leader(1_000_000, &leader, 3); + genesis_config + .fee_rate_governor + .target_lamports_per_signature = 1; + genesis_config.fee_rate_governor.target_signatures_per_slot = 1; + + let bank = Bank::new_for_tests(&genesis_config); + let mut bank = Bank::new_from_parent(&Arc::new(bank), &leader, 1); + goto_end_of_slot(&mut bank); + // build a "packet" with higher fee-pr-cu with compute_budget instruction. + let key0 = Keypair::new(); + let key1 = Keypair::new(); + let ix0 = system_instruction::transfer(&key0.pubkey(), &key1.pubkey(), 1); + let ix1 = system_instruction::transfer(&key1.pubkey(), &key0.pubkey(), 1); + let ix_cb = ComputeBudgetInstruction::request_units(1000, 20000); + let message = Message::new(&[ix0, ix1, ix_cb], Some(&key0.pubkey())); + let tx = Transaction::new(&[&key0, &key1], message, bank.last_blockhash()); + let packet = Packet::from_data(None, &tx).unwrap(); + + // build a buffer with 4 batches + let mut unprocessed_packets = build_unprocessed_packets_buffer(); + // add "packet" with higher fee-per-cu to buffer + unprocessed_packets.push_back(DeserializedPacketBatch::new( + PacketBatch::new(vec![packet]), + vec![0], + false, + )); + // randomly select 4 packets plus the higher fee/cu packets to feed into + // prioritize_by_fee_per_cu function. + let locators = vec![ + PacketLocator { + batch_index: 2, + packet_index: 2, + }, + PacketLocator { + batch_index: 1, + packet_index: 2, + }, + PacketLocator { + batch_index: 3, + packet_index: 0, + }, + PacketLocator { + batch_index: 3, + packet_index: 2, + }, + PacketLocator { + batch_index: 4, + packet_index: 0, + }, + ]; + + // If no bank is given, fee-per-cu won't calculate, should expect output is same as input + { + let prioritized_locators = + unprocessed_packets.prioritize_by_fee_per_cu(&locators, None); + assert_eq!(locators, prioritized_locators); + } + + // If bank is given, fee-per-cu is calculated, should expect higher fee-per-cu come + // out first + { + let expected_locators = vec![ + PacketLocator { + batch_index: 4, + packet_index: 0, + }, + PacketLocator { + batch_index: 2, + packet_index: 2, + }, + PacketLocator { + batch_index: 1, + packet_index: 2, + }, + PacketLocator { + batch_index: 3, + packet_index: 0, + }, + PacketLocator { + batch_index: 3, + packet_index: 2, + }, + ]; + + let prioritized_locators = + unprocessed_packets.prioritize_by_fee_per_cu(&locators, Some(Arc::new(bank))); + assert_eq!(expected_locators, prioritized_locators); + } + } +// */ + #[test] + fn test_get_cached_fee_per_cu() { + let mut deserialized_packet = DeserializedPacket::default(); + let slot: Slot = 100; + + // assert default deserialized_packet has no cached fee-per-cu + assert!( + UnprocessedPacketBatches::get_cached_fee_per_cu(&deserialized_packet, &slot).is_none() + ); + + // cache fee-per-cu with slot 100 + let fee_per_cu = 1_000u64; + deserialized_packet.fee_per_cu = Some(FeePerCu { fee_per_cu, slot }); + + // assert cache fee-per-cu is available for same slot + assert_eq!( + fee_per_cu, + UnprocessedPacketBatches::get_cached_fee_per_cu(&deserialized_packet, &slot).unwrap() + ); + + // assert cached value became too old + assert!( + UnprocessedPacketBatches::get_cached_fee_per_cu(&deserialized_packet, &(slot + 1)) + .is_none() + ); + } } +// */ diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index f3fbdca6b1da56..99873e7fad0939 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -507,16 +507,17 @@ impl Accounts { .unwrap_or_else(|| { 
hash_queue.get_lamports_per_signature(tx.message().recent_blockhash()) }); - let fee = if let Some(lamports_per_signature) = lamports_per_signature { - Bank::calculate_fee( - tx.message(), - lamports_per_signature, - fee_structure, - feature_set.is_active(&tx_wide_compute_cap::id()), - ) - } else { - return (Err(TransactionError::BlockhashNotFound), None); - }; + let (fee, _max_units) = + if let Some(lamports_per_signature) = lamports_per_signature { + Bank::calculate_fee( + tx.message(), + lamports_per_signature, + fee_structure, + feature_set.is_active(&tx_wide_compute_cap::id()), + ) + } else { + return (Err(TransactionError::BlockhashNotFound), None); + }; let loaded_transaction = match self.load_transaction( ancestors, @@ -1587,7 +1588,7 @@ mod tests { instructions, ); - let fee = Bank::calculate_fee( + let (fee, _max_units) = Bank::calculate_fee( &SanitizedMessage::try_from(tx.message().clone()).unwrap(), 10, &FeeStructure::default(), diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 140ff4e10f8336..a4abe22cc1d62c 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -3361,12 +3361,15 @@ impl Bank { NoncePartial::new(address, account).lamports_per_signature() }) })?; - Some(Self::calculate_fee( - message, - lamports_per_signature, - &self.fee_structure, - self.feature_set.is_active(&tx_wide_compute_cap::id()), - )) + Some( + Self::calculate_fee( + message, + lamports_per_signature, + &self.fee_structure, + self.feature_set.is_active(&tx_wide_compute_cap::id()), + ) + .0, + ) } pub fn get_fee_for_message_with_lamports_per_signature( @@ -3380,6 +3383,7 @@ impl Bank { &self.fee_structure, self.feature_set.is_active(&tx_wide_compute_cap::id()), ) + .0 } #[deprecated( @@ -4357,12 +4361,13 @@ impl Bank { } /// Calculate fee for `SanitizedMessage` + /// returns (total_fee, max_units) pub fn calculate_fee( message: &SanitizedMessage, lamports_per_signature: u64, fee_structure: &FeeStructure, tx_wide_compute_cap: bool, - ) -> u64 { + ) -> (u64, u64) { if tx_wide_compute_cap { // Fee based on compute units and signatures const BASE_CONGESTION: f64 = 5_000.0; @@ -4394,15 +4399,21 @@ impl Bank { .unwrap_or_default() }); - ((additional_fee - .saturating_add(signature_fee) - .saturating_add(write_lock_fee) - .saturating_add(compute_fee) as f64) - * congestion_multiplier) - .round() as u64 + ( + ((additional_fee + .saturating_add(signature_fee) + .saturating_add(write_lock_fee) + .saturating_add(compute_fee) as f64) + * congestion_multiplier) + .round() as u64, + compute_budget.max_units, + ) } else { // Fee based only on signatures - lamports_per_signature.saturating_mul(Self::get_num_signatures_in_message(message)) + ( + lamports_per_signature.saturating_mul(Self::get_num_signatures_in_message(message)), + compute_budget::MAX_UNITS as u64, + ) } } @@ -4437,7 +4448,7 @@ impl Bank { let lamports_per_signature = lamports_per_signature.ok_or(TransactionError::BlockhashNotFound)?; - let fee = Self::calculate_fee( + let (fee, _units) = Self::calculate_fee( tx.message(), lamports_per_signature, &self.fee_structure, @@ -9592,7 +9603,7 @@ pub(crate) mod tests { } = create_genesis_config_with_leader(mint, &leader, 3); genesis_config.fee_rate_governor = FeeRateGovernor::new(4, 0); // something divisible by 2 - let expected_fee_paid = Bank::calculate_fee( + let (expected_fee_paid, _max_units) = Bank::calculate_fee( &SanitizedMessage::try_from(Message::new(&[], Some(&Pubkey::new_unique()))).unwrap(), genesis_config .fee_rate_governor @@ -9777,7 +9788,7 @@ pub(crate) mod tests { let tx = 
system_transaction::transfer(&mint_keypair, &key.pubkey(), 1, cheap_blockhash); assert_eq!(bank.process_transaction(&tx), Ok(())); assert_eq!(bank.get_balance(&key.pubkey()), 1); - let cheap_fee = Bank::calculate_fee( + let (cheap_fee, _max_units) = Bank::calculate_fee( &SanitizedMessage::try_from(Message::new(&[], Some(&Pubkey::new_unique()))).unwrap(), cheap_lamports_per_signature, &FeeStructure::default(), @@ -9794,7 +9805,7 @@ pub(crate) mod tests { let tx = system_transaction::transfer(&mint_keypair, &key.pubkey(), 1, expensive_blockhash); assert_eq!(bank.process_transaction(&tx), Ok(())); assert_eq!(bank.get_balance(&key.pubkey()), 1); - let expensive_fee = Bank::calculate_fee( + let (expensive_fee, _max_units) = Bank::calculate_fee( &SanitizedMessage::try_from(Message::new(&[], Some(&Pubkey::new_unique()))).unwrap(), expensive_lamports_per_signature, &FeeStructure::default(), @@ -9915,7 +9926,8 @@ pub(crate) mod tests { .lamports_per_signature, &FeeStructure::default(), true, - ) * 2 + ) + .0 * 2 ) .0 ); @@ -16196,13 +16208,13 @@ pub(crate) mod tests { SanitizedMessage::try_from(Message::new(&[], Some(&Pubkey::new_unique()))).unwrap(); assert_eq!( Bank::calculate_fee(&message, 0, &FeeStructure::default(), false), - 0 + (0, compute_budget::MAX_UNITS as u64) ); // One signature, a fee. assert_eq!( Bank::calculate_fee(&message, 1, &FeeStructure::default(), false), - 1 + (1, compute_budget::MAX_UNITS as u64) ); // Two signatures, double the fee. @@ -16213,7 +16225,7 @@ pub(crate) mod tests { let message = SanitizedMessage::try_from(Message::new(&[ix0, ix1], Some(&key0))).unwrap(); assert_eq!( Bank::calculate_fee(&message, 2, &FeeStructure::default(), false), - 4 + (4, compute_budget::MAX_UNITS as u64) ); } @@ -16229,7 +16241,10 @@ pub(crate) mod tests { SanitizedMessage::try_from(Message::new(&[], Some(&Pubkey::new_unique()))).unwrap(); assert_eq!( Bank::calculate_fee(&message, 1, &fee_structure, true), - max_fee + lamports_per_signature + ( + max_fee + lamports_per_signature, + compute_budget::MAX_UNITS as u64 + ) ); // Three signatures, two instructions, no unit request @@ -16241,7 +16256,10 @@ pub(crate) mod tests { .unwrap(); assert_eq!( Bank::calculate_fee(&message, 1, &fee_structure, true), - max_fee + 3 * lamports_per_signature + ( + max_fee + 3 * lamports_per_signature, + compute_budget::MAX_UNITS as u64 + ) ); // Explicit fee schedule @@ -16267,7 +16285,7 @@ pub(crate) mod tests { let message = SanitizedMessage::try_from(Message::new(&[ix0, ix1], Some(&Pubkey::new_unique()))) .unwrap(); - let fee = Bank::calculate_fee(&message, 1, &fee_structure, true); + let (fee, _units) = Bank::calculate_fee(&message, 1, &fee_structure, true); assert_eq!( fee, sol_to_lamports(pair.1) + lamports_per_signature + ADDITIONAL_FEE @@ -16303,7 +16321,7 @@ pub(crate) mod tests { .unwrap(); assert_eq!( Bank::calculate_fee(&message, 1, &FeeStructure::default(), false), - 2 + (2, compute_budget::MAX_UNITS as u64) ); secp_instruction1.data = vec![0]; @@ -16315,7 +16333,7 @@ pub(crate) mod tests { .unwrap(); assert_eq!( Bank::calculate_fee(&message, 1, &FeeStructure::default(), false), - 11 + (11, compute_budget::MAX_UNITS as u64) ); }
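
// ---------------------------------------------------------------------------
// Editor's sketch (not part of the diff): a minimal, self-contained Rust
// illustration of the eviction pattern that priority_flat_index.rs and
// UnprocessedPacketBatches::insert_batch() adopt above -- a MinMaxHeap of
// Rc<packet> entries, each holding a Weak back-reference to its owning
// Rc<RefCell<batch>>, so the lowest-priority packets are dropped until enough
// batches become empty. `Pkt`, `Batch` and `Buffer` are placeholder names,
// and a single `priority` field stands in for the real fee-per-CU /
// sender-stake key; this is a sketch, not the PR's actual code.
// ---------------------------------------------------------------------------
use {
    min_max_heap::MinMaxHeap,
    std::{
        cell::RefCell,
        cmp::Ordering,
        collections::{HashMap, VecDeque},
        rc::{Rc, Weak},
    },
};

struct Pkt {
    priority: u64,
    index: usize,                // key of this packet inside its batch
    owner: Weak<RefCell<Batch>>, // weak back-reference to the owning batch
}

#[derive(Default)]
struct Batch {
    packets: HashMap<usize, Rc<Pkt>>, // the batch owns its packets strongly
}

// The heap orders packets by priority alone.
impl PartialEq for Pkt { fn eq(&self, o: &Self) -> bool { self.priority == o.priority } }
impl Eq for Pkt {}
impl PartialOrd for Pkt { fn partial_cmp(&self, o: &Self) -> Option<Ordering> { Some(self.cmp(o)) } }
impl Ord for Pkt { fn cmp(&self, o: &Self) -> Ordering { self.priority.cmp(&o.priority) } }

struct Buffer {
    batches: VecDeque<Rc<RefCell<Batch>>>,
    index: MinMaxHeap<Rc<Pkt>>, // one entry per unprocessed packet
}

impl Buffer {
    fn insert_batch(&mut self, batch: Rc<RefCell<Batch>>, batch_limit: usize) {
        // Register every packet of the new batch in the priority index, then
        // let the new batch compete with the existing ones for buffer space.
        for pkt in batch.borrow().packets.values() {
            self.index.push(Rc::clone(pkt));
        }
        self.batches.push_back(batch);

        // Over the limit: evict lowest-priority packets until enough batches
        // have been emptied, then sweep the emptied batches out of the deque.
        let mut batches_to_empty = self.batches.len().saturating_sub(batch_limit);
        while batches_to_empty > 0 {
            let victim = match self.index.pop_min() {
                Some(pkt) => pkt,
                None => break,
            };
            // Follow the weak back-reference to the owning batch and remove
            // the packet from its unprocessed map.
            if let Some(owner) = victim.owner.upgrade() {
                owner.borrow_mut().packets.remove(&victim.index);
                if owner.borrow().packets.is_empty() {
                    batches_to_empty -= 1;
                }
            }
        }
        self.batches.retain(|b| !b.borrow().packets.is_empty());
    }
}

fn main() {
    let mut buffer = Buffer { batches: VecDeque::new(), index: MinMaxHeap::new() };
    // Two single-packet batches with a buffer limit of 1: the lower-priority
    // packet (and hence its batch) is evicted when the second batch arrives.
    for &priority in &[10u64, 20] {
        let batch = Rc::new(RefCell::new(Batch::default()));
        let pkt = Rc::new(Pkt { priority, index: 0, owner: Rc::downgrade(&batch) });
        batch.borrow_mut().packets.insert(0, pkt);
        buffer.insert_batch(batch, 1);
    }
    assert_eq!(1, buffer.batches.len());
    assert_eq!(1, buffer.index.len()); // evicted packets also left the index
}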
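
// ---------------------------------------------------------------------------
// Editor's sketch (not part of the diff): the hand-written `Ord for
// DeserializedPacket` in the patch encodes "a packet with a known fee-per-CU
// outranks one without; when both are known, compare fee-per-CU, then sender
// stake". That is exactly the ordering `Option` already provides (None sorts
// below Some, inner values compared otherwise), which the reduced stand-in
// below makes explicit. `PacketPriority` and `cmp_priority` are hypothetical
// names used only for illustration.
// ---------------------------------------------------------------------------
use std::cmp::Ordering;

struct PacketPriority {
    fee_per_cu: Option<u64>, // reduced stand-in for Option<FeePerCu>
    sender_stake: u64,
}

impl PacketPriority {
    fn cmp_priority(&self, other: &Self) -> Ordering {
        // Option<u64> orders None below Some(_) and compares inner values
        // when both are Some, matching the branching in the patch; sender
        // stake only matters when the fee comparison is Equal.
        self.fee_per_cu
            .cmp(&other.fee_per_cu)
            .then(self.sender_stake.cmp(&other.sender_stake))
    }
}

fn main() {
    let no_fee = PacketPriority { fee_per_cu: None, sender_stake: 1_000_000 };
    let low_stake = PacketPriority { fee_per_cu: Some(1), sender_stake: 1 };
    let high_stake = PacketPriority { fee_per_cu: Some(1), sender_stake: 2 };
    // A fee-carrying packet outranks a fee-less one regardless of stake.
    assert_eq!(no_fee.cmp_priority(&low_stake), Ordering::Less);
    // With equal fee-per-CU, the higher sender stake wins.
    assert_eq!(low_stake.cmp_priority(&high_stake), Ordering::Less);
}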