diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 333db34bcef2f4..39d098ba3d38a7 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -28,11 +28,11 @@ fn make_large_unchained_entries(txs_per_entry: u64, num_entries: u64) -> Vec Vec { +fn make_shreds(num_shreds: u32) -> Vec { let txs_per_entry = 128; let num_entries = max_entries_per_n_shred( &make_test_entry(txs_per_entry), - 2 * num_shreds as u64, + 2 * num_shreds, Some(LEGACY_SHRED_DATA_CAPACITY), ); let entries = make_large_unchained_entries(txs_per_entry, num_entries); @@ -49,7 +49,7 @@ fn make_shreds(num_shreds: usize) -> Vec { &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); - assert!(data_shreds.len() >= num_shreds); + assert!(data_shreds.len() >= num_shreds as usize); data_shreds } @@ -57,7 +57,7 @@ fn make_shreds(num_shreds: usize) -> Vec { fn bench_shredder_ticks(bencher: &mut Bencher) { let kp = Keypair::new(); let shred_size = LEGACY_SHRED_DATA_CAPACITY; - let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; + let num_shreds = u32::try_from(((1000 * 1000) + (shred_size - 1)) / shred_size).unwrap(); // ~1Mb let num_ticks = max_ticks_per_n_shreds(1, Some(LEGACY_SHRED_DATA_CAPACITY)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); @@ -83,11 +83,11 @@ fn bench_shredder_ticks(bencher: &mut Bencher) { fn bench_shredder_large_entries(bencher: &mut Bencher) { let kp = Keypair::new(); let shred_size = LEGACY_SHRED_DATA_CAPACITY; - let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; + let num_shreds = u32::try_from(((1000 * 1000) + (shred_size - 1)) / shred_size).unwrap(); let txs_per_entry = 128; let num_entries = max_entries_per_n_shred( &make_test_entry(txs_per_entry), - num_shreds as u64, + num_shreds, Some(shred_size), ); let entries = make_large_unchained_entries(txs_per_entry, num_entries); @@ -115,7 +115,7 @@ fn bench_deshredder(bencher: &mut Bencher) { let kp = Keypair::new(); let shred_size = LEGACY_SHRED_DATA_CAPACITY; // ~10Mb - let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size; + let num_shreds = u32::try_from(((10000 * 1000) + (shred_size - 1)) / shred_size).unwrap(); let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); let shredder = Shredder::new(1, 0, 0, 0).unwrap(); @@ -156,7 +156,7 @@ fn bench_shredder_coding(bencher: &mut Bencher) { let reed_solomon_cache = ReedSolomonCache::default(); bencher.iter(|| { Shredder::generate_coding_shreds( - &data_shreds[..symbol_count], + &data_shreds[..symbol_count as usize], 0, // next_code_index &reed_solomon_cache, ) @@ -170,7 +170,7 @@ fn bench_shredder_decoding(bencher: &mut Bencher) { let data_shreds = make_shreds(symbol_count); let reed_solomon_cache = ReedSolomonCache::default(); let coding_shreds = Shredder::generate_coding_shreds( - &data_shreds[..symbol_count], + &data_shreds[..symbol_count as usize], 0, // next_code_index &reed_solomon_cache, ); diff --git a/core/src/repair/repair_response.rs b/core/src/repair/repair_response.rs index 0a82935e89892c..a02b07d891fc40 100644 --- a/core/src/repair/repair_response.rs +++ b/core/src/repair/repair_response.rs @@ -10,7 +10,7 @@ use { pub fn repair_response_packet( blockstore: &Blockstore, slot: Slot, - shred_index: u64, + shred_index: u32, dest: &SocketAddr, nonce: Nonce, ) -> Option { diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index 7d3d6dd54213e4..c92cb65ea384bd 100644 --- 
a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -666,7 +666,11 @@ impl RepairService { if let Some(reference_tick) = slot_meta .received .checked_sub(1) - .and_then(|index| blockstore.get_data_shred(slot, index).ok()?) + .and_then(|index| { + blockstore + .get_data_shred(slot, u32::try_from(index).ok()?) + .ok()? + }) .and_then(|shred| shred::layout::get_reference_tick(&shred).ok()) .map(u64::from) { diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index 44c221f2a97877..d02802109859ba 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -96,8 +96,12 @@ pub enum ShredRepairType { /// Requesting `MAX_ORPHAN_REPAIR_RESPONSES ` parent shreds Orphan(Slot), /// Requesting any shred with index greater than or equal to the particular index + // TODO As the value is a shred index, it should use `u32`. But as changing the type would + // affect the serialized representation, we would need to introduce a conversion. HighestShred(Slot, u64), /// Requesting the missing shred at a particular index + // TODO As the value is a shred index, it should use `u32`. But as changing the type would + // affect the serialized representation, we would need to introduce a conversion. Shred(Slot, u64), } @@ -221,6 +225,7 @@ impl RepairRequestHeader { pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>; /// Window protocol messages +// TODO All `u64`s in the branches of this enum are shred indices, and they should be `u32`. #[cfg_attr( feature = "frozen-abi", derive(AbiEnumVisitor, AbiExample), @@ -437,7 +442,7 @@ impl ServeRepair { from_addr, blockstore, *slot, - *shred_index, + u32::try_from(*shred_index).expect("All shred indices fit into u32"), *nonce, ); if batch.is_none() { @@ -457,7 +462,7 @@ impl ServeRepair { from_addr, blockstore, *slot, - *highest_index, + u32::try_from(*highest_index).expect("All shred indices fit into u32"), *nonce, ), "HighestWindowIndexWithNonce", @@ -1290,7 +1295,7 @@ impl ServeRepair { from_addr: &SocketAddr, blockstore: &Blockstore, slot: Slot, - shred_index: u64, + shred_index: u32, nonce: Nonce, ) -> Option { // Try to find the requested index in one of the slots @@ -1313,17 +1318,17 @@ impl ServeRepair { from_addr: &SocketAddr, blockstore: &Blockstore, slot: Slot, - highest_index: u64, + highest_index: u32, nonce: Nonce, ) -> Option { // Try to find the requested index in one of the slots let meta = blockstore.meta(slot).ok()??; - if meta.received > highest_index { + if meta.received > highest_index.into() { // meta.received must be at least 1 by this point let packet = repair_response::repair_response_packet( blockstore, slot, - meta.received - 1, + u32::try_from(meta.received - 1).expect("All shred indices fit into u32"), from_addr, nonce, )?; @@ -1354,7 +1359,8 @@ impl ServeRepair { repair_response::repair_response_packet( blockstore, meta.slot, - meta.received.checked_sub(1u64)?, + u32::try_from(meta.received.checked_sub(1u64)?)
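The TODOs on `HighestShred` and `Shred` above note that narrowing the index from `u64` to `u32` would change the serialized representation. A minimal sketch of why, assuming bincode with its default fixed-width integer encoding as the wire format; the `OldRequest`/`NewRequest` enums are hypothetical stand-ins, not types from this change:

use serde::Serialize;

#[derive(Serialize)]
enum OldRequest {
    // (slot, shred index) as currently serialized on the wire
    HighestShred(u64, u64),
}

#[derive(Serialize)]
enum NewRequest {
    // hypothetical narrowed shred index
    HighestShred(u64, u32),
}

fn main() {
    let old = bincode::serialize(&OldRequest::HighestShred(7, 3)).unwrap();
    let new = bincode::serialize(&NewRequest::HighestShred(7, 3)).unwrap();
    // With fixed-width encoding the index field shrinks from 8 to 4 bytes,
    // so peers deserializing with the old definition would misparse requests.
    assert_ne!(old.len(), new.len());
}

This is why the patch keeps the `u64` in the request enum and converts at the boundary with `u32::try_from(...)` instead.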
+ .expect("All shred indices fit into u32"), from_addr, nonce, ) @@ -1909,7 +1915,7 @@ mod tests { nonce, ) .expect("packets"); - let request = ShredRepairType::HighestShred(slot, index); + let request = ShredRepairType::HighestShred(slot, index.into()); verify_responses(&request, rv.iter()); let rv: Vec = rv @@ -1920,8 +1926,10 @@ mod tests { }) .collect(); assert!(!rv.is_empty()); - let index = blockstore.meta(slot).unwrap().unwrap().received - 1; - assert_eq!(rv[0].index(), index as u32); + let index: u32 = (blockstore.meta(slot).unwrap().unwrap().received - 1) + .try_into() + .unwrap(); + assert_eq!(rv[0].index(), index); assert_eq!(rv[0].slot(), slot); let rv = ServeRepair::run_highest_window_request( @@ -1971,7 +1979,7 @@ mod tests { nonce, ) .expect("packets"); - let request = ShredRepairType::Shred(slot, index); + let request = ShredRepairType::Shred(slot, index.into()); verify_responses(&request, rv.iter()); let rv: Vec = rv .into_iter() @@ -2163,7 +2171,7 @@ mod tests { repair_response::repair_response_packet( &blockstore, slot, - index, + index.try_into().unwrap(), &socketaddr_any!(), nonce, ) diff --git a/ledger/benches/blockstore.rs b/ledger/benches/blockstore.rs index 27296d412d7ab3..edc032f459a441 100644 --- a/ledger/benches/blockstore.rs +++ b/ledger/benches/blockstore.rs @@ -28,13 +28,13 @@ fn bench_write_shreds(bench: &mut Bencher, entries: Vec, ledger_path: &Pa // Insert some shreds into the ledger in preparation for read benchmarks fn setup_read_bench( blockstore: &Blockstore, - num_small_shreds: u64, - num_large_shreds: u64, + num_small_shreds: u32, + num_large_shreds: u32, slot: Slot, ) { // Make some big and small entries let entries = create_ticks( - num_large_shreds * 4 + num_small_shreds * 2, + (num_large_shreds * 4 + num_small_shreds * 2).into(), 0, Hash::default(), ); @@ -117,12 +117,12 @@ fn bench_read_random(bench: &mut Bencher) { // Generate a num_reads sized random sample of indexes in range [0, total_shreds - 1], // simulating random reads let mut rng = rand::thread_rng(); - let indexes: Vec = (0..num_reads) - .map(|_| rng.gen_range(0..total_shreds) as usize) + let indexes: Vec = (0..num_reads) + .map(|_| rng.gen_range(0..total_shreds)) .collect(); bench.iter(move || { for i in indexes.iter() { - let _ = blockstore.get_data_shred(slot, *i as u64); + let _ = blockstore.get_data_shred(slot, *i); } }); } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 21d5418b7c6038..6da8919810097b 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -106,7 +106,7 @@ pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; // An upper bound on maximum number of data shreds we can handle in a slot // 32K shreds would allow ~320K peak TPS // (32K shreds per slot * 4 TX per shred * 2.5 slots per sec) -pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768; +pub const MAX_DATA_SHREDS_PER_SLOT: u32 = 32_768; pub type CompletedSlotsSender = Sender>; pub type CompletedSlotsReceiver = Receiver>; @@ -632,25 +632,41 @@ impl Blockstore { pub fn slot_data_iterator( &self, slot: Slot, - index: u64, - ) -> Result)> + '_> { + index: u32, + ) -> Result)> + '_> { let slot_iterator = self.db.iter::(IteratorMode::From( - (slot, index), + (slot, index.into()), IteratorDirection::Forward, ))?; - Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot)) + Ok(slot_iterator + .take_while(move |((shred_slot, _), _)| *shred_slot == slot) + .map(|((slot, index), data)| { + ( + slot, + u32::try_from(index).expect("shred index fits into u32"), + 
data, + ) + })) } pub fn slot_coding_iterator( &self, slot: Slot, - index: u64, - ) -> Result)> + '_> { + index: u32, + ) -> Result)> + '_> { let slot_iterator = self.db.iter::(IteratorMode::From( - (slot, index), + (slot, index.into()), IteratorDirection::Forward, ))?; - Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot)) + Ok(slot_iterator + .take_while(move |((shred_slot, _), _)| *shred_slot == slot) + .map(|((slot, index), data)| { + ( + slot, + u32::try_from(index).expect("shred index fits into u32"), + data, + ) + })) } fn prepare_rooted_slot_iterator( @@ -718,14 +734,14 @@ impl Blockstore { data_cf: &'a LedgerColumn, ) -> impl Iterator + 'a { erasure_meta.data_shreds_indices().filter_map(move |i| { - let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Data); + let key = ShredId::new(slot, i, ShredType::Data); if let Some(shred) = prev_inserted_shreds.get(&key) { return Some(shred.clone()); } if !index.data().contains(i) { return None; } - match data_cf.get_bytes((slot, i)).unwrap() { + match data_cf.get_bytes((slot, i.into())).unwrap() { None => { error!( "Unable to read the data shred with slot {slot}, index {i} for shred \ @@ -747,14 +763,14 @@ impl Blockstore { code_cf: &'a LedgerColumn, ) -> impl Iterator + 'a { erasure_meta.coding_shreds_indices().filter_map(move |i| { - let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Code); + let key = ShredId::new(slot, i, ShredType::Code); if let Some(shred) = prev_inserted_shreds.get(&key) { return Some(shred.clone()); } if !index.coding().contains(i) { return None; } - match code_cf.get_bytes((slot, i)).unwrap() { + match code_cf.get_bytes((slot, i.into())).unwrap() { None => { error!( "Unable to read the coding shred with slot {slot}, index {i} for shred \ @@ -1393,7 +1409,7 @@ impl Blockstore { metrics: &mut BlockstoreInsertionMetrics, ) -> bool { let slot = shred.slot(); - let shred_index = u64::from(shred.index()); + let shred_index = shred.index(); let index_meta_working_set_entry = self.get_index_meta_entry(slot, index_working_set, index_meta_time_us); @@ -1546,7 +1562,7 @@ impl Blockstore { return Some(Cow::Owned(potential_shred.into_payload())); } } else if let Some(potential_shred) = { - let key = ShredId::new(slot, u32::try_from(coding_index).unwrap(), ShredType::Code); + let key = ShredId::new(slot, coding_index, ShredType::Code); just_received_shreds.get(&key) } { if shred.erasure_mismatch(potential_shred).unwrap() { @@ -1720,7 +1736,7 @@ impl Blockstore { write_batch: &mut WriteBatch, ) -> Result<()> { let slot = shred.slot(); - let shred_index = u64::from(shred.index()); + let shred_index = shred.index(); // Assert guaranteed by integrity checks on the shred that happen before // `insert_coding_shred` is called @@ -1729,16 +1745,16 @@ impl Blockstore { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. 
- write_batch.put_bytes::((slot, shred_index), shred.payload())?; + write_batch.put_bytes::((slot, shred_index.into()), shred.payload())?; index_meta.coding_mut().insert(shred_index); Ok(()) } fn is_data_shred_present(shred: &Shred, slot_meta: &SlotMeta, data_index: &ShredIndex) -> bool { - let shred_index = u64::from(shred.index()); + let shred_index = shred.index(); // Check that the shred doesn't already exist in blockstore - shred_index < slot_meta.consumed || data_index.contains(shred_index) + u64::from(shred_index) < slot_meta.consumed || data_index.contains(shred_index) } /// Finds the corresponding shred at `shred_id` in the just inserted @@ -1753,14 +1769,8 @@ impl Blockstore { (Some(shred), _) => Some(Cow::Borrowed(shred.payload())), // If it doesn't exist in the just inserted set, it must exist in // the backing store - (_, ShredType::Data) => self - .get_data_shred(slot, u64::from(index)) - .unwrap() - .map(Cow::Owned), - (_, ShredType::Code) => self - .get_coding_shred(slot, u64::from(index)) - .unwrap() - .map(Cow::Owned), + (_, ShredType::Data) => self.get_data_shred(slot, index).unwrap().map(Cow::Owned), + (_, ShredType::Code) => self.get_coding_shred(slot, index).unwrap().map(Cow::Owned), } } @@ -2025,7 +2035,10 @@ impl Blockstore { if !self.has_duplicate_shreds_in_slot(slot) { let shred_id = ShredId::new( slot, - u32::try_from(last_index.unwrap()).unwrap(), + last_index + .unwrap() + .try_into() + .expect("A valid `last_index` fits into u32."), ShredType::Data, ); let Some(ending_shred) = self @@ -2075,7 +2088,8 @@ impl Blockstore { if !self.has_duplicate_shreds_in_slot(slot) { let shred_id = ShredId::new( slot, - u32::try_from(slot_meta.received - 1).unwrap(), + u32::try_from(slot_meta.received - 1) + .expect("`slot_meta.received` is a shred index, so it fits into u32"), ShredType::Data, ); let Some(ending_shred) = self @@ -2149,7 +2163,7 @@ impl Blockstore { shred_source: ShredSource, ) -> Result> { let slot = shred.slot(); - let index = u64::from(shred.index()); + let index = shred.index(); let last_in_slot = if shred.last_in_slot() { debug!("got last in slot"); @@ -2168,7 +2182,7 @@ impl Blockstore { // Parent for slot meta should have been set by this point assert!(!slot_meta.is_orphan()); - let new_consumed = if slot_meta.consumed == index { + let new_consumed = if slot_meta.consumed == u64::from(index) { let mut current_index = index + 1; while data_index.contains(current_index) { @@ -2176,18 +2190,18 @@ impl Blockstore { } current_index } else { - slot_meta.consumed + u32::try_from(slot_meta.consumed).expect("`consumed` is a valid shred index") }; // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. 
- write_batch.put_bytes::((slot, index), shred.bytes_to_store())?; + write_batch.put_bytes::((slot, index.into()), shred.bytes_to_store())?; data_index.insert(index); let newly_completed_data_sets = update_slot_meta( last_in_slot, last_in_data, slot_meta, - index as u32, + index, new_consumed, shred.reference_tick(), data_index, @@ -2217,8 +2231,8 @@ impl Blockstore { Ok(newly_completed_data_sets) } - pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result>> { - let shred = self.data_shred_cf.get_bytes((slot, index))?; + pub fn get_data_shred(&self, slot: Slot, index: u32) -> Result>> { + let shred = self.data_shred_cf.get_bytes((slot, index.into()))?; let shred = shred.map(ShredData::resize_stored_shred).transpose(); shred.map_err(|err| { let err = format!("Invalid stored shred: {err}"); @@ -2227,10 +2241,10 @@ impl Blockstore { }) } - pub fn get_data_shreds_for_slot(&self, slot: Slot, start_index: u64) -> Result> { + pub fn get_data_shreds_for_slot(&self, slot: Slot, start_index: u32) -> Result> { self.slot_data_iterator(slot, start_index) .expect("blockstore couldn't fetch iterator") - .map(|(_, bytes)| { + .map(|(_slot, _shred_index, bytes)| { Shred::new_from_serialized_shred(bytes.to_vec()).map_err(|err| { BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom( format!("Could not reconstruct shred from shred payload: {err:?}"), @@ -2244,10 +2258,10 @@ impl Blockstore { fn get_data_shreds( &self, slot: Slot, - from_index: u64, - to_index: u64, + from_index: u32, + to_index: u32, buffer: &mut [u8], - ) -> Result<(u64, usize)> { + ) -> Result<(u32, usize)> { let _lock = self.check_lowest_cleanup_slot(slot)?; let mut buffer_offset = 0; let mut last_index = 0; @@ -2256,7 +2270,7 @@ impl Blockstore { warn!("The slot is not yet full. Will not return any shreds"); return Ok((last_index, buffer_offset)); } - let to_index = cmp::min(to_index, meta.consumed); + let to_index = cmp::min(to_index, u32::try_from(meta.consumed).unwrap()); for index in from_index..to_index { if let Some(shred_data) = self.get_data_shred(slot, index)? 
{ let shred_len = shred_data.len(); @@ -2280,18 +2294,18 @@ impl Blockstore { Ok((last_index, buffer_offset)) } - pub fn get_coding_shred(&self, slot: Slot, index: u64) -> Result>> { - self.code_shred_cf.get_bytes((slot, index)) + pub fn get_coding_shred(&self, slot: Slot, index: u32) -> Result>> { + self.code_shred_cf.get_bytes((slot, index.into())) } pub fn get_coding_shreds_for_slot( &self, slot: Slot, - start_index: u64, + start_index: u32, ) -> std::result::Result, shred::Error> { self.slot_coding_iterator(slot, start_index) .expect("blockstore couldn't fetch iterator") - .map(|code| Shred::new_from_serialized_shred(code.1.to_vec())) + .map(|(_slot, _shred, code)| Shred::new_from_serialized_shred(code.to_vec())) .collect() } @@ -3463,7 +3477,7 @@ impl Blockstore { } /// Returns the entry vector for the slot starting with `shred_start_index` - pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u64) -> Result> { + pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u32) -> Result> { self.get_slot_entries_with_shred_info(slot, shred_start_index, false) .map(|x| x.0) } @@ -3473,9 +3487,9 @@ impl Blockstore { pub fn get_slot_entries_with_shred_info( &self, slot: Slot, - start_index: u64, + start_index: u32, allow_dead_slots: bool, - ) -> Result<(Vec, u64, bool)> { + ) -> Result<(Vec, u32, bool)> { let (completed_ranges, slot_meta) = self.get_completed_ranges(slot, start_index)?; // Check if the slot is dead *after* fetching completed ranges to avoid a race @@ -3491,7 +3505,7 @@ impl Blockstore { let slot_meta = slot_meta.unwrap(); let num_shreds = completed_ranges .last() - .map(|(_, end_index)| u64::from(*end_index) - start_index + 1) + .map(|(_, end)| *end - start_index + 1) .unwrap_or(0); let entries = self.get_slot_entries_in_block(slot, completed_ranges, Some(&slot_meta))?; @@ -3567,7 +3581,7 @@ impl Blockstore { fn get_completed_ranges( &self, slot: Slot, - start_index: u64, + start_index: u32, ) -> Result<(CompletedRanges, Option)> { let slot_meta = self.meta_cf.get(slot)?; if slot_meta.is_none() { @@ -3577,7 +3591,7 @@ impl Blockstore { let slot_meta = slot_meta.unwrap(); // Find all the ranges for the completed data blocks let completed_ranges = Self::get_completed_data_ranges( - start_index as u32, + start_index, &slot_meta.completed_data_indexes, slot_meta.consumed as u32, ); @@ -4009,8 +4023,8 @@ impl Blockstore { pub fn is_shred_duplicate(&self, shred: &Shred) -> Option> { let (slot, index, shred_type) = shred.id().unpack(); let mut other = match shred_type { - ShredType::Data => self.get_data_shred(slot, u64::from(index)), - ShredType::Code => self.get_coding_shred(slot, u64::from(index)), + ShredType::Data => self.get_data_shred(slot, index), + ShredType::Code => self.get_coding_shred(slot, index), } .expect("fetch from DuplicateSlots column family failed")?; if let Ok(signature) = shred.retransmitter_signature() { @@ -4582,7 +4596,7 @@ fn update_completed_data_indexes( shred_indices .windows(2) .filter(|ix| { - let (begin, end) = (ix[0] as u64, ix[1] as u64); + let (begin, end) = (ix[0], ix[1]); let num_shreds = (end - begin) as usize; received_data_shreds.range(begin..end).count() == num_shreds }) @@ -4595,7 +4609,7 @@ fn update_slot_meta( is_last_in_data: bool, slot_meta: &mut SlotMeta, index: u32, - new_consumed: u64, + new_consumed: u32, reference_tick: u8, received_data_shreds: &ShredIndex, ) -> Vec<(u32, u32)> { @@ -4608,11 +4622,11 @@ fn update_slot_meta( let slot_time_elapsed = u64::from(reference_tick) * 1000 / DEFAULT_TICKS_PER_SECOND; 
slot_meta.first_shred_timestamp = timestamp() - slot_time_elapsed; } - slot_meta.consumed = new_consumed; + slot_meta.consumed = new_consumed.into(); // If the last index in the slot hasn't been set before, then // set it to this shred index if is_last_in_slot && slot_meta.last_index.is_none() { - slot_meta.last_index = Some(u64::from(index)); + slot_meta.last_index = Some(index.into()); } update_completed_data_indexes( is_last_in_slot || is_last_in_data, @@ -5202,7 +5216,7 @@ pub fn test_all_empty_or_min(blockstore: &Blockstore, min_slot: Slot) { pub fn make_many_slot_shreds( start_slot: u64, num_slots: u64, - num_shreds_per_slot: u64, + num_shreds_per_slot: u32, ) -> (Vec, Vec) { // Use `None` as shred_size so the default (full) value is used let num_entries = max_ticks_per_n_shreds(num_shreds_per_slot, None); @@ -5618,7 +5632,7 @@ pub mod tests { fn test_read_shred_bytes() { let slot = 0; let (shreds, _) = make_slot_entries(slot, 0, 100, /*merkle_variant:*/ true); - let num_shreds = shreds.len() as u64; + let num_shreds = shreds.len() as u32; let shred_bufs: Vec<_> = shreds.iter().map(Shred::payload).cloned().collect(); let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -5893,9 +5907,7 @@ pub mod tests { .insert_shreds(shreds, None, false) .expect("Expected successful write of shreds"); assert_eq!( - blockstore - .get_slot_entries(slot, u64::from(index - 1)) - .unwrap(), + blockstore.get_slot_entries(slot, index - 1).unwrap(), vec![last_entry], ); } @@ -7997,7 +8009,7 @@ pub mod tests { // Test that the iterator for slot 8 contains what was inserted earlier let shred_iter = blockstore.slot_data_iterator(8, 0).unwrap(); let result: Vec = shred_iter - .filter_map(|(_, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok()) + .filter_map(|(_, _, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok()) .collect(); assert_eq!(result.len(), slot_8_shreds.len()); assert_eq!(result, slot_8_shreds); @@ -10216,7 +10228,7 @@ pub mod tests { for (s, buf) in data_shreds.iter().zip(shred_bufs) { assert_eq!( blockstore - .get_data_shred(s.slot(), s.index() as u64) + .get_data_shred(s.slot(), s.index()) .unwrap() .unwrap(), buf @@ -10444,7 +10456,7 @@ pub mod tests { let data_iter = blockstore.slot_data_iterator(slot, 0).unwrap(); let mut num_data = 0; - for ((slot, index), _) in data_iter { + for (slot, index, _) in data_iter { num_data += 1; // Test that iterator and individual shred lookup yield same set assert!(blockstore.get_data_shred(slot, index).unwrap().is_some()); @@ -10458,7 +10470,7 @@ pub mod tests { let coding_iter = blockstore.slot_coding_iterator(slot, 0).unwrap(); let mut num_coding = 0; - for ((slot, index), _) in coding_iter { + for (slot, index, _) in coding_iter { num_coding += 1; // Test that the iterator and individual shred lookup yield same set assert!(blockstore.get_coding_shred(slot, index).unwrap().is_some()); @@ -10562,7 +10574,7 @@ pub mod tests { // There are 32 data shreds in slot 9. 
for index in 0..32 { assert_matches!( - blockstore.get_data_shred(unconfirmed_slot, index as u64), + blockstore.get_data_shred(unconfirmed_slot, index), Ok(Some(_)) ); } @@ -10632,7 +10644,7 @@ pub mod tests { let mut shred_index = ShredIndex::default(); for i in 0..10 { - shred_index.insert(i as u64); + shred_index.insert(i); assert_eq!( update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes), vec![(i, i)] @@ -10943,16 +10955,16 @@ pub mod tests { #[test] fn test_duplicate_last_index_mark_dead() { - let num_shreds = 10; - let smaller_last_shred_index = 5; - let larger_last_shred_index = 8; + let num_shreds = 10u32; + let smaller_last_shred_index = 5u32; + let larger_last_shred_index = 8u32; let setup_test_shreds = |slot: Slot| -> Vec { let num_entries = max_ticks_per_n_shreds(num_shreds, Some(LEGACY_SHRED_DATA_CAPACITY)); let (mut shreds, _) = make_slot_entries(slot, 0, num_entries, /*merkle_variant:*/ false); - shreds[smaller_last_shred_index].set_last_in_slot(); - shreds[larger_last_shred_index].set_last_in_slot(); + shreds[smaller_last_shred_index as usize].set_last_in_slot(); + shreds[larger_last_shred_index as usize].set_last_in_slot(); shreds }; @@ -10965,7 +10977,7 @@ pub mod tests { let meta = blockstore.meta(slot).unwrap().unwrap(); assert_eq!(meta.consumed, shreds.len() as u64); let shreds_index = blockstore.get_index(slot).unwrap().unwrap(); - for i in 0..shreds.len() as u64 { + for i in 0..shreds.len() as u32 { assert!(shreds_index.data().contains(i)); } @@ -10990,7 +11002,7 @@ pub mod tests { // the "last" index shred itself is inserted. let (expected_slot_meta, expected_index) = get_expected_slot_meta_and_index_meta( &blockstore, - shreds[..=smaller_last_shred_index].to_vec(), + shreds[..=smaller_last_shred_index as usize].to_vec(), ); blockstore .insert_shreds(shreds.clone(), None, false) @@ -10998,7 +11010,7 @@ pub mod tests { assert!(blockstore.get_duplicate_slot(slot).is_some()); assert!(!blockstore.is_dead(slot)); for i in 0..num_shreds { - if i <= smaller_last_shred_index as u64 { + if i <= smaller_last_shred_index { assert_eq!( blockstore.get_data_shred(slot, i).unwrap().unwrap(), *shreds[i as usize].payload() @@ -11015,7 +11027,9 @@ pub mod tests { // Case 2: Inserting a duplicate with an even smaller last shred index should not // mark the slot as dead since the Slotmeta is full. let even_smaller_last_shred_duplicate = { - let mut payload = shreds[smaller_last_shred_index - 1].payload().clone(); + let mut payload = shreds[smaller_last_shred_index as usize - 1] + .payload() + .clone(); // Flip a byte to create a duplicate shred payload[0] = u8::MAX - payload[0]; let mut shred = Shred::new_from_serialized_shred(payload).unwrap(); @@ -11030,7 +11044,7 @@ pub mod tests { .unwrap(); assert!(!blockstore.is_dead(slot)); for i in 0..num_shreds { - if i <= smaller_last_shred_index as u64 { + if i <= smaller_last_shred_index { assert_eq!( blockstore.get_data_shred(slot, i).unwrap().unwrap(), *shreds[i as usize].payload() @@ -11060,10 +11074,8 @@ pub mod tests { // inserted into. 
for i in 0..num_shreds { let shred_to_check = &shreds[i as usize]; - let shred_index = shred_to_check.index() as u64; - if shred_index != smaller_last_shred_index as u64 - && shred_index != larger_last_shred_index as u64 - { + let shred_index = shred_to_check.index(); + if shred_index != smaller_last_shred_index && shred_index != larger_last_shred_index { assert_eq!( blockstore .get_data_shred(slot, shred_index) @@ -11091,10 +11103,8 @@ pub mod tests { // All the shreds will be inserted since dead slots can still be inserted into. for i in 0..num_shreds { let shred_to_check = &shreds[i as usize]; - let shred_index = shred_to_check.index() as u64; - if shred_index != smaller_last_shred_index as u64 - && shred_index != larger_last_shred_index as u64 - { + let shred_index = shred_to_check.index(); + if shred_index != smaller_last_shred_index && shred_index != larger_last_shred_index { assert_eq!( blockstore .get_data_shred(slot, shred_index) @@ -11969,7 +11979,7 @@ pub mod tests { let total_shreds = fec_set_index as u64 + data_shreds.len() as u64; // FEC set should be padded - assert_eq!(data_shreds.len(), DATA_SHREDS_PER_FEC_BLOCK); + assert_eq!(data_shreds.len(), DATA_SHREDS_PER_FEC_BLOCK as usize); // Missing slot meta assert_matches!( @@ -11980,7 +11990,7 @@ pub mod tests { // Incomplete slot blockstore .insert_shreds( - data_shreds[0..DATA_SHREDS_PER_FEC_BLOCK - 1].to_vec(), + data_shreds[0..(DATA_SHREDS_PER_FEC_BLOCK as usize) - 1].to_vec(), None, false, ) @@ -12037,7 +12047,7 @@ pub mod tests { ); let last_index = last_data_shreds.last().unwrap().index(); let total_shreds = first_data_shreds.len() + last_data_shreds.len(); - assert!(total_shreds < DATA_SHREDS_PER_FEC_BLOCK); + assert!(total_shreds < DATA_SHREDS_PER_FEC_BLOCK as usize); blockstore .insert_shreds(first_data_shreds, None, false) .unwrap(); @@ -12077,8 +12087,8 @@ pub mod tests { ); let last_index = last_data_shreds.last().unwrap().index(); let total_shreds = first_data_shreds.len() + last_data_shreds.len(); - assert!(last_data_shreds.len() < DATA_SHREDS_PER_FEC_BLOCK); - assert!(total_shreds > DATA_SHREDS_PER_FEC_BLOCK); + assert!(last_data_shreds.len() < DATA_SHREDS_PER_FEC_BLOCK as usize); + assert!(total_shreds > DATA_SHREDS_PER_FEC_BLOCK as usize); blockstore .insert_shreds(first_data_shreds, None, false) .unwrap(); @@ -12106,7 +12116,7 @@ pub mod tests { None, true, ); - assert!(first_data_shreds.len() > DATA_SHREDS_PER_FEC_BLOCK); + assert!(first_data_shreds.len() > DATA_SHREDS_PER_FEC_BLOCK as usize); let block_id = first_data_shreds[0].merkle_root().unwrap(); blockstore .insert_shreds(first_data_shreds, None, false) diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 00eea6f811ebcb..fc18e9295ae80b 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -271,14 +271,16 @@ pub mod columns { #[derive(Debug)] /// The shred data column /// - /// * index type: `(u64, u64)` + /// * index type: `(Slot, u64)` + /// Second element is the shred index and should be `u32`. /// * value type: [`Vec`] pub struct ShredData; #[derive(Debug)] /// The shred erasure code column /// - /// * index type: `(u64, u64)` + /// * index type: `(Slot, u64)` + /// Second element is the shred index and should be `u32`. 
/// * value type: [`Vec`] pub struct ShredCode; diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index f3940fe618444d..fd708d29149469 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -8,7 +8,7 @@ use { }, std::{ collections::BTreeSet, - ops::{Range, RangeBounds}, + ops::{Bound, Range, RangeBounds}, }, }; @@ -51,6 +51,15 @@ impl Default for ConnectedFlags { #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] /// The Meta column family +// TODO A number of fields in this struct represent shred indices but use `u64`, which is incorrect. +// All shred indices must be `u32`, as already encoded in the types of `completed_data_indexes` as +// well as in `ShredCommonHeader::index`. Because some shred index values are correctly typed as +// `u32` and some are incorrectly typed as `u64`, there are a number of back-and-forth conversions +// sprinkled around the codebase, with calls to `unwrap()` in case the value does not fit when +// narrowed. +// +// It would be much better to change all shred indices to `u32`, but that would require some data +// migration. pub struct SlotMeta { /// The number of slots above the root (the genesis block). The first /// slot has slot 0. @@ -58,15 +67,21 @@ pub struct SlotMeta { /// The total number of consecutive shreds starting from index 0 we have received for this slot. /// At the same time, it is also an index of the first missing shred for this slot, while the /// slot is incomplete. + // TODO This should be `u32` as it is used as a shred index in the code. pub consumed: u64, /// The index *plus one* of the highest shred received for this slot. Useful /// for checking if the slot has received any shreds yet, and to calculate the /// range where there is one or more holes: `(consumed..received)`. + // TODO This should be `u32` as it is used as a shred index in the code. pub received: u64, /// The timestamp of the first time a shred was added for this slot pub first_shred_timestamp: u64, /// The index of the shred that is flagged as the last shred for this slot. /// None until the shred with LAST_SHRED_IN_SLOT flag is received. + // TODO This should be `Option<u32>`, as it is used as a shred index in the code. Changing this + // type should not be very hard - just create a new serialization proxy, similar to + // `serde_compat` below, but for `Option<u32>` that still uses 8 bytes, with `u64::MAX` for + // `None`. #[serde(with = "serde_compat")] pub last_index: Option, /// The slot height of the block this one derives from. @@ -115,6 +130,9 @@ pub struct Index { #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] pub struct ShredIndex { /// Map representing presence/absence of shreds + // TODO This should have been `BTreeSet<u32>`, as shred indices cannot exceed a `u32`. But + // changing this type would require data migration in the blockstore. An exercise for the + // future. index: BTreeSet, } @@ -250,24 +268,42 @@ impl Index { } } +// To be replaced by `range.map(Into::into)` when `#![feature(bound_map)]` is stabilized.
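The TODO on `last_index` above suggests a new serialization proxy for `Option<u32>` that keeps the current 8-byte on-disk encoding. A minimal sketch of such a proxy, modeled on the existing `serde_compat` module; the module name `serde_compat_u32` is hypothetical, not part of this change:

mod serde_compat_u32 {
    use serde::{Deserialize, Deserializer, Serialize, Serializer};

    // Serialize `None` as `u64::MAX` so the field keeps its current 8-byte layout.
    pub fn serialize<S: Serializer>(value: &Option<u32>, serializer: S) -> Result<S::Ok, S::Error> {
        let raw = match value {
            Some(index) => u64::from(*index),
            None => u64::MAX,
        };
        raw.serialize(serializer)
    }

    // Read the 8-byte value back, treating `u64::MAX` as `None` and rejecting
    // anything else that does not fit into a shred index (`u32`).
    pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<u32>, D::Error> {
        let raw = u64::deserialize(deserializer)?;
        if raw == u64::MAX {
            Ok(None)
        } else {
            u32::try_from(raw).map(Some).map_err(serde::de::Error::custom)
        }
    }
}

A proxy along these lines would let the in-memory field become `Option<u32>` while existing blockstore entries remain readable without a data migration.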
+// +// https://github.com/rust-lang/rust/issues/86026 +fn bound_into(from: Bound) -> Bound +where + F: Into, +{ + match from { + Bound::Included(x) => Bound::Included(x.into()), + Bound::Excluded(x) => Bound::Excluded(x.into()), + Bound::Unbounded => Bound::Unbounded, + } +} + impl ShredIndex { pub fn num_shreds(&self) -> usize { self.index.len() } - pub(crate) fn range(&self, bounds: R) -> impl Iterator + pub(crate) fn range(&self, bounds: R) -> impl Iterator + '_ where - R: RangeBounds, + R: RangeBounds, { - self.index.range(bounds) + let range = ( + bound_into::(bounds.start_bound().cloned()), + bound_into::(bounds.end_bound().cloned()), + ); + self.index.range(range).map(|i| u32::try_from(*i).unwrap()) } - pub(crate) fn contains(&self, index: u64) -> bool { - self.index.contains(&index) + pub(crate) fn contains(&self, index: u32) -> bool { + self.index.contains(&u64::from(index)) } - pub(crate) fn insert(&mut self, index: u64) { - self.index.insert(index); + pub(crate) fn insert(&mut self, index: u32) { + self.index.insert(u64::from(index)); } } @@ -419,15 +455,16 @@ impl ErasureMeta { self.config } - pub(crate) fn data_shreds_indices(&self) -> Range { - let num_data = self.config.num_data as u64; - let fec_set_index = u64::from(self.fec_set_index); + pub(crate) fn data_shreds_indices(&self) -> Range { + let num_data = self.config.num_data as u32; + let fec_set_index = self.fec_set_index; fec_set_index..fec_set_index + num_data } - pub(crate) fn coding_shreds_indices(&self) -> Range { - let num_coding = self.config.num_coding as u64; - self.first_coding_index..self.first_coding_index + num_coding + pub(crate) fn coding_shreds_indices(&self) -> Range { + let num_coding = self.config.num_coding as u32; + let first_coding_index = self.first_coding_index as u32; + first_coding_index..first_coding_index + num_coding } pub(crate) fn first_received_coding_shred_index(&self) -> Option { @@ -617,8 +654,8 @@ mod test { let mut rng = thread_rng(); let mut index = Index::new(0); - let data_indexes = 0..erasure_config.num_data as u64; - let coding_indexes = 0..erasure_config.num_coding as u64; + let data_indexes = 0..erasure_config.num_data as u32; + let coding_indexes = 0..erasure_config.num_coding as u32; assert_eq!(e_meta.status(&index), StillNeed(erasure_config.num_data)); @@ -637,7 +674,7 @@ mod test { .collect::>() .choose_multiple(&mut rng, erasure_config.num_data) { - index.data_mut().index.remove(&idx); + index.data_mut().index.remove(&idx.into()); assert_eq!(e_meta.status(&index), CanRecover); } @@ -650,7 +687,7 @@ mod test { .collect::>() .choose_multiple(&mut rng, erasure_config.num_coding) { - index.coding_mut().index.remove(&idx); + index.coding_mut().index.remove(&idx.into()); assert_eq!(e_meta.status(&index), DataFull); } diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 24ff5ce8fb7baa..1085a05c78457e 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1268,7 +1268,7 @@ impl ReplaySlotStats { slot: Slot, num_txs: usize, num_entries: usize, - num_shreds: u64, + num_shreds: u32, bank_complete_time_us: u64, is_unified_scheduler_enabled: bool, ) { @@ -1389,7 +1389,7 @@ impl ReplaySlotStats { pub struct ConfirmationProgress { pub last_entry: Hash, pub tick_hash_count: u64, - pub num_shreds: u64, + pub num_shreds: u32, pub num_entries: usize, pub num_txs: usize, } @@ -1455,7 +1455,7 @@ pub fn confirm_slot( fn confirm_slot_entries( bank: &BankWithScheduler, replay_tx_thread_pool: &ThreadPool, - 
slot_entries_load_result: (Vec, u64, bool), + slot_entries_load_result: (Vec, u32, bool), timing: &mut ConfirmationTiming, progress: &mut ConfirmationProgress, skip_verification: bool, diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 814ec2b5bf303a..6d988605964944 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -110,7 +110,7 @@ const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; // data shreds per each batch as below. The actual number of data shreds in // each erasure batch depends on the number of shreds obtained from serializing // a &[Entry]. -pub const DATA_SHREDS_PER_FEC_BLOCK: usize = 32; +pub const DATA_SHREDS_PER_FEC_BLOCK: u32 = 32; // For legacy tests and benchmarks. const_assert_eq!(LEGACY_SHRED_DATA_CAPACITY, 1051); @@ -159,6 +159,8 @@ pub enum Error { InvalidShredFlags(u8), #[error("Invalid {0:?} shred index: {1}")] InvalidShredIndex(ShredType, /*shred index:*/ u32), + #[error("Shred index exceeds u32")] + ShredIndexExceeded, #[error("Invalid shred type")] InvalidShredType, #[error("Invalid shred variant")] @@ -354,7 +356,7 @@ impl Shred { // Like Shred::erasure_shard but returning a slice. dispatch!(pub(crate) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); // Returns the shard index within the erasure coding set. - dispatch!(pub(crate) fn erasure_shard_index(&self) -> Result); + dispatch!(pub(crate) fn erasure_shard_index(&self) -> Result); dispatch!(pub(crate) fn retransmitter_signature(&self) -> Result); dispatch!(pub fn into_payload(self) -> Vec); @@ -1176,7 +1178,7 @@ pub fn should_discard_shred( }; match ShredType::from(shred_variant) { ShredType::Code => { - if index >= shred_code::MAX_CODE_SHREDS_PER_SLOT as u32 { + if index >= shred_code::MAX_CODE_SHREDS_PER_SLOT { stats.index_out_of_bounds += 1; return true; } @@ -1186,7 +1188,7 @@ pub fn should_discard_shred( } } ShredType::Data => { - if index >= MAX_DATA_SHREDS_PER_SLOT as u32 { + if index >= MAX_DATA_SHREDS_PER_SLOT { stats.index_out_of_bounds += 1; return true; } @@ -1232,14 +1234,14 @@ pub fn should_discard_shred( false } -pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option) -> u64 { +pub fn max_ticks_per_n_shreds(num_shreds: u32, shred_data_size: Option) -> u64 { let ticks = create_ticks(1, 0, Hash::default()); max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size) } pub fn max_entries_per_n_shred( entry: &Entry, - num_shreds: u64, + num_shreds: u32, shred_data_size: Option, ) -> u64 { // Default 32:32 erasure batches yields 64 shreds; log2(64) = 6. 
@@ -1252,7 +1254,7 @@ pub fn max_entries_per_n_shred( let entry_size = bincode::serialized_size(entry).unwrap(); let count_size = vec_size - entry_size; - (shred_data_size * num_shreds - count_size) / entry_size + (shred_data_size * u64::from(num_shreds) - count_size) / entry_size } pub fn verify_test_data_shred( @@ -1656,7 +1658,7 @@ mod tests { assert_eq!(stats.slot_out_of_range, 1); } { - let index = u32::try_from(MAX_CODE_SHREDS_PER_SLOT).unwrap(); + let index = MAX_CODE_SHREDS_PER_SLOT; { let mut cursor = Cursor::new(packet.buffer_mut()); cursor diff --git a/ledger/src/shred/legacy.rs b/ledger/src/shred/legacy.rs index ad2e59fc4dd7e3..6626e29b44ec7a 100644 --- a/ledger/src/shred/legacy.rs +++ b/ledger/src/shred/legacy.rs @@ -80,7 +80,7 @@ impl<'a> Shred<'a> for ShredData { shred.sanitize().map(|_| shred) } - fn erasure_shard_index(&self) -> Result { + fn erasure_shard_index(&self) -> Result { shred_data::erasure_shard_index(self).ok_or_else(|| { let headers = Box::new((self.common_header, self.data_header)); Error::InvalidErasureShardIndex(headers) @@ -142,7 +142,7 @@ impl<'a> Shred<'a> for ShredCode { shred.sanitize().map(|_| shred) } - fn erasure_shard_index(&self) -> Result { + fn erasure_shard_index(&self) -> Result { shred_code::erasure_shard_index(self).ok_or_else(|| { let headers = Box::new((self.common_header, self.coding_header)); Error::InvalidErasureShardIndex(headers) @@ -377,7 +377,7 @@ mod test { } { let mut shred = shred.clone(); - shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32; + shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT; assert_matches!( shred.sanitize(), Err(Error::InvalidShredIndex(ShredType::Data, 32768)) @@ -438,7 +438,7 @@ mod test { } { let mut shred = shred.clone(); - shred.common_header.index = MAX_CODE_SHREDS_PER_SLOT as u32; + shred.common_header.index = MAX_CODE_SHREDS_PER_SLOT; assert_matches!( shred.sanitize(), Err(Error::InvalidShredIndex(ShredType::Code, 32_768)) @@ -458,11 +458,11 @@ mod test { // shred has index > u32::MAX should fail. { let mut shred = shred.clone(); - shred.common_header.fec_set_index = MAX_DATA_SHREDS_PER_SLOT as u32 - 2; + shred.common_header.fec_set_index = MAX_DATA_SHREDS_PER_SLOT - 2; shred.coding_header.num_data_shreds = 3; shred.coding_header.num_coding_shreds = 4; shred.coding_header.position = 1; - shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32 - 2; + shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT - 2; assert_matches!( shred.sanitize(), Err(Error::InvalidErasureShardIndex { .. 
}) diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs index e569999ba3d82f..680ea5c418a86b 100644 --- a/ledger/src/shred/merkle.rs +++ b/ledger/src/shred/merkle.rs @@ -16,7 +16,7 @@ use { shredder::{self, ReedSolomonCache}, }, assert_matches::debug_assert_matches, - itertools::{Either, Itertools}, + itertools::{izip, Either, Itertools}, rayon::{prelude::*, ThreadPool}, reed_solomon_erasure::Error::{InvalidIndex, TooFewParityShards, TooFewShards}, solana_perf::packet::deserialize_from_with_limit, @@ -86,7 +86,7 @@ pub(super) enum Shred { impl Shred { dispatch!(fn common_header(&self) -> &ShredCommonHeader); dispatch!(fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); - dispatch!(fn erasure_shard_index(&self) -> Result); + dispatch!(fn erasure_shard_index(&self) -> Result); dispatch!(fn merkle_node(&self) -> Result); dispatch!(fn payload(&self) -> &Vec); dispatch!(fn sanitize(&self) -> Result<(), Error>); @@ -206,10 +206,7 @@ impl ShredData { let fec_set_index = <[u8; 4]>::try_from(shred.get(79..83)?) .map(u32::from_le_bytes) .ok()?; - shred::layout::get_index(shred)? - .checked_sub(fec_set_index) - .map(usize::try_from)? - .ok()? + shred::layout::get_index(shred)?.checked_sub(fec_set_index)? }; let proof_offset = Self::get_proof_offset(proof_size, chained, resigned).ok()?; let proof = get_merkle_proof(shred, proof_offset, proof_size).ok()?; @@ -281,11 +278,11 @@ impl ShredCode { let index = { let num_data_shreds = <[u8; 2]>::try_from(shred.get(83..85)?) .map(u16::from_le_bytes) - .map(usize::from) + .map(u32::from) .ok()?; let position = <[u8; 2]>::try_from(shred.get(87..89)?) .map(u16::from_le_bytes) - .map(usize::from) + .map(u32::from) .ok()?; num_data_shreds.checked_add(position)? }; @@ -509,7 +506,7 @@ impl<'a> ShredTrait<'a> for ShredData { Ok(shred) } - fn erasure_shard_index(&self) -> Result { + fn erasure_shard_index(&self) -> Result { shred_data::erasure_shard_index(self).ok_or_else(|| { let headers = Box::new((self.common_header, self.data_header)); Error::InvalidErasureShardIndex(headers) @@ -595,7 +592,7 @@ impl<'a> ShredTrait<'a> for ShredCode { Ok(shred) } - fn erasure_shard_index(&self) -> Result { + fn erasure_shard_index(&self) -> Result { shred_code::erasure_shard_index(self).ok_or_else(|| { let headers = Box::new((self.common_header, self.coding_header)); Error::InvalidErasureShardIndex(headers) @@ -699,7 +696,7 @@ fn join_nodes, T: AsRef<[u8]>>(node: S, other: T) -> Hash { // Recovers root of the merkle tree from a leaf node // at the given index and the respective proof. -fn get_merkle_root<'a, I>(index: usize, node: Hash, proof: I) -> Result +fn get_merkle_root<'a, I>(index: u32, node: Hash, proof: I) -> Result where I: IntoIterator, { @@ -755,8 +752,8 @@ fn make_merkle_tree(mut nodes: Vec) -> Vec { } fn make_merkle_proof( - mut index: usize, // leaf index ~ shred's erasure shard index. - mut size: usize, // number of leaves ~ erasure batch size. + mut index: u32, // leaf index ~ shred's erasure shard index. + mut size: u32, // number of leaves ~ erasure batch size. 
tree: &[Hash], ) -> Option> { if index >= size { @@ -765,14 +762,15 @@ fn make_merkle_proof( let mut offset = 0; let mut proof = Vec::<&MerkleProofEntry>::new(); while size > 1 { - let node = tree.get(offset + (index ^ 1).min(size - 1))?; + let node_position = offset + (index ^ 1).min(size - 1); + let node = tree.get(node_position as usize)?; let entry = &node.as_ref()[..SIZE_OF_MERKLE_PROOF_ENTRY]; proof.push(<&MerkleProofEntry>::try_from(entry).unwrap()); offset += size; size = (size + 1) >> 1; index >>= 1; } - (offset + 1 == tree.len()).then_some(proof) + (offset as usize + 1 == tree.len()).then_some(proof) } pub(super) fn recover( @@ -861,18 +859,18 @@ pub(super) fn recover( } } })); - let num_data_shreds = usize::from(coding_header.num_data_shreds); - let num_coding_shreds = usize::from(coding_header.num_coding_shreds); + let num_data_shreds = u32::from(coding_header.num_data_shreds); + let num_coding_shreds = u32::from(coding_header.num_coding_shreds); let num_shards = num_data_shreds + num_coding_shreds; // Obtain erasure encoded shards from shreds. let shreds = { - let mut batch = vec![None; num_shards]; + let mut batch = vec![None; num_shards as usize]; while let Some(shred) = shreds.pop() { let index = match shred.erasure_shard_index() { - Ok(index) if index < batch.len() => index, + Ok(index) if (index as usize) < batch.len() => index, _ => return Err(Error::from(InvalidIndex)), }; - batch[index] = Some(shred); + batch[index as usize] = Some(shred); } batch }; @@ -888,8 +886,8 @@ pub(super) fn recover( let mut shreds: Vec<_> = shreds .into_iter() .zip(shards) - .enumerate() - .map(|(index, (shred, shard))| { + .zip(0..) + .map(|((shred, shard), index)| { if let Some(shred) = shred { return Ok(shred); } @@ -929,7 +927,7 @@ pub(super) fn recover( ..coding_header }; let common_header = ShredCommonHeader { - index: common_header.index + offset as u32, + index: common_header.index + offset, ..common_header }; let shred = ShredCode::from_recovered_shard( @@ -949,7 +947,7 @@ pub(super) fn recover( .map(Shred::merkle_node) .collect::>()?; let tree = make_merkle_tree(nodes); - for (index, (shred, mask)) in shreds.iter_mut().zip(&mask).enumerate() { + for (index, shred, mask) in izip!(0.., shreds.iter_mut(), &mask) { let proof = make_merkle_proof(index, num_shards, &tree).ok_or(Error::InvalidMerkleProof)?; if proof.len() != usize::from(proof_size) { return Err(Error::InvalidMerkleProof); @@ -978,8 +976,8 @@ pub(super) fn recover( } // Maps number of (code + data) shreds to merkle_proof.len(). 
-fn get_proof_size(num_shreds: usize) -> u8 { - let bits = usize::BITS - num_shreds.leading_zeros(); +fn get_proof_size(num_shreds: u32) -> u8 { + let bits = u32::BITS - num_shreds.leading_zeros(); let proof_size = if num_shreds.is_power_of_two() { bits.checked_sub(1).unwrap() } else { @@ -1027,7 +1025,7 @@ pub(super) fn make_shreds_from_data( shredder::get_erasure_batch_size(DATA_SHREDS_PER_FEC_BLOCK, is_last_in_slot); let proof_size = get_proof_size(erasure_batch_size); let data_buffer_size = ShredData::capacity(proof_size, chained, resigned)?; - let chunk_size = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_size; + let chunk_size = DATA_SHREDS_PER_FEC_BLOCK as usize * data_buffer_size; let mut common_header = ShredCommonHeader { signature: Signature::default(), shred_variant: ShredVariant::MerkleData { @@ -1086,7 +1084,8 @@ pub(super) fn make_shreds_from_data( let (proof_size, data_buffer_size, num_data_shreds) = (1u8..32) .find_map(|proof_size| { let data_buffer_size = ShredData::capacity(proof_size, chained, resigned).ok()?; - let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size; + let num_data_shreds = + u32::try_from((data.len() + data_buffer_size - 1) / data_buffer_size).ok()?; let num_data_shreds = num_data_shreds.max(min_num_data_shreds); let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot); @@ -1106,7 +1105,7 @@ pub(super) fn make_shreds_from_data( for shred in data .chunks(data_buffer_size) .chain(std::iter::repeat(&[][..])) - .take(num_data_shreds) + .take(num_data_shreds as usize) { let shred = new_shred_data(common_header, data_header, shred); shreds.push(shred); @@ -1158,11 +1157,11 @@ pub(super) fn make_shreds_from_data( .iter() .scan(next_code_index, |next_code_index, chunk| { let out = Some(*next_code_index); - let num_data_shreds = chunk.len(); + let num_data_shreds = u32::try_from(chunk.len()).ok()?; let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot); let num_coding_shreds = erasure_batch_size - num_data_shreds; - *next_code_index += num_coding_shreds as u32; + *next_code_index += num_coding_shreds; out }) .collect(); @@ -1242,7 +1241,7 @@ fn make_erasure_batch( is_last_in_slot: bool, reed_solomon_cache: &ReedSolomonCache, ) -> Result<(/*merkle root:*/ Hash, Vec), Error> { - let num_data_shreds = shreds.len(); + let num_data_shreds = u32::try_from(shreds.len()).map_err(|_| Error::ShredIndexExceeded)?; let chained = chained_merkle_root.is_some(); let resigned = chained && is_last_in_slot; let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot); @@ -1270,7 +1269,7 @@ fn make_erasure_batch( .collect::>()?; // Shreds should have erasure encoded shard of the same length. debug_assert_eq!(data.iter().map(|shard| shard.len()).dedup().count(), 1); - let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds]; + let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds as usize]; reed_solomon_cache .get(num_data_shreds, num_coding_shreds)? .encode_sep(&data, &mut parity[..])?; @@ -1316,7 +1315,7 @@ fn make_erasure_batch( let root = tree.last().ok_or(Error::InvalidMerkleProof)?; let signature = keypair.sign_message(root.as_ref()); // Populate merkle proof for all shreds and attach signature. 
- for (index, shred) in shreds.iter_mut().enumerate() { + for (index, shred) in izip!(0.., &mut shreds) { let proof = make_merkle_proof(index, erasure_batch_size, &tree).ok_or(Error::InvalidMerkleProof)?; debug_assert_eq!(proof.len(), usize::from(proof_size)); @@ -1429,14 +1428,14 @@ mod test { assert_eq!(entry, &bytes[..SIZE_OF_MERKLE_PROOF_ENTRY]); } - fn run_merkle_tree_round_trip(rng: &mut R, size: usize) { + fn run_merkle_tree_round_trip(rng: &mut R, size: u32) { let nodes = repeat_with(|| rng.gen::<[u8; 32]>()).map(Hash::from); - let nodes: Vec<_> = nodes.take(size).collect(); + let nodes: Vec<_> = nodes.take(size as usize).collect(); let tree = make_merkle_tree(nodes.clone()); let root = tree.last().copied().unwrap(); for index in 0..size { let proof = make_merkle_proof(index, size, &tree).unwrap(); - for (k, &node) in nodes.iter().enumerate() { + for (k, &node) in izip!(0.., &nodes) { let proof = proof.iter().copied(); if k == index { assert_eq!(root, get_merkle_root(k, node, proof).unwrap()); @@ -1464,7 +1463,7 @@ mod test { #[test_case(73, false, false)] #[test_case(73, true, false)] #[test_case(73, true, true)] - fn test_recover_merkle_shreds(num_shreds: usize, chained: bool, resigned: bool) { + fn test_recover_merkle_shreds(num_shreds: u32, chained: bool, resigned: bool) { let mut rng = rand::thread_rng(); let reed_solomon_cache = ReedSolomonCache::default(); for num_data_shreds in 1..num_shreds { @@ -1484,8 +1483,8 @@ mod test { rng: &mut R, chained: bool, resigned: bool, - num_data_shreds: usize, - num_coding_shreds: usize, + num_data_shreds: u32, + num_coding_shreds: u32, reed_solomon_cache: &ReedSolomonCache, ) { let keypair = Keypair::new(); @@ -1517,10 +1516,10 @@ mod test { num_coding_shreds: num_coding_shreds as u16, position: 0, }; - let mut shreds = Vec::with_capacity(num_shreds); + let mut shreds = Vec::with_capacity(num_shreds as usize); for i in 0..num_data_shreds { let common_header = ShredCommonHeader { - index: common_header.index + i as u32, + index: common_header.index + i, ..common_header }; let size = ShredData::SIZE_OF_HEADERS + rng.gen_range(0..capacity); @@ -1545,20 +1544,20 @@ mod test { .map(Shred::erasure_shard_as_slice) .collect::>() .unwrap(); - let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds]; + let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds as usize]; reed_solomon_cache .get(num_data_shreds, num_coding_shreds) .unwrap() .encode_sep(&data, &mut parity[..]) .unwrap(); - for (i, code) in parity.into_iter().enumerate() { + for (i, code) in izip!(0.., parity) { let common_header = ShredCommonHeader { shred_variant: ShredVariant::MerkleCode { proof_size, chained, resigned, }, - index: common_header.index + i as u32 + 7, + index: common_header.index + i + 7, ..common_header }; let coding_header = CodingShredHeader { @@ -1584,7 +1583,7 @@ mod test { .collect::>() .unwrap(); let tree = make_merkle_tree(nodes); - for (index, shred) in shreds.iter_mut().enumerate() { + for (index, shred) in izip!(0.., &mut shreds) { let proof = make_merkle_proof(index, num_shreds, &tree).unwrap(); assert_eq!(proof.len(), usize::from(proof_size)); shred.set_merkle_proof(&proof).unwrap(); @@ -1660,10 +1659,10 @@ mod test { assert_eq!(get_proof_size(63), 6); assert_eq!(get_proof_size(64), 6); assert_eq!(get_proof_size(65), 7); - assert_eq!(get_proof_size(usize::MAX - 1), 64); - assert_eq!(get_proof_size(usize::MAX), 64); + assert_eq!(get_proof_size(u32::MAX - 1), 32); + assert_eq!(get_proof_size(u32::MAX), 32); for proof_size in 1u8..9 { - 
let max_num_shreds = 1usize << u32::from(proof_size); + let max_num_shreds = 1u32 << proof_size; let min_num_shreds = (max_num_shreds >> 1) + 1; for num_shreds in min_num_shreds..=max_num_shreds { assert_eq!(get_proof_size(num_shreds), proof_size); diff --git a/ledger/src/shred/shred_code.rs b/ledger/src/shred/shred_code.rs index f1625c132256e7..2ffbf719be8aa2 100644 --- a/ledger/src/shred/shred_code.rs +++ b/ledger/src/shred/shred_code.rs @@ -11,7 +11,7 @@ use { }; const_assert_eq!(MAX_CODE_SHREDS_PER_SLOT, 32_768); -pub const MAX_CODE_SHREDS_PER_SLOT: usize = MAX_DATA_SHREDS_PER_SLOT; +pub const MAX_CODE_SHREDS_PER_SLOT: u32 = MAX_DATA_SHREDS_PER_SLOT; const_assert_eq!(ShredCode::SIZE_OF_PAYLOAD, 1228); @@ -29,7 +29,7 @@ impl ShredCode { dispatch!(pub(super) fn common_header(&self) -> &ShredCommonHeader); dispatch!(pub(super) fn erasure_shard(self) -> Result, Error>); dispatch!(pub(super) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); - dispatch!(pub(super) fn erasure_shard_index(&self) -> Result); + dispatch!(pub(super) fn erasure_shard_index(&self) -> Result); dispatch!(pub(super) fn first_coding_index(&self) -> Option); dispatch!(pub(super) fn into_payload(self) -> Vec); dispatch!(pub(super) fn payload(&self) -> &Vec); @@ -128,28 +128,28 @@ impl From for ShredCode { } #[inline] -pub(super) fn erasure_shard_index(shred: &T) -> Option { +pub(super) fn erasure_shard_index(shred: &T) -> Option { // Assert that the last shred index in the erasure set does not // overshoot MAX_{DATA,CODE}_SHREDS_PER_SLOT. let common_header = shred.common_header(); let coding_header = shred.coding_header(); if common_header .fec_set_index - .checked_add(u32::from(coding_header.num_data_shreds.checked_sub(1)?))? as usize + .checked_add(coding_header.num_data_shreds.checked_sub(1)?.into())? >= MAX_DATA_SHREDS_PER_SLOT { return None; } if shred .first_coding_index()? - .checked_add(u32::from(coding_header.num_coding_shreds.checked_sub(1)?))? as usize + .checked_add(coding_header.num_coding_shreds.checked_sub(1)?.into())? 
>= MAX_CODE_SHREDS_PER_SLOT { return None; } - let num_data_shreds = usize::from(coding_header.num_data_shreds); - let num_coding_shreds = usize::from(coding_header.num_coding_shreds); - let position = usize::from(coding_header.position); + let num_data_shreds = u32::from(coding_header.num_data_shreds); + let num_coding_shreds = u32::from(coding_header.num_coding_shreds); + let position = u32::from(coding_header.position); let fec_set_size = num_data_shreds.checked_add(num_coding_shreds)?; let index = position.checked_add(num_data_shreds)?; (index < fec_set_size).then_some(index) @@ -161,13 +161,13 @@ pub(super) fn sanitize(shred: &T) -> Result<(), Error> { } let common_header = shred.common_header(); let coding_header = shred.coding_header(); - if common_header.index as usize >= MAX_CODE_SHREDS_PER_SLOT { + if common_header.index >= MAX_CODE_SHREDS_PER_SLOT { return Err(Error::InvalidShredIndex( ShredType::Code, common_header.index, )); } - let num_coding_shreds = usize::from(coding_header.num_coding_shreds); + let num_coding_shreds = u32::from(coding_header.num_coding_shreds); if num_coding_shreds > 8 * DATA_SHREDS_PER_FEC_BLOCK { return Err(Error::InvalidNumCodingShreds( coding_header.num_coding_shreds, diff --git a/ledger/src/shred/shred_data.rs b/ledger/src/shred/shred_data.rs index 3aa97ebde9e4e1..02ab1a0e11d88b 100644 --- a/ledger/src/shred/shred_data.rs +++ b/ledger/src/shred/shred_data.rs @@ -23,7 +23,7 @@ impl ShredData { dispatch!(pub(super) fn data(&self) -> Result<&[u8], Error>); dispatch!(pub(super) fn erasure_shard(self) -> Result, Error>); dispatch!(pub(super) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); - dispatch!(pub(super) fn erasure_shard_index(&self) -> Result); + dispatch!(pub(super) fn erasure_shard_index(&self) -> Result); dispatch!(pub(super) fn into_payload(self) -> Vec); dispatch!(pub(super) fn parent(&self) -> Result); dispatch!(pub(super) fn payload(&self) -> &Vec); @@ -165,10 +165,10 @@ impl From for ShredData { } #[inline] -pub(super) fn erasure_shard_index(shred: &T) -> Option { +pub(super) fn erasure_shard_index(shred: &T) -> Option { let fec_set_index = shred.common_header().fec_set_index; let index = shred.common_header().index.checked_sub(fec_set_index)?; - usize::try_from(index).ok() + Some(index) } pub(super) fn sanitize(shred: &T) -> Result<(), Error> { @@ -177,7 +177,7 @@ pub(super) fn sanitize(shred: &T) -> Result<(), Error> { } let common_header = shred.common_header(); let data_header = shred.data_header(); - if common_header.index as usize >= MAX_DATA_SHREDS_PER_SLOT { + if common_header.index >= MAX_DATA_SHREDS_PER_SLOT { return Err(Error::InvalidShredIndex( ShredType::Data, common_header.index, diff --git a/ledger/src/shred/traits.rs b/ledger/src/shred/traits.rs index 35a6c0a617af64..06511d99d9631c 100644 --- a/ledger/src/shred/traits.rs +++ b/ledger/src/shred/traits.rs @@ -22,7 +22,7 @@ pub(super) trait Shred<'a>: Sized { fn into_payload(self) -> Vec; // Returns the shard index within the erasure coding set. - fn erasure_shard_index(&self) -> Result; + fn erasure_shard_index(&self) -> Result; // Returns the portion of the shred's payload which is erasure coded. fn erasure_shard(self) -> Result, Error>; // Like Shred::erasure_shard but returning a slice. diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs index e1d3924e49968c..d67299d5eb5563 100644 --- a/ledger/src/shredder.rs +++ b/ledger/src/shredder.rs @@ -31,7 +31,7 @@ lazy_static! 
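// For data shreds the shard index is just the offset of the shred index within its FEC set;
// with both fields u32, the old usize::try_from round-trip disappears, and the per-slot bound
// check compares u32 against u32 directly. A simplified sketch (String error stands in for the
// crate's Error type, constant value from the const_assert above):
const MAX_DATA_SHREDS_PER_SLOT_SKETCH: u32 = 32_768;

fn data_shard_index_sketch(shred_index: u32, fec_set_index: u32) -> Option<u32> {
    // None if the shred's index precedes its own FEC set.
    shred_index.checked_sub(fec_set_index)
}

fn sanitize_index_sketch(shred_index: u32) -> Result<(), String> {
    if shred_index >= MAX_DATA_SHREDS_PER_SLOT_SKETCH {
        return Err(format!("invalid data shred index: {shred_index}"));
    }
    Ok(())
}

fn main() {
    assert_eq!(data_shard_index_sketch(37, 32), Some(5));
    assert_eq!(data_shard_index_sketch(31, 32), None);
    assert!(sanitize_index_sketch(32_768).is_err());
}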
{ // Maps number of data shreds to the optimal erasure batch size which has the // same recovery probabilities as a 32:32 erasure batch. -pub(crate) const ERASURE_BATCH_SIZE: [usize; 33] = [ +pub(crate) const ERASURE_BATCH_SIZE: [u32; 33] = [ 0, 18, 20, 22, 23, 25, 27, 28, 30, // 8 32, 33, 35, 36, 38, 39, 41, 42, // 16 43, 45, 46, 48, 49, 51, 52, 53, // 24 @@ -39,7 +39,7 @@ pub(crate) const ERASURE_BATCH_SIZE: [usize; 33] = [ ]; pub struct ReedSolomonCache( - Mutex>>, + Mutex>>, ); #[derive(Debug)] @@ -163,8 +163,11 @@ impl Shredder { shred }; let shreds: Vec<&[u8]> = serialized_shreds.chunks(data_buffer_size).collect(); - let fec_set_offsets: Vec = - get_fec_set_offsets(shreds.len(), DATA_SHREDS_PER_FEC_BLOCK).collect(); + let fec_set_offsets: Vec = get_fec_set_offsets( + u32::try_from(shreds.len()).unwrap(), + DATA_SHREDS_PER_FEC_BLOCK, + ) + .collect(); assert_eq!(shreds.len(), fec_set_offsets.len()); let shreds: Vec = PAR_THREAD_POOL.install(|| { shreds @@ -173,7 +176,7 @@ impl Shredder { .enumerate() .map(|(i, (shred, offset))| { let shred_index = next_shred_index + i as u32; - let fec_set_index = next_shred_index + offset as u32; + let fec_set_index = next_shred_index + offset; make_data_shred(shred, shred_index, fec_set_index) }) .collect() @@ -209,7 +212,7 @@ impl Shredder { chunks .iter() .scan(next_code_index, |next_code_index, chunk| { - let num_data_shreds = chunk.len(); + let num_data_shreds = u32::try_from(chunk.len()).unwrap(); let is_last_in_slot = chunk .last() .copied() @@ -217,7 +220,7 @@ impl Shredder { .unwrap_or(true); let erasure_batch_size = get_erasure_batch_size(num_data_shreds, is_last_in_slot); - *next_code_index += (erasure_batch_size - num_data_shreds) as u32; + *next_code_index += erasure_batch_size - num_data_shreds; Some(*next_code_index) }), ) @@ -284,7 +287,7 @@ impl Shredder { .all(|shred| shred.slot() == slot && shred.version() == version && shred.fec_set_index() == fec_set_index)); - let num_data = data.len(); + let num_data = u32::try_from(data.len()).unwrap(); let is_last_in_slot = data .last() .map(Borrow::borrow) @@ -300,7 +303,7 @@ impl Shredder { .map(Shred::erasure_shard_as_slice) .collect::>() .unwrap(); - let mut parity = vec![vec![0u8; data[0].len()]; num_coding]; + let mut parity = vec![vec![0u8; data[0].len()]; num_coding as usize]; reed_solomon_cache .get(num_data, num_coding) .unwrap() @@ -310,9 +313,9 @@ impl Shredder { let num_coding = u16::try_from(num_coding).unwrap(); parity .iter() - .enumerate() - .map(|(i, parity)| { - let index = next_code_index + u32::try_from(i).unwrap(); + .zip(0..) + .map(|(parity, i)| { + let index = next_code_index + i; Shred::new_from_parity_shard( slot, index, @@ -351,27 +354,25 @@ impl Shredder { .filter(|shred| shred.is_code()) .all(|shred| shred.num_data_shreds().unwrap() == num_data_shreds && shred.num_coding_shreds().unwrap() == num_coding_shreds)); - let num_data_shreds = num_data_shreds as usize; - let num_coding_shreds = num_coding_shreds as usize; let fec_set_size = num_data_shreds + num_coding_shreds; - if num_coding_shreds == 0 || shreds.len() >= fec_set_size { + if num_coding_shreds == 0 || shreds.len() >= fec_set_size.into() { return Ok(Vec::default()); } // Mask to exclude data shreds already received from the return value. 
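// A recurring idiom in this patch: `.enumerate()` (which yields usize) is replaced by zipping
// with `0..`, so the counter's type is inferred as u32 and no cast is needed when it is added
// to a u32 shred index, as in the coding-shred indexing above. Minimal illustration with
// placeholder data:
fn main() {
    let next_code_index: u32 = 100;
    let parity = ["p0", "p1", "p2"];
    let indices: Vec<u32> = parity
        .iter()
        .zip(0..) // counter inferred as u32 ...
        .map(|(_shard, i)| next_code_index + i) // ... because it is added to a u32
        .collect();
    assert_eq!(indices, [100u32, 101, 102]);
}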
- let mut mask = vec![false; num_data_shreds]; - let mut shards = vec![None; fec_set_size]; + let mut mask = vec![false; num_data_shreds as usize]; + let mut shards = vec![None; fec_set_size as usize]; for shred in shreds { let index = match shred.erasure_shard_index() { - Ok(index) if index < fec_set_size => index, + Ok(index) if index < fec_set_size.into() => index, _ => return Err(Error::from(InvalidIndex)), }; - shards[index] = Some(shred.erasure_shard()?); - if index < num_data_shreds { - mask[index] = true; + shards[index as usize] = Some(shred.erasure_shard()?); + if index < num_data_shreds.into() { + mask[index as usize] = true; } } reed_solomon_cache - .get(num_data_shreds, num_coding_shreds)? + .get(num_data_shreds.into(), num_coding_shreds.into())? .reconstruct_data(&mut shards)?; let recovered_data = mask .into_iter() @@ -382,7 +383,7 @@ impl Shredder { shred.slot() == slot && shred.is_data() && match shred.erasure_shard_index() { - Ok(index) => index < num_data_shreds, + Ok(index) => index < num_data_shreds.into(), Err(_) => false, } }) @@ -416,12 +417,12 @@ impl Shredder { } impl ReedSolomonCache { - const CAPACITY: usize = 4 * DATA_SHREDS_PER_FEC_BLOCK; + const CAPACITY: usize = 4 * DATA_SHREDS_PER_FEC_BLOCK as usize; pub(crate) fn get( &self, - data_shards: usize, - parity_shards: usize, + data_shards: u32, + parity_shards: u32, ) -> Result, reed_solomon_erasure::Error> { let key = (data_shards, parity_shards); { @@ -430,7 +431,7 @@ impl ReedSolomonCache { return Ok(entry.clone()); } } - let entry = ReedSolomon::new(data_shards, parity_shards)?; + let entry = ReedSolomon::new(data_shards as usize, parity_shards as usize)?; let entry = Arc::new(entry); { let entry = entry.clone(); @@ -448,9 +449,9 @@ impl Default for ReedSolomonCache { } /// Maps number of data shreds in each batch to the erasure batch size. -pub(crate) fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bool) -> usize { +pub(crate) fn get_erasure_batch_size(num_data_shreds: u32, is_last_in_slot: bool) -> u32 { let erasure_batch_size = ERASURE_BATCH_SIZE - .get(num_data_shreds) + .get(num_data_shreds as usize) .copied() .unwrap_or(2 * num_data_shreds); if is_last_in_slot { @@ -461,10 +462,7 @@ pub(crate) fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bo } // Returns offsets to fec_set_index when spliting shreds into erasure batches. 
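// get_erasure_batch_size now takes and returns u32; the only usize left is the table lookup.
// A pared-down sketch using the first few entries of the ERASURE_BATCH_SIZE table shown above
// (the is_last_in_slot padding branch is omitted here):
fn erasure_batch_size_sketch(num_data_shreds: u32) -> u32 {
    // Abbreviated copy of the table's first entries; the real table has 33 u32 entries.
    const TABLE: [u32; 5] = [0, 18, 20, 22, 23];
    TABLE
        .get(num_data_shreds as usize) // usize only for the array lookup
        .copied()
        .unwrap_or(2 * num_data_shreds) // past the table, fall back to a 1:1 ratio
}

fn main() {
    assert_eq!(erasure_batch_size_sketch(1), 18);
    assert_eq!(erasure_batch_size_sketch(64), 128);
}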
-fn get_fec_set_offsets( - mut num_shreds: usize, - min_chunk_size: usize, -) -> impl Iterator { +fn get_fec_set_offsets(mut num_shreds: u32, min_chunk_size: u32) -> impl Iterator { let mut offset = 0; std::iter::from_fn(move || { if num_shreds == 0 { @@ -472,7 +470,7 @@ fn get_fec_set_offsets( } let num_chunks = (num_shreds / min_chunk_size).max(1); let chunk_size = (num_shreds + num_chunks - 1) / num_chunks; - let offsets = std::iter::repeat(offset).take(chunk_size); + let offsets = std::iter::repeat(offset).take(chunk_size as usize); num_shreds -= chunk_size; offset += chunk_size; Some(offsets) @@ -541,7 +539,9 @@ mod tests { let size = serialized_size(&entries).unwrap() as usize; // Integer division to ensure we have enough shreds to fit all the data let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap(); - let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size; + let num_expected_data_shreds: u32 = ((size + data_buffer_size - 1) / data_buffer_size) + .try_into() + .unwrap(); let num_expected_data_shreds = num_expected_data_shreds.max(if is_last_in_slot { DATA_SHREDS_PER_FEC_BLOCK } else { @@ -564,14 +564,14 @@ mod tests { &mut ProcessShredsStats::default(), ); let next_index = data_shreds.last().unwrap().index() + 1; - assert_eq!(next_index as usize, num_expected_data_shreds); + assert_eq!(next_index, num_expected_data_shreds); let mut data_shred_indexes = HashSet::new(); let mut coding_shred_indexes = HashSet::new(); for shred in data_shreds.iter() { assert_eq!(shred.shred_type(), ShredType::Data); let index = shred.index(); - let is_last = index as usize == num_expected_data_shreds - 1; + let is_last = index == num_expected_data_shreds - 1; verify_test_data_shred( shred, index, @@ -594,16 +594,19 @@ mod tests { coding_shred_indexes.insert(index); } - for i in start_index..start_index + num_expected_data_shreds as u32 { + for i in start_index..start_index + num_expected_data_shreds { assert!(data_shred_indexes.contains(&i)); } - for i in start_index..start_index + num_expected_coding_shreds as u32 { + for i in start_index..start_index + num_expected_coding_shreds { assert!(coding_shred_indexes.contains(&i)); } - assert_eq!(data_shred_indexes.len(), num_expected_data_shreds); - assert_eq!(coding_shred_indexes.len(), num_expected_coding_shreds); + assert_eq!(data_shred_indexes.len(), num_expected_data_shreds as usize); + assert_eq!( + coding_shred_indexes.len(), + num_expected_coding_shreds as usize + ); // Test reassembly let deshred_payload = Shredder::deshred(&data_shreds).unwrap(); @@ -815,10 +818,9 @@ mod tests { let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); let entry = Entry::new(&Hash::default(), 1, vec![tx0]); - let num_data_shreds: usize = 5; + let num_data_shreds: u32 = 5; let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap(); - let num_entries = - max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(data_buffer_size)); + let num_entries = max_entries_per_n_shred(&entry, num_data_shreds, Some(data_buffer_size)); let entries: Vec<_> = (0..num_entries) .map(|_| { let keypair0 = Keypair::new(); @@ -842,10 +844,10 @@ mod tests { &reed_solomon_cache, &mut ProcessShredsStats::default(), ); - let num_coding_shreds = coding_shreds.len(); + let num_coding_shreds: u32 = coding_shreds.len().try_into().unwrap(); // We should have 5 data shreds now - assert_eq!(data_shreds.len(), num_data_shreds); + assert_eq!(data_shreds.len(), num_data_shreds as usize); 
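// The offsets iterator now works in u32 (its stripped return type is presumably
// `impl Iterator<Item = u32>`). A self-contained sketch of the same chunking, collected into a
// Vec for clarity and assuming min_chunk_size > 0:
fn fec_set_offsets_sketch(mut num_shreds: u32, min_chunk_size: u32) -> Vec<u32> {
    let mut offsets = Vec::new();
    let mut offset = 0u32;
    while num_shreds > 0 {
        // Split the remaining shreds as evenly as possible into chunks of at least
        // min_chunk_size, then emit the current offset once per shred in the first chunk.
        let num_chunks = (num_shreds / min_chunk_size).max(1);
        let chunk_size = (num_shreds + num_chunks - 1) / num_chunks; // ceiling division
        offsets.extend(std::iter::repeat(offset).take(chunk_size as usize));
        num_shreds -= chunk_size;
        offset += chunk_size;
    }
    offsets
}

fn main() {
    // 70 shreds with a minimum chunk of 32 split into two chunks of 35.
    let offsets = fec_set_offsets_sketch(70, 32);
    assert_eq!(offsets.len(), 70);
    assert_eq!(offsets[34], 0);
    assert_eq!(offsets[35], 35);
}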
assert_eq!( num_coding_shreds, get_erasure_batch_size(num_data_shreds, is_last_in_slot) - num_data_shreds @@ -909,7 +911,7 @@ mod tests { ); shred_info.insert(3, recovered_shred); - let result = Shredder::deshred(&shred_info[..num_data_shreds]).unwrap(); + let result = Shredder::deshred(&shred_info[..num_data_shreds as usize]).unwrap(); assert!(result.len() >= serialized_entries.len()); assert_eq!(serialized_entries[..], result[..serialized_entries.len()]); @@ -926,7 +928,7 @@ mod tests { assert_eq!(recovered_data.len(), 3); // Data shreds 0, 2, 4 were missing for (i, recovered_shred) in recovered_data.into_iter().enumerate() { let index = i * 2; - let is_last_data = recovered_shred.index() as usize == num_data_shreds - 1; + let is_last_data = recovered_shred.index() == num_data_shreds - 1; verify_test_data_shred( &recovered_shred, index.try_into().unwrap(), @@ -941,16 +943,16 @@ mod tests { shred_info.insert(i * 2, recovered_shred); } - let result = Shredder::deshred(&shred_info[..num_data_shreds]).unwrap(); + let result = Shredder::deshred(&shred_info[..num_data_shreds as usize]).unwrap(); assert!(result.len() >= serialized_entries.len()); assert_eq!(serialized_entries[..], result[..serialized_entries.len()]); // Test4: Try reassembly with 2 missing data shreds, but keeping the last // data shred. Hint: should fail - let shreds: Vec = all_shreds[..num_data_shreds] + let shreds: Vec = all_shreds[..num_data_shreds as usize] .iter() - .enumerate() - .filter_map(|(i, s)| { + .zip(0..) + .filter_map(|(s, i)| { if (i < 4 && i % 2 != 0) || i == num_data_shreds - 1 { // Keep 1, 3, 4 Some(s.clone()) @@ -981,7 +983,7 @@ mod tests { &mut ProcessShredsStats::default(), ); // We should have 10 shreds now - assert_eq!(data_shreds.len(), num_data_shreds); + assert_eq!(data_shreds.len(), num_data_shreds as usize); let all_shreds = data_shreds .iter() @@ -1000,10 +1002,10 @@ mod tests { assert_eq!(recovered_data.len(), 3); // Data shreds 25, 27, 29 were missing for (i, recovered_shred) in recovered_data.into_iter().enumerate() { - let index = 25 + (i * 2); + let index: u32 = (25 + (i * 2)).try_into().unwrap(); verify_test_data_shred( &recovered_shred, - index.try_into().unwrap(), + index, slot, slot - 5, &keypair.pubkey(), @@ -1015,7 +1017,7 @@ mod tests { shred_info.insert(i * 2, recovered_shred); } - let result = Shredder::deshred(&shred_info[..num_data_shreds]).unwrap(); + let result = Shredder::deshred(&shred_info[..num_data_shreds as usize]).unwrap(); assert!(result.len() >= serialized_entries.len()); assert_eq!(serialized_entries[..], result[..serialized_entries.len()]); @@ -1183,7 +1185,7 @@ mod tests { &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); - const MIN_CHUNK_SIZE: usize = DATA_SHREDS_PER_FEC_BLOCK; + const MIN_CHUNK_SIZE: usize = DATA_SHREDS_PER_FEC_BLOCK as usize; let chunks: Vec<_> = data_shreds .iter() .group_by(|shred| shred.fec_set_index()) @@ -1260,31 +1262,36 @@ mod tests { .into_iter() .map(|(_, chunk)| { let chunk: Vec<_> = chunk.collect(); - get_erasure_batch_size(chunk.len(), chunk.last().unwrap().last_in_slot()) + get_erasure_batch_size( + chunk.len().try_into().unwrap(), + chunk.last().unwrap().last_in_slot(), + ) }) - .sum(); + .sum::() + .try_into() + .unwrap(); assert_eq!(coding_shreds.len(), num_shreds - data_shreds.len()); } } #[test] fn test_get_fec_set_offsets() { - const MIN_CHUNK_SIZE: usize = 32usize; - for num_shreds in 0usize..MIN_CHUNK_SIZE { + const MIN_CHUNK_SIZE: u32 = 32; + for num_shreds in 0..MIN_CHUNK_SIZE { let offsets: Vec<_> = 
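// Counts stay u32 until they meet a std container: slicing and len() want usize, so the cast
// happens exactly at that boundary, as in `&shred_info[..num_data_shreds as usize]` above.
// Minimal sketch of that pattern with placeholder shreds:
fn deshred_prefix_sketch<T: Clone>(shreds: &[T], num_data_shreds: u32) -> Vec<T> {
    // Widening u32 -> usize is lossless, so a plain `as` cast is fine here.
    shreds[..num_data_shreds as usize].to_vec()
}

fn main() {
    let shred_info = vec!["d0", "d1", "d2", "c0", "c1"];
    let num_data_shreds: u32 = 3;
    let data = deshred_prefix_sketch(&shred_info, num_data_shreds);
    assert_eq!(data.len(), num_data_shreds as usize);
}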
get_fec_set_offsets(num_shreds, MIN_CHUNK_SIZE).collect(); - assert_eq!(offsets, vec![0usize; num_shreds]); + assert_eq!(offsets, vec![0; num_shreds as usize]); } for num_shreds in MIN_CHUNK_SIZE..MIN_CHUNK_SIZE * 8 { let chunks: Vec<_> = get_fec_set_offsets(num_shreds, MIN_CHUNK_SIZE) .group_by(|offset| *offset) .into_iter() - .map(|(offset, chunk)| (offset, chunk.count())) + .map(|(offset, chunk)| (offset, chunk.count().try_into().unwrap())) .collect(); assert_eq!( chunks .iter() .map(|(_offset, chunk_size)| chunk_size) - .sum::(), + .sum::(), num_shreds ); assert!(chunks diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index b5d916892fea0c..d9c8857869ff9d 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -13,7 +13,6 @@ use { }, std::{ collections::{BTreeMap, HashSet}, - convert::TryInto, sync::Arc, }, test_case::test_case, @@ -33,11 +32,8 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) { let keypair1 = Keypair::new(); let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); let entry = Entry::new(&Hash::default(), 1, vec![tx0]); - let num_entries = max_entries_per_n_shred( - &entry, - num_data_shreds as u64, - Some(LEGACY_SHRED_DATA_CAPACITY), - ); + let num_entries = + max_entries_per_n_shred(&entry, num_data_shreds, Some(LEGACY_SHRED_DATA_CAPACITY)); let entries: Vec<_> = (0..num_entries) .map(|_| { @@ -63,9 +59,9 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) { &mut ProcessShredsStats::default(), ); let next_index = data_shreds.last().unwrap().index() + 1; - assert_eq!(next_index as usize, num_data_shreds); - assert_eq!(data_shreds.len(), num_data_shreds); - assert_eq!(coding_shreds.len(), num_data_shreds); + assert_eq!(next_index, num_data_shreds); + assert_eq!(data_shreds.len(), num_data_shreds as usize); + assert_eq!(coding_shreds.len(), num_data_shreds as usize); for c in &coding_shreds { assert!(!c.is_data()); @@ -75,10 +71,14 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) { for i in 0..num_fec_sets { let shred_start_index = DATA_SHREDS_PER_FEC_BLOCK * i; let end_index = shred_start_index + DATA_SHREDS_PER_FEC_BLOCK - 1; - let fec_set_shreds = data_shreds[shred_start_index..=end_index] + let fec_set_shreds = data_shreds[shred_start_index as usize..=end_index as usize] .iter() .cloned() - .chain(coding_shreds[shred_start_index..=end_index].iter().cloned()) + .chain( + coding_shreds[shred_start_index as usize..=end_index as usize] + .iter() + .cloned(), + ) .collect::>(); let mut shred_info: Vec = fec_set_shreds @@ -90,11 +90,11 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) { let recovered_data = Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); - for (i, recovered_shred) in recovered_data.into_iter().enumerate() { + for (recovered_shred, i) in recovered_data.into_iter().zip(0..) 
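// In the integration test, FEC-set boundaries are computed in u32 from DATA_SHREDS_PER_FEC_BLOCK
// and converted only when slicing. Sketch with a hypothetical block size of 32:
const DATA_SHREDS_PER_FEC_BLOCK_SKETCH: u32 = 32;

fn fec_block_bounds_sketch(block: u32) -> (usize, usize) {
    let start = DATA_SHREDS_PER_FEC_BLOCK_SKETCH * block;
    let end = start + DATA_SHREDS_PER_FEC_BLOCK_SKETCH - 1;
    // Convert once, at the slice boundary.
    (start as usize, end as usize)
}

fn main() {
    let data_shreds: Vec<u32> = (0..96).collect();
    let (start, end) = fec_block_bounds_sketch(1);
    assert_eq!(data_shreds[start..=end].len(), 32);
    assert_eq!(data_shreds[start], 32);
    assert_eq!(data_shreds[end], 63);
}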
{ let index = shred_start_index + (i * 2); verify_test_data_shred( &recovered_shred, - index.try_into().unwrap(), + index, slot, slot - 5, &keypair.pubkey(), @@ -103,10 +103,14 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) { index == end_index, ); - shred_info.insert(i * 2, recovered_shred); + shred_info.insert((i * 2) as usize, recovered_shred); } - all_shreds.extend(shred_info.into_iter().take(DATA_SHREDS_PER_FEC_BLOCK)); + all_shreds.extend( + shred_info + .into_iter() + .take(DATA_SHREDS_PER_FEC_BLOCK as usize), + ); } let result = Shredder::deshred(&all_shreds[..]).unwrap(); @@ -121,11 +125,14 @@ fn test_multi_fec_block_different_size_coding() { let (fec_data, fec_coding, num_shreds_per_iter) = setup_different_sized_fec_blocks(slot, parent_slot, keypair.clone()); - let total_num_data_shreds: usize = fec_data.values().map(|x| x.len()).sum(); + let total_num_data_shreds: u32 = fec_data + .values() + .map(|x| u32::try_from(x.len()).unwrap()) + .sum(); let reed_solomon_cache = ReedSolomonCache::default(); // Test recovery for (fec_data_shreds, fec_coding_shreds) in fec_data.values().zip(fec_coding.values()) { - let first_data_index = fec_data_shreds.first().unwrap().index() as usize; + let first_data_index = fec_data_shreds.first().unwrap().index(); let all_shreds: Vec = fec_data_shreds .iter() .step_by(2) @@ -137,11 +144,11 @@ fn test_multi_fec_block_different_size_coding() { // is part of the recovered set, and that the below `index` // calculation in the loop is correct assert!(fec_data_shreds.len() % 2 == 0); - for (i, recovered_shred) in recovered_data.into_iter().enumerate() { + for (recovered_shred, i) in recovered_data.into_iter().zip(0..) { let index = first_data_index + (i * 2) + 1; verify_test_data_shred( &recovered_shred, - index.try_into().unwrap(), + index, slot, parent_slot, &keypair.pubkey(), @@ -186,7 +193,7 @@ fn setup_different_sized_fec_blocks( slot: Slot, parent_slot: Slot, keypair: Arc, -) -> (IndexShredsMap, IndexShredsMap, usize) { +) -> (IndexShredsMap, IndexShredsMap, u32) { let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap(); let keypair0 = Keypair::new(); let keypair1 = Keypair::new(); @@ -200,7 +207,7 @@ fn setup_different_sized_fec_blocks( let num_shreds_per_iter = DATA_SHREDS_PER_FEC_BLOCK + 2; let num_entries = max_entries_per_n_shred( &entry, - num_shreds_per_iter as u64, + num_shreds_per_iter, Some(LEGACY_SHRED_DATA_CAPACITY), ); let entries: Vec<_> = (0..num_entries) @@ -221,7 +228,7 @@ fn setup_different_sized_fec_blocks( let mut data_slot_and_index = HashSet::new(); let mut coding_slot_and_index = HashSet::new(); - let total_num_data_shreds: usize = 2 * num_shreds_per_iter; + let total_num_data_shreds = 2 * num_shreds_per_iter; let reed_solomon_cache = ReedSolomonCache::default(); for i in 0..2 { let is_last = i == 1; @@ -237,17 +244,17 @@ fn setup_different_sized_fec_blocks( &mut ProcessShredsStats::default(), ); for shred in &data_shreds { - if (shred.index() as usize) == total_num_data_shreds - 1 { + if shred.index() == total_num_data_shreds - 1 { assert!(shred.data_complete()); assert!(shred.last_in_slot()); - } else if (shred.index() as usize) % num_shreds_per_iter == num_shreds_per_iter - 1 { + } else if shred.index() % num_shreds_per_iter == num_shreds_per_iter - 1 { assert!(shred.data_complete()); } else { assert!(!shred.data_complete()); assert!(!shred.last_in_slot()); } } - assert_eq!(data_shreds.len(), num_shreds_per_iter); + assert_eq!(data_shreds.len(), num_shreds_per_iter as usize); next_shred_index = 
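// When every other data shred is dropped before recovery, the recovered shreds come back in
// order and their indices can be reconstructed in u32 directly, with no try_into. Sketch,
// assuming the even-positioned shreds were kept and the odd ones recovered, as in the
// different-size-coding test above:
fn recovered_indices_sketch(first_data_index: u32, num_recovered: u32) -> Vec<u32> {
    // The i-th recovered shred sits at first + 2*i + 1.
    (0..num_recovered)
        .map(|i| first_data_index + i * 2 + 1)
        .collect()
}

fn main() {
    assert_eq!(recovered_indices_sketch(0, 3), [1u32, 3, 5]);
}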
data_shreds.last().unwrap().index() + 1; next_code_index = coding_shreds.last().unwrap().index() + 1; sort_data_coding_into_fec_sets( diff --git a/turbine/src/broadcast_stage.rs b/turbine/src/broadcast_stage.rs index cce43f4fa5aabd..d7d944ea19609e 100644 --- a/turbine/src/broadcast_stage.rs +++ b/turbine/src/broadcast_stage.rs @@ -69,7 +69,7 @@ pub enum Error { #[error("Duplicate slot broadcast: {0}")] DuplicateSlotBroadcast(Slot), #[error("Invalid Merkle root, slot: {slot}, index: {index}")] - InvalidMerkleRoot { slot: Slot, index: u64 }, + InvalidMerkleRoot { slot: Slot, index: u32 }, #[error(transparent)] Io(#[from] std::io::Error), #[error(transparent)] @@ -81,13 +81,15 @@ pub enum Error { #[error(transparent)] Serialize(#[from] std::boxed::Box), #[error("Shred not found, slot: {slot}, index: {index}")] - ShredNotFound { slot: Slot, index: u64 }, + ShredNotFound { slot: Slot, index: u32 }, #[error(transparent)] TransportError(#[from] solana_sdk::transport::TransportError), #[error("Unknown last index, slot: {0}")] UnknownLastIndex(Slot), #[error("Unknown slot meta, slot: {0}")] UnknownSlotMeta(Slot), + #[error("Slot last shred index is above u32::MAX, slot: {slot}, index: {index}")] + LastShredIndexOutOfRange { slot: Slot, index: u64 }, } type Result = std::result::Result; @@ -537,7 +539,7 @@ pub mod test { #[allow(clippy::type_complexity)] fn make_transmit_shreds( slot: Slot, - num: u64, + num: u32, ) -> ( Vec, Vec, @@ -578,21 +580,21 @@ pub mod test { fn check_all_shreds_received( transmit_receiver: &TransmitReceiver, - mut data_index: u64, - mut coding_index: u64, - num_expected_data_shreds: u64, - num_expected_coding_shreds: u64, + mut data_index: u32, + mut coding_index: u32, + num_expected_data_shreds: u32, + num_expected_coding_shreds: u32, ) { while let Ok((shreds, _)) = transmit_receiver.try_recv() { if shreds[0].is_data() { for data_shred in shreds.iter() { - assert_eq!(data_shred.index() as u64, data_index); + assert_eq!(data_shred.index(), data_index); data_index += 1; } } else { - assert_eq!(shreds[0].index() as u64, coding_index); + assert_eq!(shreds[0].index(), coding_index); for coding_shred in shreds.iter() { - assert_eq!(coding_shred.index() as u64, coding_index); + assert_eq!(coding_shred.index(), coding_index); coding_index += 1; } } @@ -614,8 +616,8 @@ pub mod test { let updated_slot = 0; let (all_data_shreds, all_coding_shreds, _, _all_coding_transmit_shreds) = make_transmit_shreds(updated_slot, 10); - let num_data_shreds = all_data_shreds.len(); - let num_coding_shreds = all_coding_shreds.len(); + let num_data_shreds = u32::try_from(all_data_shreds.len()).unwrap(); + let num_coding_shreds = u32::try_from(all_coding_shreds.len()).unwrap(); assert!(num_data_shreds >= 10); // Insert all the shreds @@ -637,13 +639,7 @@ pub mod test { ) .unwrap(); // Check all the data shreds were received only once - check_all_shreds_received( - &transmit_receiver, - 0, - 0, - num_data_shreds as u64, - num_coding_shreds as u64, - ); + check_all_shreds_received(&transmit_receiver, 0, 0, num_data_shreds, num_coding_shreds); } struct MockBroadcastStage { diff --git a/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs b/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs index b82ca324b61820..4002ef99a0c01f 100644 --- a/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -54,7 +54,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { .unwrap(), Some(index) => { let shred = blockstore - 
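// The blockstore still reports a slot's last shred index as u64, so the new
// LastShredIndexOutOfRange variant covers the (unlikely) case where it does not fit the u32
// shred index. Dependency-free sketch of that conversion (the real enum uses thiserror):
type Slot = u64;

#[derive(Debug)]
enum ErrorSketch {
    LastShredIndexOutOfRange { slot: Slot, index: u64 },
}

fn last_index_as_u32(slot: Slot, index: u64) -> Result<u32, ErrorSketch> {
    u32::try_from(index).map_err(|_| ErrorSketch::LastShredIndexOutOfRange { slot, index })
}

fn main() {
    assert!(matches!(last_index_as_u32(1, 7), Ok(7)));
    assert!(last_index_as_u32(1, u64::from(u32::MAX) + 1).is_err());
}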
.get_data_shred(bank.slot(), u64::from(index)) + .get_data_shred(bank.slot(), index) .unwrap() .unwrap(); shred::layout::get_merkle_root(&shred).unwrap() diff --git a/turbine/src/broadcast_stage/broadcast_utils.rs b/turbine/src/broadcast_stage/broadcast_utils.rs index 9c4b48bd5ca01d..a3a9881f195f2c 100644 --- a/turbine/src/broadcast_stage/broadcast_utils.rs +++ b/turbine/src/broadcast_stage/broadcast_utils.rs @@ -107,6 +107,12 @@ pub(super) fn get_chained_merkle_root_from_parent( .ok_or_else(|| Error::UnknownSlotMeta(parent))? .last_index .ok_or_else(|| Error::UnknownLastIndex(parent))?; + let index: u32 = index + .try_into() + .map_err(|_| Error::LastShredIndexOutOfRange { + slot: parent, + index, + })?; let shred = blockstore .get_data_shred(parent, index)? .ok_or(Error::ShredNotFound { diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index 0ddbe1020f5f98..3c7f1a4367ce42 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -280,8 +280,8 @@ impl StandardBroadcastRun { is_last_in_slot, cluster_type, &mut process_stats, - blockstore::MAX_DATA_SHREDS_PER_SLOT as u32, - shred_code::MAX_CODE_SHREDS_PER_SLOT as u32, + blockstore::MAX_DATA_SHREDS_PER_SLOT, + shred_code::MAX_CODE_SHREDS_PER_SLOT, ) .unwrap(); // Insert the first data shred synchronously so that blockstore stores @@ -539,7 +539,7 @@ mod test { #[allow(clippy::type_complexity)] fn setup( - num_shreds_per_slot: Slot, + num_shreds_per_slot: u32, ) -> ( Arc, GenesisConfig, @@ -650,10 +650,7 @@ mod test { &quic_endpoint_sender, ) .unwrap(); - assert_eq!( - standard_broadcast_run.next_shred_index as u64, - num_shreds_per_slot - ); + assert_eq!(standard_broadcast_run.next_shred_index, num_shreds_per_slot); assert_eq!(standard_broadcast_run.slot, 0); assert_eq!(standard_broadcast_run.parent, 0); // Make sure the slot is not complete @@ -722,7 +719,7 @@ mod test { // The shred index should have reset to 0, which makes it possible for the // index < the previous shred index for slot 0 - assert_eq!(standard_broadcast_run.next_shred_index as u64, num_shreds); + assert_eq!(standard_broadcast_run.next_shred_index, num_shreds); assert_eq!(standard_broadcast_run.slot, 2); assert_eq!(standard_broadcast_run.parent, 0);
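// Taken together, the pattern this patch settles on: shred indices and per-slot counts are u32;
// usize appears only when indexing or sizing std containers; u64 survives only at the
// serialization/blockstore boundary and is converted fallibly. A compact, hypothetical
// illustration of all three conversions:
fn boundary_conversions_sketch(stored_index: u64, counts: &[u32]) -> Option<u32> {
    // u64 -> u32: fallible, at the storage boundary.
    let index = u32::try_from(stored_index).ok()?;
    // u32 -> usize: infallible widening, only to index a container.
    let count = *counts.get(index as usize)?;
    // Arithmetic on indices and counts stays in u32.
    index.checked_add(count)
}

fn main() {
    assert_eq!(boundary_conversions_sketch(1, &[10, 20, 30]), Some(21));
    assert_eq!(boundary_conversions_sketch(u64::MAX, &[10]), None);
}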