From 28ae49bae4a333a40954fd6087e8388e02da32b1 Mon Sep 17 00:00:00 2001 From: Illia Bobyr Date: Thu, 30 Mar 2023 22:48:34 -0700 Subject: [PATCH] ledger/blockstore: Use `u32` for shred indices. Shred indices, within a slot, can not exceed a `u32` value. Unfortunately, at the moment, there is no common type that is used when a shred index needs to be stored. Part of the code use `u32`, another considerable part uses `u64` and a few places use `usize`. Ideally, it would be better to have a dedicated type, wrapping the primitive `u32` value. This change is trying to unify the types used, moving most of the code that works with shred indices to `u32`. `u64` is used in a few locations that are serialized into the blockstore or are part of the wire protocol. Those are left as is for now. As the conversion is not 100%, there are still a few conversions left, that could be avoided should both the serialization and the wire protocol code use `u32`. I also was not completely sure about the `usize` types used by the Merkel tree code - those might be convertible to `u32` as well. Stats (roughly): * Removed `as u{32,64}`: $ git diff --cached | grep '^-.*as u\(32\|64\)' | wc -l 50 * Removed `unwrap()`/`expect()`: $ git diff --cached | grep '^-.*\(unwrap()\|expect()\)' | wc -l 20 * Added `as u{32,64}`: $ git diff --cached | grep '^+.*as u\(32\|64\)' | wc -l 7 * Added `unwrap()`/`expect()`: $ git diff --cached | grep '^+.*\(unwrap()\|expect()\)' | wc -l 32 Total is 50 + 20 - 8 - 32 = 30 conversions removed. And a lot of the removed conversion are unchecked, while a lot of the added ones are checked. Current version would still panic, if a value outside the `u32` scope is used - it will just panic further away from the bug location, complicating the debugging efforts. This change also prepares the grounds for removal of most of the conversion that are still present. We would need to convert the serialization and the wire code to `u32` as well. There are a number of `as usize` conversions added in the new code. Those should be safe, as they convert from `u32`. Most of them are used when indexing into `Vec`s. --- core/benches/shredder.rs | 18 +- core/src/repair/repair_response.rs | 2 +- core/src/repair/repair_service.rs | 6 +- core/src/repair/serve_repair.rs | 32 +-- ledger/benches/blockstore.rs | 12 +- ledger/src/blockstore.rs | 196 +++++++++--------- ledger/src/blockstore_db.rs | 6 +- ledger/src/blockstore_meta.rs | 73 +++++-- ledger/src/blockstore_processor.rs | 6 +- ledger/src/shred.rs | 18 +- ledger/src/shred/legacy.rs | 12 +- ledger/src/shred/merkle.rs | 97 +++++---- ledger/src/shred/shred_code.rs | 20 +- ledger/src/shred/shred_data.rs | 8 +- ledger/src/shred/traits.rs | 2 +- ledger/src/shredder.rs | 135 ++++++------ ledger/tests/shred.rs | 57 ++--- turbine/src/broadcast_stage.rs | 34 ++- .../broadcast_fake_shreds_run.rs | 2 +- .../src/broadcast_stage/broadcast_utils.rs | 6 + .../broadcast_stage/standard_broadcast_run.rs | 13 +- 21 files changed, 415 insertions(+), 340 deletions(-) diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 333db34bcef2f4..39d098ba3d38a7 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -28,11 +28,11 @@ fn make_large_unchained_entries(txs_per_entry: u64, num_entries: u64) -> Vec Vec { +fn make_shreds(num_shreds: u32) -> Vec { let txs_per_entry = 128; let num_entries = max_entries_per_n_shred( &make_test_entry(txs_per_entry), - 2 * num_shreds as u64, + 2 * num_shreds, Some(LEGACY_SHRED_DATA_CAPACITY), ); let entries = make_large_unchained_entries(txs_per_entry, num_entries); @@ -49,7 +49,7 @@ fn make_shreds(num_shreds: usize) -> Vec { &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); - assert!(data_shreds.len() >= num_shreds); + assert!(data_shreds.len() >= num_shreds as usize); data_shreds } @@ -57,7 +57,7 @@ fn make_shreds(num_shreds: usize) -> Vec { fn bench_shredder_ticks(bencher: &mut Bencher) { let kp = Keypair::new(); let shred_size = LEGACY_SHRED_DATA_CAPACITY; - let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; + let num_shreds = u32::try_from(((1000 * 1000) + (shred_size - 1)) / shred_size).unwrap(); // ~1Mb let num_ticks = max_ticks_per_n_shreds(1, Some(LEGACY_SHRED_DATA_CAPACITY)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); @@ -83,11 +83,11 @@ fn bench_shredder_ticks(bencher: &mut Bencher) { fn bench_shredder_large_entries(bencher: &mut Bencher) { let kp = Keypair::new(); let shred_size = LEGACY_SHRED_DATA_CAPACITY; - let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; + let num_shreds = u32::try_from(((1000 * 1000) + (shred_size - 1)) / shred_size).unwrap(); let txs_per_entry = 128; let num_entries = max_entries_per_n_shred( &make_test_entry(txs_per_entry), - num_shreds as u64, + num_shreds, Some(shred_size), ); let entries = make_large_unchained_entries(txs_per_entry, num_entries); @@ -115,7 +115,7 @@ fn bench_deshredder(bencher: &mut Bencher) { let kp = Keypair::new(); let shred_size = LEGACY_SHRED_DATA_CAPACITY; // ~10Mb - let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size; + let num_shreds = u32::try_from(((10000 * 1000) + (shred_size - 1)) / shred_size).unwrap(); let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); let shredder = Shredder::new(1, 0, 0, 0).unwrap(); @@ -156,7 +156,7 @@ fn bench_shredder_coding(bencher: &mut Bencher) { let reed_solomon_cache = ReedSolomonCache::default(); bencher.iter(|| { Shredder::generate_coding_shreds( - &data_shreds[..symbol_count], + &data_shreds[..symbol_count as usize], 0, // next_code_index &reed_solomon_cache, ) @@ -170,7 +170,7 @@ fn bench_shredder_decoding(bencher: &mut Bencher) { let data_shreds = make_shreds(symbol_count); let reed_solomon_cache = ReedSolomonCache::default(); let coding_shreds = Shredder::generate_coding_shreds( - &data_shreds[..symbol_count], + &data_shreds[..symbol_count as usize], 0, // next_code_index &reed_solomon_cache, ); diff --git a/core/src/repair/repair_response.rs b/core/src/repair/repair_response.rs index 0a82935e89892c..a02b07d891fc40 100644 --- a/core/src/repair/repair_response.rs +++ b/core/src/repair/repair_response.rs @@ -10,7 +10,7 @@ use { pub fn repair_response_packet( blockstore: &Blockstore, slot: Slot, - shred_index: u64, + shred_index: u32, dest: &SocketAddr, nonce: Nonce, ) -> Option { diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index 7d3d6dd54213e4..c92cb65ea384bd 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -666,7 +666,11 @@ impl RepairService { if let Some(reference_tick) = slot_meta .received .checked_sub(1) - .and_then(|index| blockstore.get_data_shred(slot, index).ok()?) + .and_then(|index| { + blockstore + .get_data_shred(slot, u32::try_from(index).ok()?) + .ok()? + }) .and_then(|shred| shred::layout::get_reference_tick(&shred).ok()) .map(u64::from) { diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index 44c221f2a97877..d02802109859ba 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -96,8 +96,12 @@ pub enum ShredRepairType { /// Requesting `MAX_ORPHAN_REPAIR_RESPONSES ` parent shreds Orphan(Slot), /// Requesting any shred with index greater than or equal to the particular index + // TODO As the value is a shred index, it should use `u32`. But as changing the type would + // affect the serialized representation, we would need to introduce a conversion. HighestShred(Slot, u64), /// Requesting the missing shred at a particular index + // TODO As the value is a shred index, it should use `u32`. But as changing the type would + // affect the serialized representation, we would need to introduce a conversion. Shred(Slot, u64), } @@ -221,6 +225,7 @@ impl RepairRequestHeader { pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>; /// Window protocol messages +// TODO All `u64`s in the branches of this enum are shred indices, and they should be used `u32`. #[cfg_attr( feature = "frozen-abi", derive(AbiEnumVisitor, AbiExample), @@ -437,7 +442,7 @@ impl ServeRepair { from_addr, blockstore, *slot, - *shred_index, + u32::try_from(*shred_index).expect("All shred indices fit into u32"), *nonce, ); if batch.is_none() { @@ -457,7 +462,7 @@ impl ServeRepair { from_addr, blockstore, *slot, - *highest_index, + u32::try_from(*highest_index).expect("All shred indices fit into u32"), *nonce, ), "HighestWindowIndexWithNonce", @@ -1290,7 +1295,7 @@ impl ServeRepair { from_addr: &SocketAddr, blockstore: &Blockstore, slot: Slot, - shred_index: u64, + shred_index: u32, nonce: Nonce, ) -> Option { // Try to find the requested index in one of the slots @@ -1313,17 +1318,17 @@ impl ServeRepair { from_addr: &SocketAddr, blockstore: &Blockstore, slot: Slot, - highest_index: u64, + highest_index: u32, nonce: Nonce, ) -> Option { // Try to find the requested index in one of the slots let meta = blockstore.meta(slot).ok()??; - if meta.received > highest_index { + if meta.received > highest_index.into() { // meta.received must be at least 1 by this point let packet = repair_response::repair_response_packet( blockstore, slot, - meta.received - 1, + u32::try_from(meta.received - 1).expect("All shred indices fit into u32"), from_addr, nonce, )?; @@ -1354,7 +1359,8 @@ impl ServeRepair { repair_response::repair_response_packet( blockstore, meta.slot, - meta.received.checked_sub(1u64)?, + u32::try_from(meta.received.checked_sub(1u64)?) + .expect("All shred indices fit into u32"), from_addr, nonce, ) @@ -1909,7 +1915,7 @@ mod tests { nonce, ) .expect("packets"); - let request = ShredRepairType::HighestShred(slot, index); + let request = ShredRepairType::HighestShred(slot, index.into()); verify_responses(&request, rv.iter()); let rv: Vec = rv @@ -1920,8 +1926,10 @@ mod tests { }) .collect(); assert!(!rv.is_empty()); - let index = blockstore.meta(slot).unwrap().unwrap().received - 1; - assert_eq!(rv[0].index(), index as u32); + let index: u32 = (blockstore.meta(slot).unwrap().unwrap().received - 1) + .try_into() + .unwrap(); + assert_eq!(rv[0].index(), index); assert_eq!(rv[0].slot(), slot); let rv = ServeRepair::run_highest_window_request( @@ -1971,7 +1979,7 @@ mod tests { nonce, ) .expect("packets"); - let request = ShredRepairType::Shred(slot, index); + let request = ShredRepairType::Shred(slot, index.into()); verify_responses(&request, rv.iter()); let rv: Vec = rv .into_iter() @@ -2163,7 +2171,7 @@ mod tests { repair_response::repair_response_packet( &blockstore, slot, - index, + index.try_into().unwrap(), &socketaddr_any!(), nonce, ) diff --git a/ledger/benches/blockstore.rs b/ledger/benches/blockstore.rs index 27296d412d7ab3..edc032f459a441 100644 --- a/ledger/benches/blockstore.rs +++ b/ledger/benches/blockstore.rs @@ -28,13 +28,13 @@ fn bench_write_shreds(bench: &mut Bencher, entries: Vec, ledger_path: &Pa // Insert some shreds into the ledger in preparation for read benchmarks fn setup_read_bench( blockstore: &Blockstore, - num_small_shreds: u64, - num_large_shreds: u64, + num_small_shreds: u32, + num_large_shreds: u32, slot: Slot, ) { // Make some big and small entries let entries = create_ticks( - num_large_shreds * 4 + num_small_shreds * 2, + (num_large_shreds * 4 + num_small_shreds * 2).into(), 0, Hash::default(), ); @@ -117,12 +117,12 @@ fn bench_read_random(bench: &mut Bencher) { // Generate a num_reads sized random sample of indexes in range [0, total_shreds - 1], // simulating random reads let mut rng = rand::thread_rng(); - let indexes: Vec = (0..num_reads) - .map(|_| rng.gen_range(0..total_shreds) as usize) + let indexes: Vec = (0..num_reads) + .map(|_| rng.gen_range(0..total_shreds)) .collect(); bench.iter(move || { for i in indexes.iter() { - let _ = blockstore.get_data_shred(slot, *i as u64); + let _ = blockstore.get_data_shred(slot, *i); } }); } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 21d5418b7c6038..6da8919810097b 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -106,7 +106,7 @@ pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; // An upper bound on maximum number of data shreds we can handle in a slot // 32K shreds would allow ~320K peak TPS // (32K shreds per slot * 4 TX per shred * 2.5 slots per sec) -pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768; +pub const MAX_DATA_SHREDS_PER_SLOT: u32 = 32_768; pub type CompletedSlotsSender = Sender>; pub type CompletedSlotsReceiver = Receiver>; @@ -632,25 +632,41 @@ impl Blockstore { pub fn slot_data_iterator( &self, slot: Slot, - index: u64, - ) -> Result)> + '_> { + index: u32, + ) -> Result)> + '_> { let slot_iterator = self.db.iter::(IteratorMode::From( - (slot, index), + (slot, index.into()), IteratorDirection::Forward, ))?; - Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot)) + Ok(slot_iterator + .take_while(move |((shred_slot, _), _)| *shred_slot == slot) + .map(|((slot, index), data)| { + ( + slot, + u32::try_from(index).expect("shred index fits into u32"), + data, + ) + })) } pub fn slot_coding_iterator( &self, slot: Slot, - index: u64, - ) -> Result)> + '_> { + index: u32, + ) -> Result)> + '_> { let slot_iterator = self.db.iter::(IteratorMode::From( - (slot, index), + (slot, index.into()), IteratorDirection::Forward, ))?; - Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot)) + Ok(slot_iterator + .take_while(move |((shred_slot, _), _)| *shred_slot == slot) + .map(|((slot, index), data)| { + ( + slot, + u32::try_from(index).expect("shred index fits into u32"), + data, + ) + })) } fn prepare_rooted_slot_iterator( @@ -718,14 +734,14 @@ impl Blockstore { data_cf: &'a LedgerColumn, ) -> impl Iterator + 'a { erasure_meta.data_shreds_indices().filter_map(move |i| { - let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Data); + let key = ShredId::new(slot, i, ShredType::Data); if let Some(shred) = prev_inserted_shreds.get(&key) { return Some(shred.clone()); } if !index.data().contains(i) { return None; } - match data_cf.get_bytes((slot, i)).unwrap() { + match data_cf.get_bytes((slot, i.into())).unwrap() { None => { error!( "Unable to read the data shred with slot {slot}, index {i} for shred \ @@ -747,14 +763,14 @@ impl Blockstore { code_cf: &'a LedgerColumn, ) -> impl Iterator + 'a { erasure_meta.coding_shreds_indices().filter_map(move |i| { - let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Code); + let key = ShredId::new(slot, i, ShredType::Code); if let Some(shred) = prev_inserted_shreds.get(&key) { return Some(shred.clone()); } if !index.coding().contains(i) { return None; } - match code_cf.get_bytes((slot, i)).unwrap() { + match code_cf.get_bytes((slot, i.into())).unwrap() { None => { error!( "Unable to read the coding shred with slot {slot}, index {i} for shred \ @@ -1393,7 +1409,7 @@ impl Blockstore { metrics: &mut BlockstoreInsertionMetrics, ) -> bool { let slot = shred.slot(); - let shred_index = u64::from(shred.index()); + let shred_index = shred.index(); let index_meta_working_set_entry = self.get_index_meta_entry(slot, index_working_set, index_meta_time_us); @@ -1546,7 +1562,7 @@ impl Blockstore { return Some(Cow::Owned(potential_shred.into_payload())); } } else if let Some(potential_shred) = { - let key = ShredId::new(slot, u32::try_from(coding_index).unwrap(), ShredType::Code); + let key = ShredId::new(slot, coding_index, ShredType::Code); just_received_shreds.get(&key) } { if shred.erasure_mismatch(potential_shred).unwrap() { @@ -1720,7 +1736,7 @@ impl Blockstore { write_batch: &mut WriteBatch, ) -> Result<()> { let slot = shred.slot(); - let shred_index = u64::from(shred.index()); + let shred_index = shred.index(); // Assert guaranteed by integrity checks on the shred that happen before // `insert_coding_shred` is called @@ -1729,16 +1745,16 @@ impl Blockstore { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, shred_index), shred.payload())?; + write_batch.put_bytes::((slot, shred_index.into()), shred.payload())?; index_meta.coding_mut().insert(shred_index); Ok(()) } fn is_data_shred_present(shred: &Shred, slot_meta: &SlotMeta, data_index: &ShredIndex) -> bool { - let shred_index = u64::from(shred.index()); + let shred_index = shred.index(); // Check that the shred doesn't already exist in blockstore - shred_index < slot_meta.consumed || data_index.contains(shred_index) + u64::from(shred_index) < slot_meta.consumed || data_index.contains(shred_index) } /// Finds the corresponding shred at `shred_id` in the just inserted @@ -1753,14 +1769,8 @@ impl Blockstore { (Some(shred), _) => Some(Cow::Borrowed(shred.payload())), // If it doesn't exist in the just inserted set, it must exist in // the backing store - (_, ShredType::Data) => self - .get_data_shred(slot, u64::from(index)) - .unwrap() - .map(Cow::Owned), - (_, ShredType::Code) => self - .get_coding_shred(slot, u64::from(index)) - .unwrap() - .map(Cow::Owned), + (_, ShredType::Data) => self.get_data_shred(slot, index).unwrap().map(Cow::Owned), + (_, ShredType::Code) => self.get_coding_shred(slot, index).unwrap().map(Cow::Owned), } } @@ -2025,7 +2035,10 @@ impl Blockstore { if !self.has_duplicate_shreds_in_slot(slot) { let shred_id = ShredId::new( slot, - u32::try_from(last_index.unwrap()).unwrap(), + last_index + .unwrap() + .try_into() + .expect("A valid `last_index` fits into u32."), ShredType::Data, ); let Some(ending_shred) = self @@ -2075,7 +2088,8 @@ impl Blockstore { if !self.has_duplicate_shreds_in_slot(slot) { let shred_id = ShredId::new( slot, - u32::try_from(slot_meta.received - 1).unwrap(), + u32::try_from(slot_meta.received - 1) + .expect("`slot_meta.received` is a shred index, so it fits into u32"), ShredType::Data, ); let Some(ending_shred) = self @@ -2149,7 +2163,7 @@ impl Blockstore { shred_source: ShredSource, ) -> Result> { let slot = shred.slot(); - let index = u64::from(shred.index()); + let index = shred.index(); let last_in_slot = if shred.last_in_slot() { debug!("got last in slot"); @@ -2168,7 +2182,7 @@ impl Blockstore { // Parent for slot meta should have been set by this point assert!(!slot_meta.is_orphan()); - let new_consumed = if slot_meta.consumed == index { + let new_consumed = if slot_meta.consumed == u64::from(index) { let mut current_index = index + 1; while data_index.contains(current_index) { @@ -2176,18 +2190,18 @@ impl Blockstore { } current_index } else { - slot_meta.consumed + u32::try_from(slot_meta.consumed).expect("`consumed` is a valid shred index") }; // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, index), shred.bytes_to_store())?; + write_batch.put_bytes::((slot, index.into()), shred.bytes_to_store())?; data_index.insert(index); let newly_completed_data_sets = update_slot_meta( last_in_slot, last_in_data, slot_meta, - index as u32, + index, new_consumed, shred.reference_tick(), data_index, @@ -2217,8 +2231,8 @@ impl Blockstore { Ok(newly_completed_data_sets) } - pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result>> { - let shred = self.data_shred_cf.get_bytes((slot, index))?; + pub fn get_data_shred(&self, slot: Slot, index: u32) -> Result>> { + let shred = self.data_shred_cf.get_bytes((slot, index.into()))?; let shred = shred.map(ShredData::resize_stored_shred).transpose(); shred.map_err(|err| { let err = format!("Invalid stored shred: {err}"); @@ -2227,10 +2241,10 @@ impl Blockstore { }) } - pub fn get_data_shreds_for_slot(&self, slot: Slot, start_index: u64) -> Result> { + pub fn get_data_shreds_for_slot(&self, slot: Slot, start_index: u32) -> Result> { self.slot_data_iterator(slot, start_index) .expect("blockstore couldn't fetch iterator") - .map(|(_, bytes)| { + .map(|(_slot, _shred_index, bytes)| { Shred::new_from_serialized_shred(bytes.to_vec()).map_err(|err| { BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom( format!("Could not reconstruct shred from shred payload: {err:?}"), @@ -2244,10 +2258,10 @@ impl Blockstore { fn get_data_shreds( &self, slot: Slot, - from_index: u64, - to_index: u64, + from_index: u32, + to_index: u32, buffer: &mut [u8], - ) -> Result<(u64, usize)> { + ) -> Result<(u32, usize)> { let _lock = self.check_lowest_cleanup_slot(slot)?; let mut buffer_offset = 0; let mut last_index = 0; @@ -2256,7 +2270,7 @@ impl Blockstore { warn!("The slot is not yet full. Will not return any shreds"); return Ok((last_index, buffer_offset)); } - let to_index = cmp::min(to_index, meta.consumed); + let to_index = cmp::min(to_index, u32::try_from(meta.consumed).unwrap()); for index in from_index..to_index { if let Some(shred_data) = self.get_data_shred(slot, index)? { let shred_len = shred_data.len(); @@ -2280,18 +2294,18 @@ impl Blockstore { Ok((last_index, buffer_offset)) } - pub fn get_coding_shred(&self, slot: Slot, index: u64) -> Result>> { - self.code_shred_cf.get_bytes((slot, index)) + pub fn get_coding_shred(&self, slot: Slot, index: u32) -> Result>> { + self.code_shred_cf.get_bytes((slot, index.into())) } pub fn get_coding_shreds_for_slot( &self, slot: Slot, - start_index: u64, + start_index: u32, ) -> std::result::Result, shred::Error> { self.slot_coding_iterator(slot, start_index) .expect("blockstore couldn't fetch iterator") - .map(|code| Shred::new_from_serialized_shred(code.1.to_vec())) + .map(|(_slot, _shred, code)| Shred::new_from_serialized_shred(code.to_vec())) .collect() } @@ -3463,7 +3477,7 @@ impl Blockstore { } /// Returns the entry vector for the slot starting with `shred_start_index` - pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u64) -> Result> { + pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u32) -> Result> { self.get_slot_entries_with_shred_info(slot, shred_start_index, false) .map(|x| x.0) } @@ -3473,9 +3487,9 @@ impl Blockstore { pub fn get_slot_entries_with_shred_info( &self, slot: Slot, - start_index: u64, + start_index: u32, allow_dead_slots: bool, - ) -> Result<(Vec, u64, bool)> { + ) -> Result<(Vec, u32, bool)> { let (completed_ranges, slot_meta) = self.get_completed_ranges(slot, start_index)?; // Check if the slot is dead *after* fetching completed ranges to avoid a race @@ -3491,7 +3505,7 @@ impl Blockstore { let slot_meta = slot_meta.unwrap(); let num_shreds = completed_ranges .last() - .map(|(_, end_index)| u64::from(*end_index) - start_index + 1) + .map(|(_, end)| *end - start_index + 1) .unwrap_or(0); let entries = self.get_slot_entries_in_block(slot, completed_ranges, Some(&slot_meta))?; @@ -3567,7 +3581,7 @@ impl Blockstore { fn get_completed_ranges( &self, slot: Slot, - start_index: u64, + start_index: u32, ) -> Result<(CompletedRanges, Option)> { let slot_meta = self.meta_cf.get(slot)?; if slot_meta.is_none() { @@ -3577,7 +3591,7 @@ impl Blockstore { let slot_meta = slot_meta.unwrap(); // Find all the ranges for the completed data blocks let completed_ranges = Self::get_completed_data_ranges( - start_index as u32, + start_index, &slot_meta.completed_data_indexes, slot_meta.consumed as u32, ); @@ -4009,8 +4023,8 @@ impl Blockstore { pub fn is_shred_duplicate(&self, shred: &Shred) -> Option> { let (slot, index, shred_type) = shred.id().unpack(); let mut other = match shred_type { - ShredType::Data => self.get_data_shred(slot, u64::from(index)), - ShredType::Code => self.get_coding_shred(slot, u64::from(index)), + ShredType::Data => self.get_data_shred(slot, index), + ShredType::Code => self.get_coding_shred(slot, index), } .expect("fetch from DuplicateSlots column family failed")?; if let Ok(signature) = shred.retransmitter_signature() { @@ -4582,7 +4596,7 @@ fn update_completed_data_indexes( shred_indices .windows(2) .filter(|ix| { - let (begin, end) = (ix[0] as u64, ix[1] as u64); + let (begin, end) = (ix[0], ix[1]); let num_shreds = (end - begin) as usize; received_data_shreds.range(begin..end).count() == num_shreds }) @@ -4595,7 +4609,7 @@ fn update_slot_meta( is_last_in_data: bool, slot_meta: &mut SlotMeta, index: u32, - new_consumed: u64, + new_consumed: u32, reference_tick: u8, received_data_shreds: &ShredIndex, ) -> Vec<(u32, u32)> { @@ -4608,11 +4622,11 @@ fn update_slot_meta( let slot_time_elapsed = u64::from(reference_tick) * 1000 / DEFAULT_TICKS_PER_SECOND; slot_meta.first_shred_timestamp = timestamp() - slot_time_elapsed; } - slot_meta.consumed = new_consumed; + slot_meta.consumed = new_consumed.into(); // If the last index in the slot hasn't been set before, then // set it to this shred index if is_last_in_slot && slot_meta.last_index.is_none() { - slot_meta.last_index = Some(u64::from(index)); + slot_meta.last_index = Some(index.into()); } update_completed_data_indexes( is_last_in_slot || is_last_in_data, @@ -5202,7 +5216,7 @@ pub fn test_all_empty_or_min(blockstore: &Blockstore, min_slot: Slot) { pub fn make_many_slot_shreds( start_slot: u64, num_slots: u64, - num_shreds_per_slot: u64, + num_shreds_per_slot: u32, ) -> (Vec, Vec) { // Use `None` as shred_size so the default (full) value is used let num_entries = max_ticks_per_n_shreds(num_shreds_per_slot, None); @@ -5618,7 +5632,7 @@ pub mod tests { fn test_read_shred_bytes() { let slot = 0; let (shreds, _) = make_slot_entries(slot, 0, 100, /*merkle_variant:*/ true); - let num_shreds = shreds.len() as u64; + let num_shreds = shreds.len() as u32; let shred_bufs: Vec<_> = shreds.iter().map(Shred::payload).cloned().collect(); let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -5893,9 +5907,7 @@ pub mod tests { .insert_shreds(shreds, None, false) .expect("Expected successful write of shreds"); assert_eq!( - blockstore - .get_slot_entries(slot, u64::from(index - 1)) - .unwrap(), + blockstore.get_slot_entries(slot, index - 1).unwrap(), vec![last_entry], ); } @@ -7997,7 +8009,7 @@ pub mod tests { // Test that the iterator for slot 8 contains what was inserted earlier let shred_iter = blockstore.slot_data_iterator(8, 0).unwrap(); let result: Vec = shred_iter - .filter_map(|(_, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok()) + .filter_map(|(_, _, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok()) .collect(); assert_eq!(result.len(), slot_8_shreds.len()); assert_eq!(result, slot_8_shreds); @@ -10216,7 +10228,7 @@ pub mod tests { for (s, buf) in data_shreds.iter().zip(shred_bufs) { assert_eq!( blockstore - .get_data_shred(s.slot(), s.index() as u64) + .get_data_shred(s.slot(), s.index()) .unwrap() .unwrap(), buf @@ -10444,7 +10456,7 @@ pub mod tests { let data_iter = blockstore.slot_data_iterator(slot, 0).unwrap(); let mut num_data = 0; - for ((slot, index), _) in data_iter { + for (slot, index, _) in data_iter { num_data += 1; // Test that iterator and individual shred lookup yield same set assert!(blockstore.get_data_shred(slot, index).unwrap().is_some()); @@ -10458,7 +10470,7 @@ pub mod tests { let coding_iter = blockstore.slot_coding_iterator(slot, 0).unwrap(); let mut num_coding = 0; - for ((slot, index), _) in coding_iter { + for (slot, index, _) in coding_iter { num_coding += 1; // Test that the iterator and individual shred lookup yield same set assert!(blockstore.get_coding_shred(slot, index).unwrap().is_some()); @@ -10562,7 +10574,7 @@ pub mod tests { // There are 32 data shreds in slot 9. for index in 0..32 { assert_matches!( - blockstore.get_data_shred(unconfirmed_slot, index as u64), + blockstore.get_data_shred(unconfirmed_slot, index), Ok(Some(_)) ); } @@ -10632,7 +10644,7 @@ pub mod tests { let mut shred_index = ShredIndex::default(); for i in 0..10 { - shred_index.insert(i as u64); + shred_index.insert(i); assert_eq!( update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes), vec![(i, i)] @@ -10943,16 +10955,16 @@ pub mod tests { #[test] fn test_duplicate_last_index_mark_dead() { - let num_shreds = 10; - let smaller_last_shred_index = 5; - let larger_last_shred_index = 8; + let num_shreds = 10u32; + let smaller_last_shred_index = 5u32; + let larger_last_shred_index = 8u32; let setup_test_shreds = |slot: Slot| -> Vec { let num_entries = max_ticks_per_n_shreds(num_shreds, Some(LEGACY_SHRED_DATA_CAPACITY)); let (mut shreds, _) = make_slot_entries(slot, 0, num_entries, /*merkle_variant:*/ false); - shreds[smaller_last_shred_index].set_last_in_slot(); - shreds[larger_last_shred_index].set_last_in_slot(); + shreds[smaller_last_shred_index as usize].set_last_in_slot(); + shreds[larger_last_shred_index as usize].set_last_in_slot(); shreds }; @@ -10965,7 +10977,7 @@ pub mod tests { let meta = blockstore.meta(slot).unwrap().unwrap(); assert_eq!(meta.consumed, shreds.len() as u64); let shreds_index = blockstore.get_index(slot).unwrap().unwrap(); - for i in 0..shreds.len() as u64 { + for i in 0..shreds.len() as u32 { assert!(shreds_index.data().contains(i)); } @@ -10990,7 +11002,7 @@ pub mod tests { // the "last" index shred itself is inserted. let (expected_slot_meta, expected_index) = get_expected_slot_meta_and_index_meta( &blockstore, - shreds[..=smaller_last_shred_index].to_vec(), + shreds[..=smaller_last_shred_index as usize].to_vec(), ); blockstore .insert_shreds(shreds.clone(), None, false) @@ -10998,7 +11010,7 @@ pub mod tests { assert!(blockstore.get_duplicate_slot(slot).is_some()); assert!(!blockstore.is_dead(slot)); for i in 0..num_shreds { - if i <= smaller_last_shred_index as u64 { + if i <= smaller_last_shred_index { assert_eq!( blockstore.get_data_shred(slot, i).unwrap().unwrap(), *shreds[i as usize].payload() @@ -11015,7 +11027,9 @@ pub mod tests { // Case 2: Inserting a duplicate with an even smaller last shred index should not // mark the slot as dead since the Slotmeta is full. let even_smaller_last_shred_duplicate = { - let mut payload = shreds[smaller_last_shred_index - 1].payload().clone(); + let mut payload = shreds[smaller_last_shred_index as usize - 1] + .payload() + .clone(); // Flip a byte to create a duplicate shred payload[0] = u8::MAX - payload[0]; let mut shred = Shred::new_from_serialized_shred(payload).unwrap(); @@ -11030,7 +11044,7 @@ pub mod tests { .unwrap(); assert!(!blockstore.is_dead(slot)); for i in 0..num_shreds { - if i <= smaller_last_shred_index as u64 { + if i <= smaller_last_shred_index { assert_eq!( blockstore.get_data_shred(slot, i).unwrap().unwrap(), *shreds[i as usize].payload() @@ -11060,10 +11074,8 @@ pub mod tests { // inserted into. for i in 0..num_shreds { let shred_to_check = &shreds[i as usize]; - let shred_index = shred_to_check.index() as u64; - if shred_index != smaller_last_shred_index as u64 - && shred_index != larger_last_shred_index as u64 - { + let shred_index = shred_to_check.index(); + if shred_index != smaller_last_shred_index && shred_index != larger_last_shred_index { assert_eq!( blockstore .get_data_shred(slot, shred_index) @@ -11091,10 +11103,8 @@ pub mod tests { // All the shreds will be inserted since dead slots can still be inserted into. for i in 0..num_shreds { let shred_to_check = &shreds[i as usize]; - let shred_index = shred_to_check.index() as u64; - if shred_index != smaller_last_shred_index as u64 - && shred_index != larger_last_shred_index as u64 - { + let shred_index = shred_to_check.index(); + if shred_index != smaller_last_shred_index && shred_index != larger_last_shred_index { assert_eq!( blockstore .get_data_shred(slot, shred_index) @@ -11969,7 +11979,7 @@ pub mod tests { let total_shreds = fec_set_index as u64 + data_shreds.len() as u64; // FEC set should be padded - assert_eq!(data_shreds.len(), DATA_SHREDS_PER_FEC_BLOCK); + assert_eq!(data_shreds.len(), DATA_SHREDS_PER_FEC_BLOCK as usize); // Missing slot meta assert_matches!( @@ -11980,7 +11990,7 @@ pub mod tests { // Incomplete slot blockstore .insert_shreds( - data_shreds[0..DATA_SHREDS_PER_FEC_BLOCK - 1].to_vec(), + data_shreds[0..(DATA_SHREDS_PER_FEC_BLOCK as usize) - 1].to_vec(), None, false, ) @@ -12037,7 +12047,7 @@ pub mod tests { ); let last_index = last_data_shreds.last().unwrap().index(); let total_shreds = first_data_shreds.len() + last_data_shreds.len(); - assert!(total_shreds < DATA_SHREDS_PER_FEC_BLOCK); + assert!(total_shreds < DATA_SHREDS_PER_FEC_BLOCK as usize); blockstore .insert_shreds(first_data_shreds, None, false) .unwrap(); @@ -12077,8 +12087,8 @@ pub mod tests { ); let last_index = last_data_shreds.last().unwrap().index(); let total_shreds = first_data_shreds.len() + last_data_shreds.len(); - assert!(last_data_shreds.len() < DATA_SHREDS_PER_FEC_BLOCK); - assert!(total_shreds > DATA_SHREDS_PER_FEC_BLOCK); + assert!(last_data_shreds.len() < DATA_SHREDS_PER_FEC_BLOCK as usize); + assert!(total_shreds > DATA_SHREDS_PER_FEC_BLOCK as usize); blockstore .insert_shreds(first_data_shreds, None, false) .unwrap(); @@ -12106,7 +12116,7 @@ pub mod tests { None, true, ); - assert!(first_data_shreds.len() > DATA_SHREDS_PER_FEC_BLOCK); + assert!(first_data_shreds.len() > DATA_SHREDS_PER_FEC_BLOCK as usize); let block_id = first_data_shreds[0].merkle_root().unwrap(); blockstore .insert_shreds(first_data_shreds, None, false) diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 00eea6f811ebcb..fc18e9295ae80b 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -271,14 +271,16 @@ pub mod columns { #[derive(Debug)] /// The shred data column /// - /// * index type: `(u64, u64)` + /// * index type: `(Slot, u64)` + /// Second element is the shred index and should be `u32`. /// * value type: [`Vec`] pub struct ShredData; #[derive(Debug)] /// The shred erasure code column /// - /// * index type: `(u64, u64)` + /// * index type: `(Slot, u64)` + /// Second element is the shred index and should be `u32`. /// * value type: [`Vec`] pub struct ShredCode; diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index f3940fe618444d..fd708d29149469 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -8,7 +8,7 @@ use { }, std::{ collections::BTreeSet, - ops::{Range, RangeBounds}, + ops::{Bound, Range, RangeBounds}, }, }; @@ -51,6 +51,15 @@ impl Default for ConnectedFlags { #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] /// The Meta column family +// TODO A number of fields in this struct represent shred indices, using `u64` which is incorrect. +// All shred indices must be `u32`. As already encode in the types of `completed_data_indexes`, as +// well as in the `ShredCommonHeader::index`. As some shred index values are correctly typed as +// `u32` and some are incorrectly types as `u64` there is a number of back and forth conversions +// sprinkled around the codebase. With call to `unwrap()` in case the value does not fit when +// narrowed. +// +// It would be much better to change all shred indices to be `u32`. But it would require some data +// migration. pub struct SlotMeta { /// The number of slots above the root (the genesis block). The first /// slot has slot 0. @@ -58,15 +67,21 @@ pub struct SlotMeta { /// The total number of consecutive shreds starting from index 0 we have received for this slot. /// At the same time, it is also an index of the first missing shred for this slot, while the /// slot is incomplete. + // TODO This should be `u32` as it is used as a shred index in the code. pub consumed: u64, /// The index *plus one* of the highest shred received for this slot. Useful /// for checking if the slot has received any shreds yet, and to calculate the /// range where there is one or more holes: `(consumed..received)`. + // TODO This should be `u32` as it is used as a shred index in the code. pub received: u64, /// The timestamp of the first time a shred was added for this slot pub first_shred_timestamp: u64, /// The index of the shred that is flagged as the last shred for this slot. /// None until the shred with LAST_SHRED_IN_SLOT flag is received. + // TODO This should be `Option`, it is used as a shred index in the code. Changing this + // type should not be very hard - just create a new serialization proxy, similar to + // `serde_compat` below, but for `Option` that still uses 8 bytes, with `u64::MAX` for + // `None`. #[serde(with = "serde_compat")] pub last_index: Option, /// The slot height of the block this one derives from. @@ -115,6 +130,9 @@ pub struct Index { #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] pub struct ShredIndex { /// Map representing presence/absence of shreds + // TODO This should have been `BTreeSet` as shred indices can not exceed a `u32`. But + // changing this type would require data migration in the blockstore. An exercise for the + // future. index: BTreeSet, } @@ -250,24 +268,42 @@ impl Index { } } +// To be replaced by `range.map(Into::into)` when `#![feature(bound_map)]` is stabilized. +// +// https://github.com/rust-lang/rust/issues/86026 +fn bound_into(from: Bound) -> Bound +where + F: Into, +{ + match from { + Bound::Included(x) => Bound::Included(x.into()), + Bound::Excluded(x) => Bound::Excluded(x.into()), + Bound::Unbounded => Bound::Unbounded, + } +} + impl ShredIndex { pub fn num_shreds(&self) -> usize { self.index.len() } - pub(crate) fn range(&self, bounds: R) -> impl Iterator + pub(crate) fn range(&self, bounds: R) -> impl Iterator + '_ where - R: RangeBounds, + R: RangeBounds, { - self.index.range(bounds) + let range = ( + bound_into::(bounds.start_bound().cloned()), + bound_into::(bounds.end_bound().cloned()), + ); + self.index.range(range).map(|i| u32::try_from(*i).unwrap()) } - pub(crate) fn contains(&self, index: u64) -> bool { - self.index.contains(&index) + pub(crate) fn contains(&self, index: u32) -> bool { + self.index.contains(&u64::from(index)) } - pub(crate) fn insert(&mut self, index: u64) { - self.index.insert(index); + pub(crate) fn insert(&mut self, index: u32) { + self.index.insert(u64::from(index)); } } @@ -419,15 +455,16 @@ impl ErasureMeta { self.config } - pub(crate) fn data_shreds_indices(&self) -> Range { - let num_data = self.config.num_data as u64; - let fec_set_index = u64::from(self.fec_set_index); + pub(crate) fn data_shreds_indices(&self) -> Range { + let num_data = self.config.num_data as u32; + let fec_set_index = self.fec_set_index; fec_set_index..fec_set_index + num_data } - pub(crate) fn coding_shreds_indices(&self) -> Range { - let num_coding = self.config.num_coding as u64; - self.first_coding_index..self.first_coding_index + num_coding + pub(crate) fn coding_shreds_indices(&self) -> Range { + let num_coding = self.config.num_coding as u32; + let first_coding_index = self.first_coding_index as u32; + first_coding_index..first_coding_index + num_coding } pub(crate) fn first_received_coding_shred_index(&self) -> Option { @@ -617,8 +654,8 @@ mod test { let mut rng = thread_rng(); let mut index = Index::new(0); - let data_indexes = 0..erasure_config.num_data as u64; - let coding_indexes = 0..erasure_config.num_coding as u64; + let data_indexes = 0..erasure_config.num_data as u32; + let coding_indexes = 0..erasure_config.num_coding as u32; assert_eq!(e_meta.status(&index), StillNeed(erasure_config.num_data)); @@ -637,7 +674,7 @@ mod test { .collect::>() .choose_multiple(&mut rng, erasure_config.num_data) { - index.data_mut().index.remove(&idx); + index.data_mut().index.remove(&idx.into()); assert_eq!(e_meta.status(&index), CanRecover); } @@ -650,7 +687,7 @@ mod test { .collect::>() .choose_multiple(&mut rng, erasure_config.num_coding) { - index.coding_mut().index.remove(&idx); + index.coding_mut().index.remove(&idx.into()); assert_eq!(e_meta.status(&index), DataFull); } diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 24ff5ce8fb7baa..1085a05c78457e 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1268,7 +1268,7 @@ impl ReplaySlotStats { slot: Slot, num_txs: usize, num_entries: usize, - num_shreds: u64, + num_shreds: u32, bank_complete_time_us: u64, is_unified_scheduler_enabled: bool, ) { @@ -1389,7 +1389,7 @@ impl ReplaySlotStats { pub struct ConfirmationProgress { pub last_entry: Hash, pub tick_hash_count: u64, - pub num_shreds: u64, + pub num_shreds: u32, pub num_entries: usize, pub num_txs: usize, } @@ -1455,7 +1455,7 @@ pub fn confirm_slot( fn confirm_slot_entries( bank: &BankWithScheduler, replay_tx_thread_pool: &ThreadPool, - slot_entries_load_result: (Vec, u64, bool), + slot_entries_load_result: (Vec, u32, bool), timing: &mut ConfirmationTiming, progress: &mut ConfirmationProgress, skip_verification: bool, diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 814ec2b5bf303a..6d988605964944 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -110,7 +110,7 @@ const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; // data shreds per each batch as below. The actual number of data shreds in // each erasure batch depends on the number of shreds obtained from serializing // a &[Entry]. -pub const DATA_SHREDS_PER_FEC_BLOCK: usize = 32; +pub const DATA_SHREDS_PER_FEC_BLOCK: u32 = 32; // For legacy tests and benchmarks. const_assert_eq!(LEGACY_SHRED_DATA_CAPACITY, 1051); @@ -159,6 +159,8 @@ pub enum Error { InvalidShredFlags(u8), #[error("Invalid {0:?} shred index: {1}")] InvalidShredIndex(ShredType, /*shred index:*/ u32), + #[error("Shred index exceeds u32")] + ShredIndexExceeded, #[error("Invalid shred type")] InvalidShredType, #[error("Invalid shred variant")] @@ -354,7 +356,7 @@ impl Shred { // Like Shred::erasure_shard but returning a slice. dispatch!(pub(crate) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); // Returns the shard index within the erasure coding set. - dispatch!(pub(crate) fn erasure_shard_index(&self) -> Result); + dispatch!(pub(crate) fn erasure_shard_index(&self) -> Result); dispatch!(pub(crate) fn retransmitter_signature(&self) -> Result); dispatch!(pub fn into_payload(self) -> Vec); @@ -1176,7 +1178,7 @@ pub fn should_discard_shred( }; match ShredType::from(shred_variant) { ShredType::Code => { - if index >= shred_code::MAX_CODE_SHREDS_PER_SLOT as u32 { + if index >= shred_code::MAX_CODE_SHREDS_PER_SLOT { stats.index_out_of_bounds += 1; return true; } @@ -1186,7 +1188,7 @@ pub fn should_discard_shred( } } ShredType::Data => { - if index >= MAX_DATA_SHREDS_PER_SLOT as u32 { + if index >= MAX_DATA_SHREDS_PER_SLOT { stats.index_out_of_bounds += 1; return true; } @@ -1232,14 +1234,14 @@ pub fn should_discard_shred( false } -pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option) -> u64 { +pub fn max_ticks_per_n_shreds(num_shreds: u32, shred_data_size: Option) -> u64 { let ticks = create_ticks(1, 0, Hash::default()); max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size) } pub fn max_entries_per_n_shred( entry: &Entry, - num_shreds: u64, + num_shreds: u32, shred_data_size: Option, ) -> u64 { // Default 32:32 erasure batches yields 64 shreds; log2(64) = 6. @@ -1252,7 +1254,7 @@ pub fn max_entries_per_n_shred( let entry_size = bincode::serialized_size(entry).unwrap(); let count_size = vec_size - entry_size; - (shred_data_size * num_shreds - count_size) / entry_size + (shred_data_size * u64::from(num_shreds) - count_size) / entry_size } pub fn verify_test_data_shred( @@ -1656,7 +1658,7 @@ mod tests { assert_eq!(stats.slot_out_of_range, 1); } { - let index = u32::try_from(MAX_CODE_SHREDS_PER_SLOT).unwrap(); + let index = MAX_CODE_SHREDS_PER_SLOT; { let mut cursor = Cursor::new(packet.buffer_mut()); cursor diff --git a/ledger/src/shred/legacy.rs b/ledger/src/shred/legacy.rs index ad2e59fc4dd7e3..6626e29b44ec7a 100644 --- a/ledger/src/shred/legacy.rs +++ b/ledger/src/shred/legacy.rs @@ -80,7 +80,7 @@ impl<'a> Shred<'a> for ShredData { shred.sanitize().map(|_| shred) } - fn erasure_shard_index(&self) -> Result { + fn erasure_shard_index(&self) -> Result { shred_data::erasure_shard_index(self).ok_or_else(|| { let headers = Box::new((self.common_header, self.data_header)); Error::InvalidErasureShardIndex(headers) @@ -142,7 +142,7 @@ impl<'a> Shred<'a> for ShredCode { shred.sanitize().map(|_| shred) } - fn erasure_shard_index(&self) -> Result { + fn erasure_shard_index(&self) -> Result { shred_code::erasure_shard_index(self).ok_or_else(|| { let headers = Box::new((self.common_header, self.coding_header)); Error::InvalidErasureShardIndex(headers) @@ -377,7 +377,7 @@ mod test { } { let mut shred = shred.clone(); - shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32; + shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT; assert_matches!( shred.sanitize(), Err(Error::InvalidShredIndex(ShredType::Data, 32768)) @@ -438,7 +438,7 @@ mod test { } { let mut shred = shred.clone(); - shred.common_header.index = MAX_CODE_SHREDS_PER_SLOT as u32; + shred.common_header.index = MAX_CODE_SHREDS_PER_SLOT; assert_matches!( shred.sanitize(), Err(Error::InvalidShredIndex(ShredType::Code, 32_768)) @@ -458,11 +458,11 @@ mod test { // shred has index > u32::MAX should fail. { let mut shred = shred.clone(); - shred.common_header.fec_set_index = MAX_DATA_SHREDS_PER_SLOT as u32 - 2; + shred.common_header.fec_set_index = MAX_DATA_SHREDS_PER_SLOT - 2; shred.coding_header.num_data_shreds = 3; shred.coding_header.num_coding_shreds = 4; shred.coding_header.position = 1; - shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32 - 2; + shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT - 2; assert_matches!( shred.sanitize(), Err(Error::InvalidErasureShardIndex { .. }) diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs index e569999ba3d82f..680ea5c418a86b 100644 --- a/ledger/src/shred/merkle.rs +++ b/ledger/src/shred/merkle.rs @@ -16,7 +16,7 @@ use { shredder::{self, ReedSolomonCache}, }, assert_matches::debug_assert_matches, - itertools::{Either, Itertools}, + itertools::{izip, Either, Itertools}, rayon::{prelude::*, ThreadPool}, reed_solomon_erasure::Error::{InvalidIndex, TooFewParityShards, TooFewShards}, solana_perf::packet::deserialize_from_with_limit, @@ -86,7 +86,7 @@ pub(super) enum Shred { impl Shred { dispatch!(fn common_header(&self) -> &ShredCommonHeader); dispatch!(fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); - dispatch!(fn erasure_shard_index(&self) -> Result); + dispatch!(fn erasure_shard_index(&self) -> Result); dispatch!(fn merkle_node(&self) -> Result); dispatch!(fn payload(&self) -> &Vec); dispatch!(fn sanitize(&self) -> Result<(), Error>); @@ -206,10 +206,7 @@ impl ShredData { let fec_set_index = <[u8; 4]>::try_from(shred.get(79..83)?) .map(u32::from_le_bytes) .ok()?; - shred::layout::get_index(shred)? - .checked_sub(fec_set_index) - .map(usize::try_from)? - .ok()? + shred::layout::get_index(shred)?.checked_sub(fec_set_index)? }; let proof_offset = Self::get_proof_offset(proof_size, chained, resigned).ok()?; let proof = get_merkle_proof(shred, proof_offset, proof_size).ok()?; @@ -281,11 +278,11 @@ impl ShredCode { let index = { let num_data_shreds = <[u8; 2]>::try_from(shred.get(83..85)?) .map(u16::from_le_bytes) - .map(usize::from) + .map(u32::from) .ok()?; let position = <[u8; 2]>::try_from(shred.get(87..89)?) .map(u16::from_le_bytes) - .map(usize::from) + .map(u32::from) .ok()?; num_data_shreds.checked_add(position)? }; @@ -509,7 +506,7 @@ impl<'a> ShredTrait<'a> for ShredData { Ok(shred) } - fn erasure_shard_index(&self) -> Result { + fn erasure_shard_index(&self) -> Result { shred_data::erasure_shard_index(self).ok_or_else(|| { let headers = Box::new((self.common_header, self.data_header)); Error::InvalidErasureShardIndex(headers) @@ -595,7 +592,7 @@ impl<'a> ShredTrait<'a> for ShredCode { Ok(shred) } - fn erasure_shard_index(&self) -> Result { + fn erasure_shard_index(&self) -> Result { shred_code::erasure_shard_index(self).ok_or_else(|| { let headers = Box::new((self.common_header, self.coding_header)); Error::InvalidErasureShardIndex(headers) @@ -699,7 +696,7 @@ fn join_nodes, T: AsRef<[u8]>>(node: S, other: T) -> Hash { // Recovers root of the merkle tree from a leaf node // at the given index and the respective proof. -fn get_merkle_root<'a, I>(index: usize, node: Hash, proof: I) -> Result +fn get_merkle_root<'a, I>(index: u32, node: Hash, proof: I) -> Result where I: IntoIterator, { @@ -755,8 +752,8 @@ fn make_merkle_tree(mut nodes: Vec) -> Vec { } fn make_merkle_proof( - mut index: usize, // leaf index ~ shred's erasure shard index. - mut size: usize, // number of leaves ~ erasure batch size. + mut index: u32, // leaf index ~ shred's erasure shard index. + mut size: u32, // number of leaves ~ erasure batch size. tree: &[Hash], ) -> Option> { if index >= size { @@ -765,14 +762,15 @@ fn make_merkle_proof( let mut offset = 0; let mut proof = Vec::<&MerkleProofEntry>::new(); while size > 1 { - let node = tree.get(offset + (index ^ 1).min(size - 1))?; + let node_position = offset + (index ^ 1).min(size - 1); + let node = tree.get(node_position as usize)?; let entry = &node.as_ref()[..SIZE_OF_MERKLE_PROOF_ENTRY]; proof.push(<&MerkleProofEntry>::try_from(entry).unwrap()); offset += size; size = (size + 1) >> 1; index >>= 1; } - (offset + 1 == tree.len()).then_some(proof) + (offset as usize + 1 == tree.len()).then_some(proof) } pub(super) fn recover( @@ -861,18 +859,18 @@ pub(super) fn recover( } } })); - let num_data_shreds = usize::from(coding_header.num_data_shreds); - let num_coding_shreds = usize::from(coding_header.num_coding_shreds); + let num_data_shreds = u32::from(coding_header.num_data_shreds); + let num_coding_shreds = u32::from(coding_header.num_coding_shreds); let num_shards = num_data_shreds + num_coding_shreds; // Obtain erasure encoded shards from shreds. let shreds = { - let mut batch = vec![None; num_shards]; + let mut batch = vec![None; num_shards as usize]; while let Some(shred) = shreds.pop() { let index = match shred.erasure_shard_index() { - Ok(index) if index < batch.len() => index, + Ok(index) if (index as usize) < batch.len() => index, _ => return Err(Error::from(InvalidIndex)), }; - batch[index] = Some(shred); + batch[index as usize] = Some(shred); } batch }; @@ -888,8 +886,8 @@ pub(super) fn recover( let mut shreds: Vec<_> = shreds .into_iter() .zip(shards) - .enumerate() - .map(|(index, (shred, shard))| { + .zip(0..) + .map(|((shred, shard), index)| { if let Some(shred) = shred { return Ok(shred); } @@ -929,7 +927,7 @@ pub(super) fn recover( ..coding_header }; let common_header = ShredCommonHeader { - index: common_header.index + offset as u32, + index: common_header.index + offset, ..common_header }; let shred = ShredCode::from_recovered_shard( @@ -949,7 +947,7 @@ pub(super) fn recover( .map(Shred::merkle_node) .collect::>()?; let tree = make_merkle_tree(nodes); - for (index, (shred, mask)) in shreds.iter_mut().zip(&mask).enumerate() { + for (index, shred, mask) in izip!(0.., shreds.iter_mut(), &mask) { let proof = make_merkle_proof(index, num_shards, &tree).ok_or(Error::InvalidMerkleProof)?; if proof.len() != usize::from(proof_size) { return Err(Error::InvalidMerkleProof); @@ -978,8 +976,8 @@ pub(super) fn recover( } // Maps number of (code + data) shreds to merkle_proof.len(). -fn get_proof_size(num_shreds: usize) -> u8 { - let bits = usize::BITS - num_shreds.leading_zeros(); +fn get_proof_size(num_shreds: u32) -> u8 { + let bits = u32::BITS - num_shreds.leading_zeros(); let proof_size = if num_shreds.is_power_of_two() { bits.checked_sub(1).unwrap() } else { @@ -1027,7 +1025,7 @@ pub(super) fn make_shreds_from_data( shredder::get_erasure_batch_size(DATA_SHREDS_PER_FEC_BLOCK, is_last_in_slot); let proof_size = get_proof_size(erasure_batch_size); let data_buffer_size = ShredData::capacity(proof_size, chained, resigned)?; - let chunk_size = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_size; + let chunk_size = DATA_SHREDS_PER_FEC_BLOCK as usize * data_buffer_size; let mut common_header = ShredCommonHeader { signature: Signature::default(), shred_variant: ShredVariant::MerkleData { @@ -1086,7 +1084,8 @@ pub(super) fn make_shreds_from_data( let (proof_size, data_buffer_size, num_data_shreds) = (1u8..32) .find_map(|proof_size| { let data_buffer_size = ShredData::capacity(proof_size, chained, resigned).ok()?; - let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size; + let num_data_shreds = + u32::try_from((data.len() + data_buffer_size - 1) / data_buffer_size).ok()?; let num_data_shreds = num_data_shreds.max(min_num_data_shreds); let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot); @@ -1106,7 +1105,7 @@ pub(super) fn make_shreds_from_data( for shred in data .chunks(data_buffer_size) .chain(std::iter::repeat(&[][..])) - .take(num_data_shreds) + .take(num_data_shreds as usize) { let shred = new_shred_data(common_header, data_header, shred); shreds.push(shred); @@ -1158,11 +1157,11 @@ pub(super) fn make_shreds_from_data( .iter() .scan(next_code_index, |next_code_index, chunk| { let out = Some(*next_code_index); - let num_data_shreds = chunk.len(); + let num_data_shreds = u32::try_from(chunk.len()).ok()?; let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot); let num_coding_shreds = erasure_batch_size - num_data_shreds; - *next_code_index += num_coding_shreds as u32; + *next_code_index += num_coding_shreds; out }) .collect(); @@ -1242,7 +1241,7 @@ fn make_erasure_batch( is_last_in_slot: bool, reed_solomon_cache: &ReedSolomonCache, ) -> Result<(/*merkle root:*/ Hash, Vec), Error> { - let num_data_shreds = shreds.len(); + let num_data_shreds = u32::try_from(shreds.len()).map_err(|_| Error::ShredIndexExceeded)?; let chained = chained_merkle_root.is_some(); let resigned = chained && is_last_in_slot; let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot); @@ -1270,7 +1269,7 @@ fn make_erasure_batch( .collect::>()?; // Shreds should have erasure encoded shard of the same length. debug_assert_eq!(data.iter().map(|shard| shard.len()).dedup().count(), 1); - let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds]; + let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds as usize]; reed_solomon_cache .get(num_data_shreds, num_coding_shreds)? .encode_sep(&data, &mut parity[..])?; @@ -1316,7 +1315,7 @@ fn make_erasure_batch( let root = tree.last().ok_or(Error::InvalidMerkleProof)?; let signature = keypair.sign_message(root.as_ref()); // Populate merkle proof for all shreds and attach signature. - for (index, shred) in shreds.iter_mut().enumerate() { + for (index, shred) in izip!(0.., &mut shreds) { let proof = make_merkle_proof(index, erasure_batch_size, &tree).ok_or(Error::InvalidMerkleProof)?; debug_assert_eq!(proof.len(), usize::from(proof_size)); @@ -1429,14 +1428,14 @@ mod test { assert_eq!(entry, &bytes[..SIZE_OF_MERKLE_PROOF_ENTRY]); } - fn run_merkle_tree_round_trip(rng: &mut R, size: usize) { + fn run_merkle_tree_round_trip(rng: &mut R, size: u32) { let nodes = repeat_with(|| rng.gen::<[u8; 32]>()).map(Hash::from); - let nodes: Vec<_> = nodes.take(size).collect(); + let nodes: Vec<_> = nodes.take(size as usize).collect(); let tree = make_merkle_tree(nodes.clone()); let root = tree.last().copied().unwrap(); for index in 0..size { let proof = make_merkle_proof(index, size, &tree).unwrap(); - for (k, &node) in nodes.iter().enumerate() { + for (k, &node) in izip!(0.., &nodes) { let proof = proof.iter().copied(); if k == index { assert_eq!(root, get_merkle_root(k, node, proof).unwrap()); @@ -1464,7 +1463,7 @@ mod test { #[test_case(73, false, false)] #[test_case(73, true, false)] #[test_case(73, true, true)] - fn test_recover_merkle_shreds(num_shreds: usize, chained: bool, resigned: bool) { + fn test_recover_merkle_shreds(num_shreds: u32, chained: bool, resigned: bool) { let mut rng = rand::thread_rng(); let reed_solomon_cache = ReedSolomonCache::default(); for num_data_shreds in 1..num_shreds { @@ -1484,8 +1483,8 @@ mod test { rng: &mut R, chained: bool, resigned: bool, - num_data_shreds: usize, - num_coding_shreds: usize, + num_data_shreds: u32, + num_coding_shreds: u32, reed_solomon_cache: &ReedSolomonCache, ) { let keypair = Keypair::new(); @@ -1517,10 +1516,10 @@ mod test { num_coding_shreds: num_coding_shreds as u16, position: 0, }; - let mut shreds = Vec::with_capacity(num_shreds); + let mut shreds = Vec::with_capacity(num_shreds as usize); for i in 0..num_data_shreds { let common_header = ShredCommonHeader { - index: common_header.index + i as u32, + index: common_header.index + i, ..common_header }; let size = ShredData::SIZE_OF_HEADERS + rng.gen_range(0..capacity); @@ -1545,20 +1544,20 @@ mod test { .map(Shred::erasure_shard_as_slice) .collect::>() .unwrap(); - let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds]; + let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds as usize]; reed_solomon_cache .get(num_data_shreds, num_coding_shreds) .unwrap() .encode_sep(&data, &mut parity[..]) .unwrap(); - for (i, code) in parity.into_iter().enumerate() { + for (i, code) in izip!(0.., parity) { let common_header = ShredCommonHeader { shred_variant: ShredVariant::MerkleCode { proof_size, chained, resigned, }, - index: common_header.index + i as u32 + 7, + index: common_header.index + i + 7, ..common_header }; let coding_header = CodingShredHeader { @@ -1584,7 +1583,7 @@ mod test { .collect::>() .unwrap(); let tree = make_merkle_tree(nodes); - for (index, shred) in shreds.iter_mut().enumerate() { + for (index, shred) in izip!(0.., &mut shreds) { let proof = make_merkle_proof(index, num_shreds, &tree).unwrap(); assert_eq!(proof.len(), usize::from(proof_size)); shred.set_merkle_proof(&proof).unwrap(); @@ -1660,10 +1659,10 @@ mod test { assert_eq!(get_proof_size(63), 6); assert_eq!(get_proof_size(64), 6); assert_eq!(get_proof_size(65), 7); - assert_eq!(get_proof_size(usize::MAX - 1), 64); - assert_eq!(get_proof_size(usize::MAX), 64); + assert_eq!(get_proof_size(u32::MAX - 1), 32); + assert_eq!(get_proof_size(u32::MAX), 32); for proof_size in 1u8..9 { - let max_num_shreds = 1usize << u32::from(proof_size); + let max_num_shreds = 1u32 << proof_size; let min_num_shreds = (max_num_shreds >> 1) + 1; for num_shreds in min_num_shreds..=max_num_shreds { assert_eq!(get_proof_size(num_shreds), proof_size); diff --git a/ledger/src/shred/shred_code.rs b/ledger/src/shred/shred_code.rs index f1625c132256e7..2ffbf719be8aa2 100644 --- a/ledger/src/shred/shred_code.rs +++ b/ledger/src/shred/shred_code.rs @@ -11,7 +11,7 @@ use { }; const_assert_eq!(MAX_CODE_SHREDS_PER_SLOT, 32_768); -pub const MAX_CODE_SHREDS_PER_SLOT: usize = MAX_DATA_SHREDS_PER_SLOT; +pub const MAX_CODE_SHREDS_PER_SLOT: u32 = MAX_DATA_SHREDS_PER_SLOT; const_assert_eq!(ShredCode::SIZE_OF_PAYLOAD, 1228); @@ -29,7 +29,7 @@ impl ShredCode { dispatch!(pub(super) fn common_header(&self) -> &ShredCommonHeader); dispatch!(pub(super) fn erasure_shard(self) -> Result, Error>); dispatch!(pub(super) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); - dispatch!(pub(super) fn erasure_shard_index(&self) -> Result); + dispatch!(pub(super) fn erasure_shard_index(&self) -> Result); dispatch!(pub(super) fn first_coding_index(&self) -> Option); dispatch!(pub(super) fn into_payload(self) -> Vec); dispatch!(pub(super) fn payload(&self) -> &Vec); @@ -128,28 +128,28 @@ impl From for ShredCode { } #[inline] -pub(super) fn erasure_shard_index(shred: &T) -> Option { +pub(super) fn erasure_shard_index(shred: &T) -> Option { // Assert that the last shred index in the erasure set does not // overshoot MAX_{DATA,CODE}_SHREDS_PER_SLOT. let common_header = shred.common_header(); let coding_header = shred.coding_header(); if common_header .fec_set_index - .checked_add(u32::from(coding_header.num_data_shreds.checked_sub(1)?))? as usize + .checked_add(coding_header.num_data_shreds.checked_sub(1)?.into())? >= MAX_DATA_SHREDS_PER_SLOT { return None; } if shred .first_coding_index()? - .checked_add(u32::from(coding_header.num_coding_shreds.checked_sub(1)?))? as usize + .checked_add(coding_header.num_coding_shreds.checked_sub(1)?.into())? >= MAX_CODE_SHREDS_PER_SLOT { return None; } - let num_data_shreds = usize::from(coding_header.num_data_shreds); - let num_coding_shreds = usize::from(coding_header.num_coding_shreds); - let position = usize::from(coding_header.position); + let num_data_shreds = u32::from(coding_header.num_data_shreds); + let num_coding_shreds = u32::from(coding_header.num_coding_shreds); + let position = u32::from(coding_header.position); let fec_set_size = num_data_shreds.checked_add(num_coding_shreds)?; let index = position.checked_add(num_data_shreds)?; (index < fec_set_size).then_some(index) @@ -161,13 +161,13 @@ pub(super) fn sanitize(shred: &T) -> Result<(), Error> { } let common_header = shred.common_header(); let coding_header = shred.coding_header(); - if common_header.index as usize >= MAX_CODE_SHREDS_PER_SLOT { + if common_header.index >= MAX_CODE_SHREDS_PER_SLOT { return Err(Error::InvalidShredIndex( ShredType::Code, common_header.index, )); } - let num_coding_shreds = usize::from(coding_header.num_coding_shreds); + let num_coding_shreds = u32::from(coding_header.num_coding_shreds); if num_coding_shreds > 8 * DATA_SHREDS_PER_FEC_BLOCK { return Err(Error::InvalidNumCodingShreds( coding_header.num_coding_shreds, diff --git a/ledger/src/shred/shred_data.rs b/ledger/src/shred/shred_data.rs index 3aa97ebde9e4e1..02ab1a0e11d88b 100644 --- a/ledger/src/shred/shred_data.rs +++ b/ledger/src/shred/shred_data.rs @@ -23,7 +23,7 @@ impl ShredData { dispatch!(pub(super) fn data(&self) -> Result<&[u8], Error>); dispatch!(pub(super) fn erasure_shard(self) -> Result, Error>); dispatch!(pub(super) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>); - dispatch!(pub(super) fn erasure_shard_index(&self) -> Result); + dispatch!(pub(super) fn erasure_shard_index(&self) -> Result); dispatch!(pub(super) fn into_payload(self) -> Vec); dispatch!(pub(super) fn parent(&self) -> Result); dispatch!(pub(super) fn payload(&self) -> &Vec); @@ -165,10 +165,10 @@ impl From for ShredData { } #[inline] -pub(super) fn erasure_shard_index(shred: &T) -> Option { +pub(super) fn erasure_shard_index(shred: &T) -> Option { let fec_set_index = shred.common_header().fec_set_index; let index = shred.common_header().index.checked_sub(fec_set_index)?; - usize::try_from(index).ok() + Some(index) } pub(super) fn sanitize(shred: &T) -> Result<(), Error> { @@ -177,7 +177,7 @@ pub(super) fn sanitize(shred: &T) -> Result<(), Error> { } let common_header = shred.common_header(); let data_header = shred.data_header(); - if common_header.index as usize >= MAX_DATA_SHREDS_PER_SLOT { + if common_header.index >= MAX_DATA_SHREDS_PER_SLOT { return Err(Error::InvalidShredIndex( ShredType::Data, common_header.index, diff --git a/ledger/src/shred/traits.rs b/ledger/src/shred/traits.rs index 35a6c0a617af64..06511d99d9631c 100644 --- a/ledger/src/shred/traits.rs +++ b/ledger/src/shred/traits.rs @@ -22,7 +22,7 @@ pub(super) trait Shred<'a>: Sized { fn into_payload(self) -> Vec; // Returns the shard index within the erasure coding set. - fn erasure_shard_index(&self) -> Result; + fn erasure_shard_index(&self) -> Result; // Returns the portion of the shred's payload which is erasure coded. fn erasure_shard(self) -> Result, Error>; // Like Shred::erasure_shard but returning a slice. diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs index e1d3924e49968c..d67299d5eb5563 100644 --- a/ledger/src/shredder.rs +++ b/ledger/src/shredder.rs @@ -31,7 +31,7 @@ lazy_static! { // Maps number of data shreds to the optimal erasure batch size which has the // same recovery probabilities as a 32:32 erasure batch. -pub(crate) const ERASURE_BATCH_SIZE: [usize; 33] = [ +pub(crate) const ERASURE_BATCH_SIZE: [u32; 33] = [ 0, 18, 20, 22, 23, 25, 27, 28, 30, // 8 32, 33, 35, 36, 38, 39, 41, 42, // 16 43, 45, 46, 48, 49, 51, 52, 53, // 24 @@ -39,7 +39,7 @@ pub(crate) const ERASURE_BATCH_SIZE: [usize; 33] = [ ]; pub struct ReedSolomonCache( - Mutex>>, + Mutex>>, ); #[derive(Debug)] @@ -163,8 +163,11 @@ impl Shredder { shred }; let shreds: Vec<&[u8]> = serialized_shreds.chunks(data_buffer_size).collect(); - let fec_set_offsets: Vec = - get_fec_set_offsets(shreds.len(), DATA_SHREDS_PER_FEC_BLOCK).collect(); + let fec_set_offsets: Vec = get_fec_set_offsets( + u32::try_from(shreds.len()).unwrap(), + DATA_SHREDS_PER_FEC_BLOCK, + ) + .collect(); assert_eq!(shreds.len(), fec_set_offsets.len()); let shreds: Vec = PAR_THREAD_POOL.install(|| { shreds @@ -173,7 +176,7 @@ impl Shredder { .enumerate() .map(|(i, (shred, offset))| { let shred_index = next_shred_index + i as u32; - let fec_set_index = next_shred_index + offset as u32; + let fec_set_index = next_shred_index + offset; make_data_shred(shred, shred_index, fec_set_index) }) .collect() @@ -209,7 +212,7 @@ impl Shredder { chunks .iter() .scan(next_code_index, |next_code_index, chunk| { - let num_data_shreds = chunk.len(); + let num_data_shreds = u32::try_from(chunk.len()).unwrap(); let is_last_in_slot = chunk .last() .copied() @@ -217,7 +220,7 @@ impl Shredder { .unwrap_or(true); let erasure_batch_size = get_erasure_batch_size(num_data_shreds, is_last_in_slot); - *next_code_index += (erasure_batch_size - num_data_shreds) as u32; + *next_code_index += erasure_batch_size - num_data_shreds; Some(*next_code_index) }), ) @@ -284,7 +287,7 @@ impl Shredder { .all(|shred| shred.slot() == slot && shred.version() == version && shred.fec_set_index() == fec_set_index)); - let num_data = data.len(); + let num_data = u32::try_from(data.len()).unwrap(); let is_last_in_slot = data .last() .map(Borrow::borrow) @@ -300,7 +303,7 @@ impl Shredder { .map(Shred::erasure_shard_as_slice) .collect::>() .unwrap(); - let mut parity = vec![vec![0u8; data[0].len()]; num_coding]; + let mut parity = vec![vec![0u8; data[0].len()]; num_coding as usize]; reed_solomon_cache .get(num_data, num_coding) .unwrap() @@ -310,9 +313,9 @@ impl Shredder { let num_coding = u16::try_from(num_coding).unwrap(); parity .iter() - .enumerate() - .map(|(i, parity)| { - let index = next_code_index + u32::try_from(i).unwrap(); + .zip(0..) + .map(|(parity, i)| { + let index = next_code_index + i; Shred::new_from_parity_shard( slot, index, @@ -351,27 +354,25 @@ impl Shredder { .filter(|shred| shred.is_code()) .all(|shred| shred.num_data_shreds().unwrap() == num_data_shreds && shred.num_coding_shreds().unwrap() == num_coding_shreds)); - let num_data_shreds = num_data_shreds as usize; - let num_coding_shreds = num_coding_shreds as usize; let fec_set_size = num_data_shreds + num_coding_shreds; - if num_coding_shreds == 0 || shreds.len() >= fec_set_size { + if num_coding_shreds == 0 || shreds.len() >= fec_set_size.into() { return Ok(Vec::default()); } // Mask to exclude data shreds already received from the return value. - let mut mask = vec![false; num_data_shreds]; - let mut shards = vec![None; fec_set_size]; + let mut mask = vec![false; num_data_shreds as usize]; + let mut shards = vec![None; fec_set_size as usize]; for shred in shreds { let index = match shred.erasure_shard_index() { - Ok(index) if index < fec_set_size => index, + Ok(index) if index < fec_set_size.into() => index, _ => return Err(Error::from(InvalidIndex)), }; - shards[index] = Some(shred.erasure_shard()?); - if index < num_data_shreds { - mask[index] = true; + shards[index as usize] = Some(shred.erasure_shard()?); + if index < num_data_shreds.into() { + mask[index as usize] = true; } } reed_solomon_cache - .get(num_data_shreds, num_coding_shreds)? + .get(num_data_shreds.into(), num_coding_shreds.into())? .reconstruct_data(&mut shards)?; let recovered_data = mask .into_iter() @@ -382,7 +383,7 @@ impl Shredder { shred.slot() == slot && shred.is_data() && match shred.erasure_shard_index() { - Ok(index) => index < num_data_shreds, + Ok(index) => index < num_data_shreds.into(), Err(_) => false, } }) @@ -416,12 +417,12 @@ impl Shredder { } impl ReedSolomonCache { - const CAPACITY: usize = 4 * DATA_SHREDS_PER_FEC_BLOCK; + const CAPACITY: usize = 4 * DATA_SHREDS_PER_FEC_BLOCK as usize; pub(crate) fn get( &self, - data_shards: usize, - parity_shards: usize, + data_shards: u32, + parity_shards: u32, ) -> Result, reed_solomon_erasure::Error> { let key = (data_shards, parity_shards); { @@ -430,7 +431,7 @@ impl ReedSolomonCache { return Ok(entry.clone()); } } - let entry = ReedSolomon::new(data_shards, parity_shards)?; + let entry = ReedSolomon::new(data_shards as usize, parity_shards as usize)?; let entry = Arc::new(entry); { let entry = entry.clone(); @@ -448,9 +449,9 @@ impl Default for ReedSolomonCache { } /// Maps number of data shreds in each batch to the erasure batch size. -pub(crate) fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bool) -> usize { +pub(crate) fn get_erasure_batch_size(num_data_shreds: u32, is_last_in_slot: bool) -> u32 { let erasure_batch_size = ERASURE_BATCH_SIZE - .get(num_data_shreds) + .get(num_data_shreds as usize) .copied() .unwrap_or(2 * num_data_shreds); if is_last_in_slot { @@ -461,10 +462,7 @@ pub(crate) fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bo } // Returns offsets to fec_set_index when spliting shreds into erasure batches. -fn get_fec_set_offsets( - mut num_shreds: usize, - min_chunk_size: usize, -) -> impl Iterator { +fn get_fec_set_offsets(mut num_shreds: u32, min_chunk_size: u32) -> impl Iterator { let mut offset = 0; std::iter::from_fn(move || { if num_shreds == 0 { @@ -472,7 +470,7 @@ fn get_fec_set_offsets( } let num_chunks = (num_shreds / min_chunk_size).max(1); let chunk_size = (num_shreds + num_chunks - 1) / num_chunks; - let offsets = std::iter::repeat(offset).take(chunk_size); + let offsets = std::iter::repeat(offset).take(chunk_size as usize); num_shreds -= chunk_size; offset += chunk_size; Some(offsets) @@ -541,7 +539,9 @@ mod tests { let size = serialized_size(&entries).unwrap() as usize; // Integer division to ensure we have enough shreds to fit all the data let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap(); - let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size; + let num_expected_data_shreds: u32 = ((size + data_buffer_size - 1) / data_buffer_size) + .try_into() + .unwrap(); let num_expected_data_shreds = num_expected_data_shreds.max(if is_last_in_slot { DATA_SHREDS_PER_FEC_BLOCK } else { @@ -564,14 +564,14 @@ mod tests { &mut ProcessShredsStats::default(), ); let next_index = data_shreds.last().unwrap().index() + 1; - assert_eq!(next_index as usize, num_expected_data_shreds); + assert_eq!(next_index, num_expected_data_shreds); let mut data_shred_indexes = HashSet::new(); let mut coding_shred_indexes = HashSet::new(); for shred in data_shreds.iter() { assert_eq!(shred.shred_type(), ShredType::Data); let index = shred.index(); - let is_last = index as usize == num_expected_data_shreds - 1; + let is_last = index == num_expected_data_shreds - 1; verify_test_data_shred( shred, index, @@ -594,16 +594,19 @@ mod tests { coding_shred_indexes.insert(index); } - for i in start_index..start_index + num_expected_data_shreds as u32 { + for i in start_index..start_index + num_expected_data_shreds { assert!(data_shred_indexes.contains(&i)); } - for i in start_index..start_index + num_expected_coding_shreds as u32 { + for i in start_index..start_index + num_expected_coding_shreds { assert!(coding_shred_indexes.contains(&i)); } - assert_eq!(data_shred_indexes.len(), num_expected_data_shreds); - assert_eq!(coding_shred_indexes.len(), num_expected_coding_shreds); + assert_eq!(data_shred_indexes.len(), num_expected_data_shreds as usize); + assert_eq!( + coding_shred_indexes.len(), + num_expected_coding_shreds as usize + ); // Test reassembly let deshred_payload = Shredder::deshred(&data_shreds).unwrap(); @@ -815,10 +818,9 @@ mod tests { let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); let entry = Entry::new(&Hash::default(), 1, vec![tx0]); - let num_data_shreds: usize = 5; + let num_data_shreds: u32 = 5; let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap(); - let num_entries = - max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(data_buffer_size)); + let num_entries = max_entries_per_n_shred(&entry, num_data_shreds, Some(data_buffer_size)); let entries: Vec<_> = (0..num_entries) .map(|_| { let keypair0 = Keypair::new(); @@ -842,10 +844,10 @@ mod tests { &reed_solomon_cache, &mut ProcessShredsStats::default(), ); - let num_coding_shreds = coding_shreds.len(); + let num_coding_shreds: u32 = coding_shreds.len().try_into().unwrap(); // We should have 5 data shreds now - assert_eq!(data_shreds.len(), num_data_shreds); + assert_eq!(data_shreds.len(), num_data_shreds as usize); assert_eq!( num_coding_shreds, get_erasure_batch_size(num_data_shreds, is_last_in_slot) - num_data_shreds @@ -909,7 +911,7 @@ mod tests { ); shred_info.insert(3, recovered_shred); - let result = Shredder::deshred(&shred_info[..num_data_shreds]).unwrap(); + let result = Shredder::deshred(&shred_info[..num_data_shreds as usize]).unwrap(); assert!(result.len() >= serialized_entries.len()); assert_eq!(serialized_entries[..], result[..serialized_entries.len()]); @@ -926,7 +928,7 @@ mod tests { assert_eq!(recovered_data.len(), 3); // Data shreds 0, 2, 4 were missing for (i, recovered_shred) in recovered_data.into_iter().enumerate() { let index = i * 2; - let is_last_data = recovered_shred.index() as usize == num_data_shreds - 1; + let is_last_data = recovered_shred.index() == num_data_shreds - 1; verify_test_data_shred( &recovered_shred, index.try_into().unwrap(), @@ -941,16 +943,16 @@ mod tests { shred_info.insert(i * 2, recovered_shred); } - let result = Shredder::deshred(&shred_info[..num_data_shreds]).unwrap(); + let result = Shredder::deshred(&shred_info[..num_data_shreds as usize]).unwrap(); assert!(result.len() >= serialized_entries.len()); assert_eq!(serialized_entries[..], result[..serialized_entries.len()]); // Test4: Try reassembly with 2 missing data shreds, but keeping the last // data shred. Hint: should fail - let shreds: Vec = all_shreds[..num_data_shreds] + let shreds: Vec = all_shreds[..num_data_shreds as usize] .iter() - .enumerate() - .filter_map(|(i, s)| { + .zip(0..) + .filter_map(|(s, i)| { if (i < 4 && i % 2 != 0) || i == num_data_shreds - 1 { // Keep 1, 3, 4 Some(s.clone()) @@ -981,7 +983,7 @@ mod tests { &mut ProcessShredsStats::default(), ); // We should have 10 shreds now - assert_eq!(data_shreds.len(), num_data_shreds); + assert_eq!(data_shreds.len(), num_data_shreds as usize); let all_shreds = data_shreds .iter() @@ -1000,10 +1002,10 @@ mod tests { assert_eq!(recovered_data.len(), 3); // Data shreds 25, 27, 29 were missing for (i, recovered_shred) in recovered_data.into_iter().enumerate() { - let index = 25 + (i * 2); + let index: u32 = (25 + (i * 2)).try_into().unwrap(); verify_test_data_shred( &recovered_shred, - index.try_into().unwrap(), + index, slot, slot - 5, &keypair.pubkey(), @@ -1015,7 +1017,7 @@ mod tests { shred_info.insert(i * 2, recovered_shred); } - let result = Shredder::deshred(&shred_info[..num_data_shreds]).unwrap(); + let result = Shredder::deshred(&shred_info[..num_data_shreds as usize]).unwrap(); assert!(result.len() >= serialized_entries.len()); assert_eq!(serialized_entries[..], result[..serialized_entries.len()]); @@ -1183,7 +1185,7 @@ mod tests { &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); - const MIN_CHUNK_SIZE: usize = DATA_SHREDS_PER_FEC_BLOCK; + const MIN_CHUNK_SIZE: usize = DATA_SHREDS_PER_FEC_BLOCK as usize; let chunks: Vec<_> = data_shreds .iter() .group_by(|shred| shred.fec_set_index()) @@ -1260,31 +1262,36 @@ mod tests { .into_iter() .map(|(_, chunk)| { let chunk: Vec<_> = chunk.collect(); - get_erasure_batch_size(chunk.len(), chunk.last().unwrap().last_in_slot()) + get_erasure_batch_size( + chunk.len().try_into().unwrap(), + chunk.last().unwrap().last_in_slot(), + ) }) - .sum(); + .sum::() + .try_into() + .unwrap(); assert_eq!(coding_shreds.len(), num_shreds - data_shreds.len()); } } #[test] fn test_get_fec_set_offsets() { - const MIN_CHUNK_SIZE: usize = 32usize; - for num_shreds in 0usize..MIN_CHUNK_SIZE { + const MIN_CHUNK_SIZE: u32 = 32; + for num_shreds in 0..MIN_CHUNK_SIZE { let offsets: Vec<_> = get_fec_set_offsets(num_shreds, MIN_CHUNK_SIZE).collect(); - assert_eq!(offsets, vec![0usize; num_shreds]); + assert_eq!(offsets, vec![0; num_shreds as usize]); } for num_shreds in MIN_CHUNK_SIZE..MIN_CHUNK_SIZE * 8 { let chunks: Vec<_> = get_fec_set_offsets(num_shreds, MIN_CHUNK_SIZE) .group_by(|offset| *offset) .into_iter() - .map(|(offset, chunk)| (offset, chunk.count())) + .map(|(offset, chunk)| (offset, chunk.count().try_into().unwrap())) .collect(); assert_eq!( chunks .iter() .map(|(_offset, chunk_size)| chunk_size) - .sum::(), + .sum::(), num_shreds ); assert!(chunks diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index b5d916892fea0c..d9c8857869ff9d 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -13,7 +13,6 @@ use { }, std::{ collections::{BTreeMap, HashSet}, - convert::TryInto, sync::Arc, }, test_case::test_case, @@ -33,11 +32,8 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) { let keypair1 = Keypair::new(); let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); let entry = Entry::new(&Hash::default(), 1, vec![tx0]); - let num_entries = max_entries_per_n_shred( - &entry, - num_data_shreds as u64, - Some(LEGACY_SHRED_DATA_CAPACITY), - ); + let num_entries = + max_entries_per_n_shred(&entry, num_data_shreds, Some(LEGACY_SHRED_DATA_CAPACITY)); let entries: Vec<_> = (0..num_entries) .map(|_| { @@ -63,9 +59,9 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) { &mut ProcessShredsStats::default(), ); let next_index = data_shreds.last().unwrap().index() + 1; - assert_eq!(next_index as usize, num_data_shreds); - assert_eq!(data_shreds.len(), num_data_shreds); - assert_eq!(coding_shreds.len(), num_data_shreds); + assert_eq!(next_index, num_data_shreds); + assert_eq!(data_shreds.len(), num_data_shreds as usize); + assert_eq!(coding_shreds.len(), num_data_shreds as usize); for c in &coding_shreds { assert!(!c.is_data()); @@ -75,10 +71,14 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) { for i in 0..num_fec_sets { let shred_start_index = DATA_SHREDS_PER_FEC_BLOCK * i; let end_index = shred_start_index + DATA_SHREDS_PER_FEC_BLOCK - 1; - let fec_set_shreds = data_shreds[shred_start_index..=end_index] + let fec_set_shreds = data_shreds[shred_start_index as usize..=end_index as usize] .iter() .cloned() - .chain(coding_shreds[shred_start_index..=end_index].iter().cloned()) + .chain( + coding_shreds[shred_start_index as usize..=end_index as usize] + .iter() + .cloned(), + ) .collect::>(); let mut shred_info: Vec = fec_set_shreds @@ -90,11 +90,11 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) { let recovered_data = Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); - for (i, recovered_shred) in recovered_data.into_iter().enumerate() { + for (recovered_shred, i) in recovered_data.into_iter().zip(0..) { let index = shred_start_index + (i * 2); verify_test_data_shred( &recovered_shred, - index.try_into().unwrap(), + index, slot, slot - 5, &keypair.pubkey(), @@ -103,10 +103,14 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) { index == end_index, ); - shred_info.insert(i * 2, recovered_shred); + shred_info.insert((i * 2) as usize, recovered_shred); } - all_shreds.extend(shred_info.into_iter().take(DATA_SHREDS_PER_FEC_BLOCK)); + all_shreds.extend( + shred_info + .into_iter() + .take(DATA_SHREDS_PER_FEC_BLOCK as usize), + ); } let result = Shredder::deshred(&all_shreds[..]).unwrap(); @@ -121,11 +125,14 @@ fn test_multi_fec_block_different_size_coding() { let (fec_data, fec_coding, num_shreds_per_iter) = setup_different_sized_fec_blocks(slot, parent_slot, keypair.clone()); - let total_num_data_shreds: usize = fec_data.values().map(|x| x.len()).sum(); + let total_num_data_shreds: u32 = fec_data + .values() + .map(|x| u32::try_from(x.len()).unwrap()) + .sum(); let reed_solomon_cache = ReedSolomonCache::default(); // Test recovery for (fec_data_shreds, fec_coding_shreds) in fec_data.values().zip(fec_coding.values()) { - let first_data_index = fec_data_shreds.first().unwrap().index() as usize; + let first_data_index = fec_data_shreds.first().unwrap().index(); let all_shreds: Vec = fec_data_shreds .iter() .step_by(2) @@ -137,11 +144,11 @@ fn test_multi_fec_block_different_size_coding() { // is part of the recovered set, and that the below `index` // calculation in the loop is correct assert!(fec_data_shreds.len() % 2 == 0); - for (i, recovered_shred) in recovered_data.into_iter().enumerate() { + for (recovered_shred, i) in recovered_data.into_iter().zip(0..) { let index = first_data_index + (i * 2) + 1; verify_test_data_shred( &recovered_shred, - index.try_into().unwrap(), + index, slot, parent_slot, &keypair.pubkey(), @@ -186,7 +193,7 @@ fn setup_different_sized_fec_blocks( slot: Slot, parent_slot: Slot, keypair: Arc, -) -> (IndexShredsMap, IndexShredsMap, usize) { +) -> (IndexShredsMap, IndexShredsMap, u32) { let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap(); let keypair0 = Keypair::new(); let keypair1 = Keypair::new(); @@ -200,7 +207,7 @@ fn setup_different_sized_fec_blocks( let num_shreds_per_iter = DATA_SHREDS_PER_FEC_BLOCK + 2; let num_entries = max_entries_per_n_shred( &entry, - num_shreds_per_iter as u64, + num_shreds_per_iter, Some(LEGACY_SHRED_DATA_CAPACITY), ); let entries: Vec<_> = (0..num_entries) @@ -221,7 +228,7 @@ fn setup_different_sized_fec_blocks( let mut data_slot_and_index = HashSet::new(); let mut coding_slot_and_index = HashSet::new(); - let total_num_data_shreds: usize = 2 * num_shreds_per_iter; + let total_num_data_shreds = 2 * num_shreds_per_iter; let reed_solomon_cache = ReedSolomonCache::default(); for i in 0..2 { let is_last = i == 1; @@ -237,17 +244,17 @@ fn setup_different_sized_fec_blocks( &mut ProcessShredsStats::default(), ); for shred in &data_shreds { - if (shred.index() as usize) == total_num_data_shreds - 1 { + if shred.index() == total_num_data_shreds - 1 { assert!(shred.data_complete()); assert!(shred.last_in_slot()); - } else if (shred.index() as usize) % num_shreds_per_iter == num_shreds_per_iter - 1 { + } else if shred.index() % num_shreds_per_iter == num_shreds_per_iter - 1 { assert!(shred.data_complete()); } else { assert!(!shred.data_complete()); assert!(!shred.last_in_slot()); } } - assert_eq!(data_shreds.len(), num_shreds_per_iter); + assert_eq!(data_shreds.len(), num_shreds_per_iter as usize); next_shred_index = data_shreds.last().unwrap().index() + 1; next_code_index = coding_shreds.last().unwrap().index() + 1; sort_data_coding_into_fec_sets( diff --git a/turbine/src/broadcast_stage.rs b/turbine/src/broadcast_stage.rs index cce43f4fa5aabd..d7d944ea19609e 100644 --- a/turbine/src/broadcast_stage.rs +++ b/turbine/src/broadcast_stage.rs @@ -69,7 +69,7 @@ pub enum Error { #[error("Duplicate slot broadcast: {0}")] DuplicateSlotBroadcast(Slot), #[error("Invalid Merkle root, slot: {slot}, index: {index}")] - InvalidMerkleRoot { slot: Slot, index: u64 }, + InvalidMerkleRoot { slot: Slot, index: u32 }, #[error(transparent)] Io(#[from] std::io::Error), #[error(transparent)] @@ -81,13 +81,15 @@ pub enum Error { #[error(transparent)] Serialize(#[from] std::boxed::Box), #[error("Shred not found, slot: {slot}, index: {index}")] - ShredNotFound { slot: Slot, index: u64 }, + ShredNotFound { slot: Slot, index: u32 }, #[error(transparent)] TransportError(#[from] solana_sdk::transport::TransportError), #[error("Unknown last index, slot: {0}")] UnknownLastIndex(Slot), #[error("Unknown slot meta, slot: {0}")] UnknownSlotMeta(Slot), + #[error("Slot last shred index is above u32::MAX, slot: {slot}, index: {index}")] + LastShredIndexOutOfRange { slot: Slot, index: u64 }, } type Result = std::result::Result; @@ -537,7 +539,7 @@ pub mod test { #[allow(clippy::type_complexity)] fn make_transmit_shreds( slot: Slot, - num: u64, + num: u32, ) -> ( Vec, Vec, @@ -578,21 +580,21 @@ pub mod test { fn check_all_shreds_received( transmit_receiver: &TransmitReceiver, - mut data_index: u64, - mut coding_index: u64, - num_expected_data_shreds: u64, - num_expected_coding_shreds: u64, + mut data_index: u32, + mut coding_index: u32, + num_expected_data_shreds: u32, + num_expected_coding_shreds: u32, ) { while let Ok((shreds, _)) = transmit_receiver.try_recv() { if shreds[0].is_data() { for data_shred in shreds.iter() { - assert_eq!(data_shred.index() as u64, data_index); + assert_eq!(data_shred.index(), data_index); data_index += 1; } } else { - assert_eq!(shreds[0].index() as u64, coding_index); + assert_eq!(shreds[0].index(), coding_index); for coding_shred in shreds.iter() { - assert_eq!(coding_shred.index() as u64, coding_index); + assert_eq!(coding_shred.index(), coding_index); coding_index += 1; } } @@ -614,8 +616,8 @@ pub mod test { let updated_slot = 0; let (all_data_shreds, all_coding_shreds, _, _all_coding_transmit_shreds) = make_transmit_shreds(updated_slot, 10); - let num_data_shreds = all_data_shreds.len(); - let num_coding_shreds = all_coding_shreds.len(); + let num_data_shreds = u32::try_from(all_data_shreds.len()).unwrap(); + let num_coding_shreds = u32::try_from(all_coding_shreds.len()).unwrap(); assert!(num_data_shreds >= 10); // Insert all the shreds @@ -637,13 +639,7 @@ pub mod test { ) .unwrap(); // Check all the data shreds were received only once - check_all_shreds_received( - &transmit_receiver, - 0, - 0, - num_data_shreds as u64, - num_coding_shreds as u64, - ); + check_all_shreds_received(&transmit_receiver, 0, 0, num_data_shreds, num_coding_shreds); } struct MockBroadcastStage { diff --git a/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs b/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs index b82ca324b61820..4002ef99a0c01f 100644 --- a/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -54,7 +54,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { .unwrap(), Some(index) => { let shred = blockstore - .get_data_shred(bank.slot(), u64::from(index)) + .get_data_shred(bank.slot(), index) .unwrap() .unwrap(); shred::layout::get_merkle_root(&shred).unwrap() diff --git a/turbine/src/broadcast_stage/broadcast_utils.rs b/turbine/src/broadcast_stage/broadcast_utils.rs index 9c4b48bd5ca01d..a3a9881f195f2c 100644 --- a/turbine/src/broadcast_stage/broadcast_utils.rs +++ b/turbine/src/broadcast_stage/broadcast_utils.rs @@ -107,6 +107,12 @@ pub(super) fn get_chained_merkle_root_from_parent( .ok_or_else(|| Error::UnknownSlotMeta(parent))? .last_index .ok_or_else(|| Error::UnknownLastIndex(parent))?; + let index: u32 = index + .try_into() + .map_err(|_| Error::LastShredIndexOutOfRange { + slot: parent, + index, + })?; let shred = blockstore .get_data_shred(parent, index)? .ok_or(Error::ShredNotFound { diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index 0ddbe1020f5f98..3c7f1a4367ce42 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -280,8 +280,8 @@ impl StandardBroadcastRun { is_last_in_slot, cluster_type, &mut process_stats, - blockstore::MAX_DATA_SHREDS_PER_SLOT as u32, - shred_code::MAX_CODE_SHREDS_PER_SLOT as u32, + blockstore::MAX_DATA_SHREDS_PER_SLOT, + shred_code::MAX_CODE_SHREDS_PER_SLOT, ) .unwrap(); // Insert the first data shred synchronously so that blockstore stores @@ -539,7 +539,7 @@ mod test { #[allow(clippy::type_complexity)] fn setup( - num_shreds_per_slot: Slot, + num_shreds_per_slot: u32, ) -> ( Arc, GenesisConfig, @@ -650,10 +650,7 @@ mod test { &quic_endpoint_sender, ) .unwrap(); - assert_eq!( - standard_broadcast_run.next_shred_index as u64, - num_shreds_per_slot - ); + assert_eq!(standard_broadcast_run.next_shred_index, num_shreds_per_slot); assert_eq!(standard_broadcast_run.slot, 0); assert_eq!(standard_broadcast_run.parent, 0); // Make sure the slot is not complete @@ -722,7 +719,7 @@ mod test { // The shred index should have reset to 0, which makes it possible for the // index < the previous shred index for slot 0 - assert_eq!(standard_broadcast_run.next_shred_index as u64, num_shreds); + assert_eq!(standard_broadcast_run.next_shred_index, num_shreds); assert_eq!(standard_broadcast_run.slot, 2); assert_eq!(standard_broadcast_run.parent, 0);