diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs
index 68c3243efd1512..8df66e9b47f362 100644
--- a/core/benches/shredder.rs
+++ b/core/benches/shredder.rs
@@ -9,7 +9,7 @@ use {
     solana_entry::entry::{create_ticks, Entry},
     solana_ledger::shred::{
         max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags,
-        Shredder, LEGACY_SHRED_DATA_CAPACITY, MAX_DATA_SHREDS_PER_FEC_BLOCK,
+        Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
     },
     solana_perf::test_tx,
     solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
@@ -153,13 +153,12 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
 #[bench]
 fn bench_shredder_coding(bencher: &mut Bencher) {
-    let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
+    let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
     let data_shreds = make_shreds(symbol_count);
     bencher.iter(|| {
         Shredder::generate_coding_shreds(
             &data_shreds[..symbol_count],
-            true, // is_last_in_slot
-            0,    // next_code_index
+            0, // next_code_index
         )
         .len();
     })
 }
@@ -167,12 +166,11 @@ fn bench_shredder_decoding(bencher: &mut Bencher) {
-    let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
+    let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
     let data_shreds = make_shreds(symbol_count);
     let coding_shreds = Shredder::generate_coding_shreds(
         &data_shreds[..symbol_count],
-        true, // is_last_in_slot
-        0,    // next_code_index
+        0, // next_code_index
     );
     bencher.iter(|| {
         Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap();
     })
 }
@@ -181,18 +179,18 @@
 #[bench]
 fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
-    let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
-    let data = make_concatenated_shreds(symbol_count as usize);
+    let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
+    let data = make_concatenated_shreds(symbol_count);
     bencher.iter(|| {
         let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
-        encoder.get_encoded_packets(symbol_count);
+        encoder.get_encoded_packets(symbol_count as u32);
     })
 }
 
 #[bench]
 fn bench_shredder_decoding_raptorq(bencher: &mut Bencher) {
-    let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
-    let data = make_concatenated_shreds(symbol_count as usize);
+    let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
+    let data = make_concatenated_shreds(symbol_count);
     let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
     let mut packets = encoder.get_encoded_packets(symbol_count as u32);
     packets.shuffle(&mut rand::thread_rng());
diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs
index fd30cfb3b9546d..a54b0fa79f79c5 100644
--- a/core/src/broadcast_stage/broadcast_utils.rs
+++ b/core/src/broadcast_stage/broadcast_utils.rs
@@ -2,7 +2,6 @@ use {
     crate::result::Result,
     crossbeam_channel::Receiver,
     solana_entry::entry::Entry,
-    solana_ledger::shred::Shred,
     solana_poh::poh_recorder::WorkingBankEntry,
     solana_runtime::bank::Bank,
     solana_sdk::clock::Slot,
@@ -25,9 +24,6 @@ pub struct UnfinishedSlotInfo {
     pub(crate) next_code_index: u32,
     pub slot: Slot,
     pub parent: Slot,
-    // Data shreds buffered to make a batch of size
-    // MAX_DATA_SHREDS_PER_FEC_BLOCK.
-    pub(crate) data_shreds_buffer: Vec<Shred>,
 }
 
 /// This parameter tunes how many entries are received in one iteration of recv loop
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 17a3725cd57594..87715f8b16c82b 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -9,9 +9,7 @@ use {
         broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache,
     },
     solana_entry::entry::Entry,
-    solana_ledger::shred::{
-        ProcessShredsStats, Shred, ShredFlags, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
-    },
+    solana_ledger::shred::{ProcessShredsStats, Shred, ShredFlags, Shredder},
     solana_sdk::{
         signature::Keypair,
         timing::{duration_as_us, AtomicInterval},
@@ -68,41 +66,27 @@ impl StandardBroadcastRun {
             None => Vec::default(),
             Some(ref state) if state.slot == current_slot => Vec::default(),
             Some(ref mut state) => {
-                let parent_offset = state.slot - state.parent;
                 let reference_tick = max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK;
-                let fec_set_offset = state
-                    .data_shreds_buffer
-                    .first()
-                    .map(Shred::index)
-                    .unwrap_or(state.next_shred_index);
-                let fec_set_index = Shredder::fec_set_index(state.next_shred_index, fec_set_offset);
-                let mut shred = Shred::new_from_data(
-                    state.slot,
-                    state.next_shred_index,
-                    parent_offset as u16,
-                    &[], // data
-                    ShredFlags::LAST_SHRED_IN_SLOT,
-                    reference_tick,
-                    self.shred_version,
-                    fec_set_index.unwrap(),
-                );
-                shred.sign(keypair);
-                state.data_shreds_buffer.push(shred.clone());
-                let mut shreds = make_coding_shreds(
+                let shredder =
+                    Shredder::new(state.slot, state.parent, reference_tick, self.shred_version)
+                        .unwrap();
+                let (mut shreds, coding_shreds) = shredder.entries_to_shreds(
                     keypair,
-                    &mut self.unfinished_slot,
-                    true, // is_last_in_slot
+                    &[],  // entries
+                    true, // is_last_in_slot,
+                    state.next_shred_index,
+                    state.next_code_index,
                     stats,
                 );
-                shreds.insert(0, shred);
                 self.report_and_reset_stats(true);
                 self.unfinished_slot = None;
+                shreds.extend(coding_shreds);
                 shreds
             }
         }
     }
 
-    fn entries_to_data_shreds(
+    fn entries_to_shreds(
         &mut self,
         keypair: &Keypair,
         entries: &[Entry],
@@ -110,10 +94,13 @@
         reference_tick: u8,
         is_slot_end: bool,
         process_stats: &mut ProcessShredsStats,
-    ) -> Vec<Shred> {
+    ) -> (
+        Vec<Shred>, // data shreds
+        Vec<Shred>, // coding shreds
+    ) {
         let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
-        let next_shred_index = match &self.unfinished_slot {
-            Some(state) => state.next_shred_index,
+        let (next_shred_index, next_code_index) = match &self.unfinished_slot {
+            Some(state) => (state.next_shred_index, state.next_code_index),
             None => {
                 // If the blockstore has shreds for the slot, it should not
                 // recreate the slot:
                 if blockstore.meta(slot).unwrap().is_some() {
                     process_stats.num_extant_slots += 1;
                     // This is a faulty situation that should not happen.
                     // Refrain from generating shreds for the slot.
-                    return Vec::default();
+                    return (Vec::default(), Vec::default());
                 }
-                0u32
+                (0u32, 0u32)
             }
         };
-        let data_shreds = Shredder::new(slot, parent_slot, reference_tick, self.shred_version)
-            .unwrap()
-            .entries_to_data_shreds(
-                keypair,
-                entries,
-                is_slot_end,
-                next_shred_index,
-                0, // fec_set_offset
-                process_stats,
-            );
-        let mut data_shreds_buffer = match &mut self.unfinished_slot {
-            Some(state) => {
-                assert_eq!(state.slot, slot);
-                std::mem::take(&mut state.data_shreds_buffer)
-            }
-            None => Vec::default(),
-        };
-        data_shreds_buffer.extend(data_shreds.clone());
+        let shredder =
+            Shredder::new(slot, parent_slot, reference_tick, self.shred_version).unwrap();
+        let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
+            keypair,
+            entries,
+            is_slot_end,
+            next_shred_index,
+            next_code_index,
+            process_stats,
+        );
         let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
             Some(index) => index + 1,
             None => next_shred_index,
         };
-        let next_code_index = match &self.unfinished_slot {
-            Some(state) => state.next_code_index,
-            None => 0,
+        let next_code_index = match coding_shreds.iter().map(Shred::index).max() {
+            Some(index) => index + 1,
+            None => next_code_index,
         };
         self.unfinished_slot = Some(UnfinishedSlotInfo {
             next_shred_index,
             next_code_index,
             slot,
             parent: parent_slot,
-            data_shreds_buffer,
         });
-        data_shreds
+        (data_shreds, coding_shreds)
     }
 
     #[cfg(test)]
@@ -228,7 +206,7 @@
         // 2) Convert entries to shreds and coding shreds
         let is_last_in_slot = last_tick_height == bank.max_tick_height();
         let reference_tick = bank.tick_height() % bank.ticks_per_slot();
-        let data_shreds = self.entries_to_data_shreds(
+        let (data_shreds, coding_shreds) = self.entries_to_shreds(
             keypair,
             &receive_results.entries,
             blockstore,
@@ -300,13 +278,7 @@
         socket_sender.send((data_shreds.clone(), batch_info.clone()))?;
         blockstore_sender.send((data_shreds, batch_info.clone()))?;
 
-        // Create and send coding shreds
-        let coding_shreds = make_coding_shreds(
-            keypair,
-            &mut self.unfinished_slot,
-            is_last_in_slot,
-            &mut process_stats,
-        );
+        // Send coding shreds
         let coding_shreds = Arc::new(coding_shreds);
         debug_assert!(coding_shreds
             .iter()
@@ -435,49 +407,6 @@ impl StandardBroadcastRun {
     }
 }
 
-// Consumes data_shreds_buffer returning corresponding coding shreds.
-fn make_coding_shreds(
-    keypair: &Keypair,
-    unfinished_slot: &mut Option<UnfinishedSlotInfo>,
-    is_slot_end: bool,
-    stats: &mut ProcessShredsStats,
-) -> Vec<Shred> {
-    let unfinished_slot = match unfinished_slot {
-        None => return Vec::default(),
-        Some(state) => state,
-    };
-    let data_shreds: Vec<_> = {
-        let size = unfinished_slot.data_shreds_buffer.len();
-        // Consume a multiple of 32, unless this is the slot end.
-        let offset = if is_slot_end {
-            0
-        } else {
-            size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
-        };
-        unfinished_slot
-            .data_shreds_buffer
-            .drain(0..size - offset)
-            .collect()
-    };
-    let shreds = Shredder::data_shreds_to_coding_shreds(
-        keypair,
-        &data_shreds,
-        is_slot_end,
-        unfinished_slot.next_code_index,
-        stats,
-    )
-    .unwrap();
-    if let Some(index) = shreds
-        .iter()
-        .filter(|shred| shred.is_code())
-        .map(Shred::index)
-        .max()
-    {
-        unfinished_slot.next_code_index = unfinished_slot.next_code_index.max(index + 1);
-    }
-    shreds
-}
-
 impl BroadcastRun for StandardBroadcastRun {
     fn run(
         &mut self,
@@ -591,7 +520,6 @@ mod test {
             next_code_index: 17,
             slot,
             parent,
-            data_shreds_buffer: Vec::default(),
         });
         run.slot_broadcast_start = Some(Instant::now());
@@ -776,19 +704,15 @@ mod test {
         while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) {
             shreds.extend(recv_shreds.deref().clone());
         }
-        assert!(shreds.len() < 32, "shreds.len(): {}", shreds.len());
-        assert!(shreds.iter().all(|shred| shred.is_data()));
+        // At least as many coding shreds as data shreds.
+        assert!(shreds.len() >= 29 * 2);
+        assert_eq!(shreds.iter().filter(|shred| shred.is_data()).count(), 29);
         process_ticks(75);
         while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) {
             shreds.extend(recv_shreds.deref().clone());
         }
-        assert!(shreds.len() > 64, "shreds.len(): {}", shreds.len());
-        let num_coding_shreds = shreds.iter().filter(|shred| shred.is_code()).count();
-        assert_eq!(
-            num_coding_shreds, 32,
-            "num coding shreds: {}",
-            num_coding_shreds
-        );
+        assert!(shreds.len() >= 33 * 2);
+        assert_eq!(shreds.iter().filter(|shred| shred.is_data()).count(), 33);
     }
 
     #[test]
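Note on the reworked broadcast test above: the new assertions count data shreds exactly (29, then 33 cumulatively) and only lower-bound the total at twice that. The bound follows from the ERASURE_BATCH_SIZE table introduced further down in ledger/src/shredder.rs, where every batch carries at least as many coding shreds as data shreds. A minimal sketch of the arithmetic, using values from that table:

```rust
// Standalone sketch (not part of the patch): the arithmetic behind the
// updated assertions, using ERASURE_BATCH_SIZE[29] == 60 from the table
// added in ledger/src/shredder.rs.
fn main() {
    let num_data = 29usize;
    let erasure_batch_size = 60usize; // ERASURE_BATCH_SIZE[29]
    let num_coding = erasure_batch_size - num_data;
    assert_eq!(num_coding, 31);
    // At least as many coding shreds as data shreds, hence the
    // `shreds.len() >= 29 * 2` lower bound in the first phase of the test.
    assert!(num_data + num_coding >= 2 * num_data);
}
```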
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index ae604e766f1957..d9427e30f64da2 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -293,8 +293,7 @@ mod tests {
         ));
         let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
             &[shred],
-            false, // is_last_in_slot
-            3,     // next_code_index
+            3, // next_code_index
         );
         coding[0].copy_to_packet(&mut packet);
         assert!(!should_discard_packet(
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 403f6105b5e3a6..acacf9d842a7e5 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -8969,14 +8969,8 @@ pub mod tests {
         let ledger_path = get_tmp_ledger_path_auto_delete!();
         let blockstore = Blockstore::open(ledger_path.path()).unwrap();
 
-        let coding1 = Shredder::generate_coding_shreds(
-            &shreds, false, // is_last_in_slot
-            0, // next_code_index
-        );
-        let coding2 = Shredder::generate_coding_shreds(
-            &shreds, true, // is_last_in_slot
-            0, // next_code_index
-        );
+        let coding1 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 0);
+        let coding2 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 1);
         for shred in &shreds {
             info!("shred {:?}", shred);
         }
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index dda68ff093a309..2656367db79493 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -99,7 +99,11 @@ const OFFSET_OF_SHRED_VARIANT: usize = SIZE_OF_SIGNATURE;
 const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT;
 const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
 
-pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32;
+// Shreds are uniformly split into erasure batches with a "target" number of
+// data shreds per each batch as below. The actual number of data shreds in
+// each erasure batch depends on the number of shreds obtained from serializing
+// a &[Entry].
+pub const DATA_SHREDS_PER_FEC_BLOCK: usize = 32;
 
 // For legacy tests and benchmarks.
 const_assert_eq!(LEGACY_SHRED_DATA_CAPACITY, 1051);
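With the fixed 32-shred FEC blocks gone, a data shred's fec_set_index is now simply the index of the first data shred in its erasure batch, and coding shreds inherit that value (previously documented on Shredder::fec_set_index, which this patch removes). A minimal sketch of the new layout, with made-up numbers — 70 data shreds starting at index 100 split by get_fec_set_offsets into two batches of 35:

```rust
// Sketch (not part of the patch); `fec_set_index_of` and `batch_starts` are
// hypothetical. 70 data shreds starting at index 100 form two erasure
// batches of 35, so the batches start at shred indices 100 and 135.
fn fec_set_index_of(shred_index: u32, batch_starts: &[u32]) -> u32 {
    // fec_set_index == index of the first data shred in the shred's batch.
    *batch_starts
        .iter()
        .take_while(|&&start| start <= shred_index)
        .last()
        .unwrap()
}

fn main() {
    let batch_starts = [100u32, 135];
    assert_eq!(fec_set_index_of(100, &batch_starts), 100);
    assert_eq!(fec_set_index_of(134, &batch_starts), 100);
    assert_eq!(fec_set_index_of(135, &batch_starts), 135);
    assert_eq!(fec_set_index_of(169, &batch_starts), 135);
}
```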
diff --git a/ledger/src/shred/shred_code.rs b/ledger/src/shred/shred_code.rs
index 25ce8a2385916d..cd5d99e86a33e4 100644
--- a/ledger/src/shred/shred_code.rs
+++ b/ledger/src/shred/shred_code.rs
@@ -3,7 +3,7 @@ use {
         common::dispatch,
         legacy, merkle,
         traits::{Shred, ShredCode as ShredCodeTrait},
-        CodingShredHeader, Error, ShredCommonHeader, ShredType, MAX_DATA_SHREDS_PER_FEC_BLOCK,
+        CodingShredHeader, Error, ShredCommonHeader, ShredType, DATA_SHREDS_PER_FEC_BLOCK,
         MAX_DATA_SHREDS_PER_SLOT, SIZE_OF_NONCE,
     },
     solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, signature::Signature},
@@ -132,8 +132,8 @@ pub(super) fn sanitize<T: ShredCodeTrait>(shred: &T) -> Result<(), Error> {
             common_header.index,
         ));
     }
-    let num_coding_shreds = u32::from(coding_header.num_coding_shreds);
-    if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK {
+    let num_coding_shreds = usize::from(coding_header.num_coding_shreds);
+    if num_coding_shreds > 8 * DATA_SHREDS_PER_FEC_BLOCK {
         return Err(Error::InvalidNumCodingShreds(
             coding_header.num_coding_shreds,
         ));
diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs
index 22df6b87d7a063..ea7d869206bfb4 100644
--- a/ledger/src/shredder.rs
+++ b/ledger/src/shredder.rs
@@ -1,6 +1,6 @@
 use {
     crate::shred::{
-        Error, ProcessShredsStats, Shred, ShredData, ShredFlags, MAX_DATA_SHREDS_PER_FEC_BLOCK,
+        Error, ProcessShredsStats, Shred, ShredData, ShredFlags, DATA_SHREDS_PER_FEC_BLOCK,
     },
     itertools::Itertools,
     lazy_static::lazy_static,
@@ -24,6 +24,15 @@ lazy_static! {
         .unwrap();
 }
 
+// Maps number of data shreds to the optimal erasure batch size which has the
+// same recovery probabilities as a 32:32 erasure batch.
+const ERASURE_BATCH_SIZE: [usize; 33] = [
+    0, 18, 20, 22, 23, 25, 27, 28, 30, // 8
+    32, 33, 35, 36, 38, 39, 41, 42, // 16
+    43, 45, 46, 48, 49, 51, 52, 53, // 24
+    55, 56, 58, 59, 60, 62, 63, 64, // 32
+];
+
 type ReedSolomon = reed_solomon_erasure::ReedSolomon<Field>;
 
 #[derive(Debug)]
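The table replaces the fixed 32:32 layout with per-size batch dimensions chosen (per the comment above) to match the recovery probability of a 32:32 batch; sizes past the table fall back to doubling. A quick sketch of the lookup behavior that get_erasure_batch_size (further down in this file) implements:

```rust
// Sketch (not part of the patch): lookup-plus-fallback as implemented by
// get_erasure_batch_size in ledger/src/shredder.rs, with the table inlined.
const ERASURE_BATCH_SIZE: [usize; 33] = [
    0, 18, 20, 22, 23, 25, 27, 28, 30, // 8
    32, 33, 35, 36, 38, 39, 41, 42, // 16
    43, 45, 46, 48, 49, 51, 52, 53, // 24
    55, 56, 58, 59, 60, 62, 63, 64, // 32
];

fn get_erasure_batch_size(num_data_shreds: usize) -> usize {
    ERASURE_BATCH_SIZE
        .get(num_data_shreds)
        .copied()
        .unwrap_or(2 * num_data_shreds)
}

fn main() {
    assert_eq!(get_erasure_batch_size(1), 18); // 1 data + 17 coding
    assert_eq!(get_erasure_batch_size(32), 64); // the familiar 32:32 batch
    assert_eq!(get_erasure_batch_size(40), 80); // past the table: 2x fallback
}
```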
@@ -65,45 +74,20 @@ impl Shredder {
         Vec<Shred>, // data shreds
         Vec<Shred>, // coding shreds
     ) {
-        let data_shreds = self.entries_to_data_shreds(
-            keypair,
-            entries,
-            is_last_in_slot,
-            next_shred_index,
-            next_shred_index, // fec_set_offset
-            stats,
-        );
-        let coding_shreds = Self::data_shreds_to_coding_shreds(
-            keypair,
-            &data_shreds,
-            is_last_in_slot,
-            next_code_index,
-            stats,
-        )
-        .unwrap();
+        let data_shreds =
+            self.entries_to_data_shreds(keypair, entries, is_last_in_slot, next_shred_index, stats);
+        let coding_shreds =
+            Self::data_shreds_to_coding_shreds(keypair, &data_shreds, next_code_index, stats)
+                .unwrap();
         (data_shreds, coding_shreds)
     }
 
-    /// Each FEC block has maximum MAX_DATA_SHREDS_PER_FEC_BLOCK shreds.
-    /// "FEC set index" is the index of first data shred in that FEC block.
-    /// **Data** shreds with the same value of:
-    ///     (data_shred.index() - fec_set_offset) / MAX_DATA_SHREDS_PER_FEC_BLOCK
-    /// belong to the same FEC set.
-    /// Coding shreds inherit their fec_set_index from the data shreds that
-    /// they are generated from.
-    pub fn fec_set_index(data_shred_index: u32, fec_set_offset: u32) -> Option<u32> {
-        let diff = data_shred_index.checked_sub(fec_set_offset)?;
-        Some(data_shred_index - diff % MAX_DATA_SHREDS_PER_FEC_BLOCK)
-    }
-
-    pub fn entries_to_data_shreds(
+    fn entries_to_data_shreds(
         &self,
         keypair: &Keypair,
         entries: &[Entry],
         is_last_in_slot: bool,
         next_shred_index: u32,
-        // Shred index offset at which FEC sets are generated.
-        fec_set_offset: u32,
         process_stats: &mut ProcessShredsStats,
     ) -> Vec<Shred> {
         let mut serialize_time = Measure::start("shred_serialize");
@@ -119,7 +103,7 @@
         let num_shreds = (serialized_shreds.len() + data_buffer_size - 1) / data_buffer_size;
         let last_shred_index = next_shred_index + num_shreds as u32 - 1;
         // 1) Generate data shreds
-        let make_data_shred = |shred_index: u32, data| {
+        let make_data_shred = |data, shred_index: u32, fec_set_index: u32| {
             let flags = if shred_index != last_shred_index {
                 ShredFlags::empty()
             } else if is_slot_end {
@@ -129,7 +113,6 @@
                 ShredFlags::DATA_COMPLETE_SHRED
             };
             let parent_offset = self.slot - self.parent_slot;
-            let fec_set_index = Self::fec_set_index(shred_index, fec_set_offset);
             let mut shred = Shred::new_from_data(
                 self.slot,
                 shred_index,
@@ -138,18 +121,24 @@
                 flags,
                 self.reference_tick,
                 self.version,
-                fec_set_index.unwrap(),
+                fec_set_index,
             );
             shred.sign(keypair);
             shred
         };
-        let data_shreds: Vec<Shred> = PAR_THREAD_POOL.install(|| {
-            serialized_shreds
-                .par_chunks(data_buffer_size)
+        let shreds: Vec<&[u8]> = serialized_shreds.chunks(data_buffer_size).collect();
+        let fec_set_offsets: Vec<usize> =
+            get_fec_set_offsets(shreds.len(), DATA_SHREDS_PER_FEC_BLOCK).collect();
+        assert_eq!(shreds.len(), fec_set_offsets.len());
+        let shreds: Vec<Shred> = PAR_THREAD_POOL.install(|| {
+            shreds
+                .into_par_iter()
+                .zip(fec_set_offsets)
                 .enumerate()
-                .map(|(i, shred_data)| {
+                .map(|(i, (shred, offset))| {
                     let shred_index = next_shred_index + i as u32;
-                    make_data_shred(shred_index, shred_data)
+                    let fec_set_index = next_shred_index + offset as u32;
+                    make_data_shred(shred, shred_index, fec_set_index)
                 })
                 .collect()
         });
@@ -157,15 +146,14 @@
 
         process_stats.serialize_elapsed += serialize_time.as_us();
         process_stats.gen_data_elapsed += gen_data_time.as_us();
-        process_stats.record_num_data_shreds(data_shreds.len());
+        process_stats.record_num_data_shreds(shreds.len());
 
-        data_shreds
+        shreds
     }
 
-    pub fn data_shreds_to_coding_shreds(
+    fn data_shreds_to_coding_shreds(
         keypair: &Keypair,
         data_shreds: &[Shred],
-        is_last_in_slot: bool,
         next_code_index: u32,
         process_stats: &mut ProcessShredsStats,
     ) -> Result<Vec<Shred>, Error> {
@@ -185,8 +173,7 @@
             .iter()
             .scan(next_code_index, |next_code_index, chunk| {
                 let num_data_shreds = chunk.len();
-                let erasure_batch_size =
-                    get_erasure_batch_size(num_data_shreds, is_last_in_slot);
+                let erasure_batch_size = get_erasure_batch_size(num_data_shreds);
                 *next_code_index += (erasure_batch_size - num_data_shreds) as u32;
                 Some(*next_code_index)
             }),
@@ -198,7 +185,7 @@
                 .into_par_iter()
                 .zip(next_code_index)
                 .flat_map(|(shreds, next_code_index)| {
-                    Shredder::generate_coding_shreds(&shreds, is_last_in_slot, next_code_index)
+                    Shredder::generate_coding_shreds(&shreds, next_code_index)
                 })
                 .collect()
         });
@@ -221,7 +208,6 @@
     /// Generates coding shreds for the data shreds in the current FEC set
     pub fn generate_coding_shreds<T: Borrow<Shred>>(
         data: &[T],
-        is_last_in_slot: bool,
         next_code_index: u32,
     ) -> Vec<Shred> {
         let (slot, index, version, fec_set_index) = {
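In data_shreds_to_coding_shreds above, data shreds are grouped by fec_set_index and next_code_index is threaded across batches with the scan: each batch's first coding-shred index is the running sum of coding shreds emitted by earlier batches. A standalone sketch of that bookkeeping, assuming two hypothetical batches of 35 data shreds (35 is past the table, so the 2x fallback yields 35 coding shreds per batch):

```rust
// Sketch (not part of the patch): per-batch next_code_index bookkeeping,
// mirroring the once(..).chain(scan(..)) in data_shreds_to_coding_shreds.
fn main() {
    let chunk_sizes = [35usize, 35]; // data shreds per erasure batch (assumed)
    let erasure_batch_size = |num_data: usize| 2 * num_data; // fallback path
    let next_code_index = 0u32;
    let next_code_indices: Vec<u32> = std::iter::once(next_code_index)
        .chain(chunk_sizes.iter().scan(next_code_index, |index, &num_data| {
            *index += (erasure_batch_size(num_data) - num_data) as u32;
            Some(*index)
        }))
        .take(chunk_sizes.len())
        .collect();
    // First batch's coding shreds start at index 0, second batch's at 35.
    assert_eq!(next_code_indices, [0, 35]);
}
```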
@@ -241,9 +227,10 @@ impl Shredder {
                 && shred.version() == version
                 && shred.fec_set_index() == fec_set_index));
         let num_data = data.len();
-        let num_coding = get_erasure_batch_size(num_data, is_last_in_slot)
+        let num_coding = get_erasure_batch_size(num_data)
             .checked_sub(num_data)
             .unwrap();
+        assert!(num_coding > 0);
         let data: Vec<_> = data
             .iter()
             .map(Borrow::borrow)
@@ -360,12 +347,31 @@
 }
 
 /// Maps number of data shreds in each batch to the erasure batch size.
-fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bool) -> usize {
-    if is_last_in_slot {
-        2 * num_data_shreds.max(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
-    } else {
-        2 * num_data_shreds
-    }
+fn get_erasure_batch_size(num_data_shreds: usize) -> usize {
+    ERASURE_BATCH_SIZE
+        .get(num_data_shreds)
+        .copied()
+        .unwrap_or(2 * num_data_shreds)
+}
+
+// Returns offsets to fec_set_index when splitting shreds into erasure batches.
+fn get_fec_set_offsets(
+    mut num_shreds: usize,
+    min_chunk_size: usize,
+) -> impl Iterator<Item = usize> {
+    let mut offset = 0;
+    std::iter::from_fn(move || {
+        if num_shreds == 0 {
+            return None;
+        }
+        let num_chunks = (num_shreds / min_chunk_size).max(1);
+        let chunk_size = (num_shreds + num_chunks - 1) / num_chunks;
+        let offsets = std::iter::repeat(offset).take(chunk_size);
+        num_shreds -= chunk_size;
+        offset += chunk_size;
+        Some(offsets)
+    })
+    .flatten()
 }
 
 #[cfg(test)]
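get_fec_set_offsets above divides num_shreds as evenly as possible into the fewest chunks of at least min_chunk_size, so batch sizes land in [min_chunk_size, 2 * min_chunk_size) whenever enough shreds exist, and a single smaller batch otherwise. A standalone trace using a local copy of the function:

```rust
// Standalone sketch (not part of the patch): local, loop-based copy of
// get_fec_set_offsets from ledger/src/shredder.rs, for tracing only.
fn fec_set_offsets(mut num_shreds: usize, min_chunk_size: usize) -> Vec<usize> {
    let mut offset = 0;
    let mut offsets = Vec::new();
    while num_shreds > 0 {
        let num_chunks = (num_shreds / min_chunk_size).max(1);
        let chunk_size = (num_shreds + num_chunks - 1) / num_chunks;
        offsets.extend(std::iter::repeat(offset).take(chunk_size));
        num_shreds -= chunk_size;
        offset += chunk_size;
    }
    offsets
}

fn main() {
    // Fewer than min_chunk_size shreds: one (smaller) batch at offset 0.
    assert_eq!(fec_set_offsets(2, 32), vec![0, 0]);
    // 70 shreds: num_chunks = 70 / 32 = 2, chunk_size = ceil(70 / 2) = 35,
    // i.e. two batches of 35 rather than 32 + 32 + 6.
    let offsets = fec_set_offsets(70, 32);
    assert_eq!(offsets[..35], [0; 35]);
    assert_eq!(offsets[35..], [35; 35]);
    // 100 shreds: three batches of 34, 33 and 33, starting at these offsets.
    let offsets = fec_set_offsets(100, 32);
    assert_eq!((offsets[0], offsets[34], offsets[67]), (0, 34, 67));
}
```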
@@ -427,8 +433,7 @@ mod tests {
         let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap();
         let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size;
         let num_expected_coding_shreds =
-            get_erasure_batch_size(num_expected_data_shreds, /*is_last_in_slot:*/ true)
-                - num_expected_data_shreds;
+            get_erasure_batch_size(num_expected_data_shreds) - num_expected_data_shreds;
         let start_index = 0;
         let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
             &keypair,
@@ -684,7 +689,7 @@
         assert_eq!(data_shreds.len(), num_data_shreds);
         assert_eq!(
             num_coding_shreds,
-            get_erasure_batch_size(num_data_shreds, is_last_in_slot) - num_data_shreds
+            get_erasure_batch_size(num_data_shreds) - num_data_shreds
         );
 
         let all_shreds = data_shreds
@@ -989,19 +994,34 @@
             start_index, // next_code_index
             &mut ProcessShredsStats::default(),
         );
-        let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
-        data_shreds.iter().enumerate().for_each(|(i, s)| {
-            let expected_fec_set_index = start_index + (i - i % max_per_block) as u32;
-            assert_eq!(s.fec_set_index(), expected_fec_set_index);
-        });
-
-        coding_shreds.iter().enumerate().for_each(|(i, s)| {
-            let mut expected_fec_set_index = start_index + (i - i % max_per_block) as u32;
-            while expected_fec_set_index as usize - start_index as usize > data_shreds.len() {
-                expected_fec_set_index -= max_per_block as u32;
-            }
-            assert_eq!(s.fec_set_index(), expected_fec_set_index);
-        });
+        const MIN_CHUNK_SIZE: usize = DATA_SHREDS_PER_FEC_BLOCK;
+        let chunks: Vec<_> = data_shreds
+            .iter()
+            .group_by(|shred| shred.fec_set_index())
+            .into_iter()
+            .map(|(fec_set_index, chunk)| (fec_set_index, chunk.count()))
+            .collect();
+        assert!(chunks
+            .iter()
+            .all(|(_, chunk_size)| *chunk_size >= MIN_CHUNK_SIZE));
+        assert!(chunks
+            .iter()
+            .all(|(_, chunk_size)| *chunk_size < 2 * MIN_CHUNK_SIZE));
+        assert_eq!(chunks[0].0, start_index);
+        assert!(chunks.iter().tuple_windows().all(
+            |((fec_set_index, chunk_size), (next_fec_set_index, _chunk_size))| fec_set_index
+                + *chunk_size as u32
+                == *next_fec_set_index
+        ));
+        assert!(coding_shreds.len() >= data_shreds.len());
+        assert!(coding_shreds
+            .iter()
+            .zip(&data_shreds)
+            .all(|(code, data)| code.fec_set_index() == data.fec_set_index()));
+        assert_eq!(
+            coding_shreds.last().unwrap().fec_set_index(),
+            data_shreds.last().unwrap().fec_set_index()
+        );
     }
 
     #[test]
@@ -1028,43 +1048,61 @@
             &entries,
             true, // is_last_in_slot
             start_index,
-            start_index, // fec_set_offset
             &mut stats,
         );
-        assert!(data_shreds.len() > MAX_DATA_SHREDS_PER_FEC_BLOCK as usize);
         let next_code_index = data_shreds[0].index();
-        (1..=MAX_DATA_SHREDS_PER_FEC_BLOCK as usize).for_each(|count| {
-            for is_last_in_slot in [false, true] {
-                let coding_shreds = Shredder::data_shreds_to_coding_shreds(
-                    &keypair,
-                    &data_shreds[..count],
-                    is_last_in_slot,
-                    next_code_index,
-                    &mut stats,
-                )
-                .unwrap();
-                let num_coding_shreds = get_erasure_batch_size(count, is_last_in_slot) - count;
-                assert_eq!(coding_shreds.len(), num_coding_shreds);
-            }
-        });
-        for is_last_in_slot in [false, true] {
+        for size in (1..data_shreds.len()).step_by(5) {
+            let data_shreds = &data_shreds[..size];
             let coding_shreds = Shredder::data_shreds_to_coding_shreds(
                 &keypair,
-                &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1],
-                is_last_in_slot,
+                data_shreds,
                 next_code_index,
                 &mut stats,
             )
             .unwrap();
-            let num_shreds =
-                get_erasure_batch_size(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize, is_last_in_slot)
-                    + get_erasure_batch_size(1, is_last_in_slot);
+            let num_shreds: usize = data_shreds
+                .iter()
+                .group_by(|shred| shred.fec_set_index())
+                .into_iter()
+                .map(|(_, chunk)| get_erasure_batch_size(chunk.count()))
+                .sum();
+            assert_eq!(coding_shreds.len(), num_shreds - data_shreds.len());
+        }
+    }
+
+    #[test]
+    fn test_get_fec_set_offsets() {
+        const MIN_CHUNK_SIZE: usize = 32usize;
+        for num_shreds in 0usize..MIN_CHUNK_SIZE {
+            let offsets: Vec<_> = get_fec_set_offsets(num_shreds, MIN_CHUNK_SIZE).collect();
+            assert_eq!(offsets, vec![0usize; num_shreds]);
+        }
+        for num_shreds in MIN_CHUNK_SIZE..MIN_CHUNK_SIZE * 8 {
+            let chunks: Vec<_> = get_fec_set_offsets(num_shreds, MIN_CHUNK_SIZE)
+                .group_by(|offset| *offset)
+                .into_iter()
+                .map(|(offset, chunk)| (offset, chunk.count()))
+                .collect();
             assert_eq!(
-                coding_shreds.len(),
-                num_shreds - MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - 1
+                chunks
+                    .iter()
+                    .map(|(_offset, chunk_size)| chunk_size)
+                    .sum::<usize>(),
+                num_shreds
             );
+            assert!(chunks
+                .iter()
+                .all(|(_offset, chunk_size)| *chunk_size >= MIN_CHUNK_SIZE));
+            assert!(chunks
+                .iter()
+                .all(|(_offset, chunk_size)| *chunk_size < 2 * MIN_CHUNK_SIZE));
+            assert_eq!(chunks[0].0, 0);
+            assert!(chunks.iter().tuple_windows().all(
+                |((offset, chunk_size), (next_offset, _chunk_size))| offset + chunk_size
+                    == *next_offset
+            ));
         }
     }
 }
diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs
index 192e36ecf0bc5e..23a1cf83f967f7 100644
--- a/ledger/tests/shred.rs
+++ b/ledger/tests/shred.rs
@@ -3,7 +3,7 @@ use {
     solana_entry::entry::Entry,
     solana_ledger::shred::{
         max_entries_per_n_shred, verify_test_data_shred, ProcessShredsStats, Shred, Shredder,
-        LEGACY_SHRED_DATA_CAPACITY, MAX_DATA_SHREDS_PER_FEC_BLOCK,
+        DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
     },
     solana_sdk::{
         clock::Slot,
@@ -26,7 +26,7 @@ fn test_multi_fec_block_coding() {
     let slot = 0x1234_5678_9abc_def0;
     let shredder = Shredder::new(slot, slot - 5, 0, 0).unwrap();
     let num_fec_sets = 100;
-    let num_data_shreds = (MAX_DATA_SHREDS_PER_FEC_BLOCK * num_fec_sets) as usize;
+    let num_data_shreds = DATA_SHREDS_PER_FEC_BLOCK * num_fec_sets;
     let keypair0 = Keypair::new();
     let keypair1 = Keypair::new();
     let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
@@ -67,8 +67,8 @@ fn test_multi_fec_block_coding() {
 
     let mut all_shreds = vec![];
     for i in 0..num_fec_sets {
-        let shred_start_index = (MAX_DATA_SHREDS_PER_FEC_BLOCK * i) as usize;
-        let end_index = shred_start_index + MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - 1;
+        let shred_start_index = DATA_SHREDS_PER_FEC_BLOCK * i;
+        let end_index = shred_start_index + DATA_SHREDS_PER_FEC_BLOCK - 1;
         let fec_set_shreds = data_shreds[shred_start_index..=end_index]
             .iter()
             .cloned()
@@ -99,11 +99,7 @@ fn test_multi_fec_block_coding() {
             shred_info.insert(i * 2, recovered_shred);
         }
 
-        all_shreds.extend(
-            shred_info
-                .into_iter()
-                .take(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize),
-        );
+        all_shreds.extend(shred_info.into_iter().take(DATA_SHREDS_PER_FEC_BLOCK));
     }
 
     let result = Shredder::deshred(&all_shreds[..]).unwrap();
@@ -193,11 +189,11 @@ fn setup_different_sized_fec_blocks(
     let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
     let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
 
-    // Make enough entries for `MAX_DATA_SHREDS_PER_FEC_BLOCK + 2` shreds so one
-    // fec set will have `MAX_DATA_SHREDS_PER_FEC_BLOCK` shreds and the next
+    // Make enough entries for `DATA_SHREDS_PER_FEC_BLOCK + 2` shreds so one
+    // fec set will have `DATA_SHREDS_PER_FEC_BLOCK` shreds and the next
     // will have 2 shreds.
-    assert!(MAX_DATA_SHREDS_PER_FEC_BLOCK > 2);
-    let num_shreds_per_iter = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 2;
+    assert!(DATA_SHREDS_PER_FEC_BLOCK > 2);
+    let num_shreds_per_iter = DATA_SHREDS_PER_FEC_BLOCK + 2;
     let num_entries = max_entries_per_n_shred(
         &entry,
         num_shreds_per_iter as u64,