diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index d1a29ddd3e6d74..d54e855ef345b1 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -123,8 +123,14 @@ fn bench_shredder_coding(bencher: &mut Bencher) { let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; let data_shreds = make_shreds(symbol_count); bencher.iter(|| { - Shredder::generate_coding_shreds(0, RECOMMENDED_FEC_RATE, &data_shreds[..symbol_count], 0) - .len(); + Shredder::generate_coding_shreds( + 0, + RECOMMENDED_FEC_RATE, + &data_shreds[..symbol_count], + 0, + symbol_count, + ) + .len(); }) } @@ -132,8 +138,13 @@ fn bench_shredder_coding(bencher: &mut Bencher) { fn bench_shredder_decoding(bencher: &mut Bencher) { let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; let data_shreds = make_shreds(symbol_count); - let coding_shreds = - Shredder::generate_coding_shreds(0, RECOMMENDED_FEC_RATE, &data_shreds[..symbol_count], 0); + let coding_shreds = Shredder::generate_coding_shreds( + 0, + RECOMMENDED_FEC_RATE, + &data_shreds[..symbol_count], + 0, + symbol_count, + ); bencher.iter(|| { Shredder::try_recovery( coding_shreds[..].to_vec(), diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index b80356d777f6f2..cf9f07137fd6f2 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -973,29 +973,43 @@ impl ReplayStage { // errors related to the slot being purged let slot = bank.slot(); warn!("Fatal replay error in slot: {}, err: {:?}", slot, err); - if let BlockstoreProcessorError::InvalidBlock(BlockError::InvalidTickCount) = err { - datapoint_info!( - "replay-stage-mark_dead_slot", - ("error", format!("error: {:?}", err), String), - ("slot", slot, i64) - ); - } else { - datapoint_error!( - "replay-stage-mark_dead_slot", - ("error", format!("error: {:?}", err), String), - ("slot", slot, i64) - ); - } - bank_progress.is_dead = true; - blockstore - .set_dead_slot(slot) - .expect("Failed to mark slot as dead in blockstore"); + let is_serious = matches!( + err, + BlockstoreProcessorError::InvalidBlock(BlockError::InvalidTickCount) + ); + Self::mark_dead_slot(blockstore, bank_progress, slot, &err, is_serious); err })?; Ok(tx_count) } + fn mark_dead_slot( + blockstore: &Blockstore, + bank_progress: &mut ForkProgress, + slot: Slot, + err: &BlockstoreProcessorError, + is_serious: bool, + ) { + if is_serious { + datapoint_error!( + "replay-stage-mark_dead_slot", + ("error", format!("error: {:?}", err), String), + ("slot", slot, i64) + ); + } else { + datapoint_info!( + "replay-stage-mark_dead_slot", + ("error", format!("error: {:?}", err), String), + ("slot", slot, i64) + ); + } + bank_progress.is_dead = true; + blockstore + .set_dead_slot(slot) + .expect("Failed to mark slot as dead in blockstore"); + } + #[allow(clippy::too_many_arguments)] fn handle_votable_bank( bank: &Arc, @@ -1307,23 +1321,40 @@ impl ReplayStage { } assert_eq!(*bank_slot, bank.slot()); if bank.is_complete() { - bank_progress.replay_stats.report_stats( - bank.slot(), - bank_progress.replay_progress.num_entries, - bank_progress.replay_progress.num_shreds, - ); - did_complete_bank = true; - info!("bank frozen: {}", bank.slot()); - bank.freeze(); - heaviest_subtree_fork_choice - .add_new_leaf_slot(bank.slot(), Some(bank.parent_slot())); - if let Some(sender) = bank_notification_sender { - sender - .send(BankNotification::Frozen(bank.clone())) - .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); - } + if !blockstore.has_duplicate_shreds_in_slot(bank.slot()) { + bank_progress.replay_stats.report_stats( + bank.slot(), + bank_progress.replay_progress.num_entries, + bank_progress.replay_progress.num_shreds, + ); + did_complete_bank = true; + info!("bank frozen: {}", bank.slot()); + bank.freeze(); + heaviest_subtree_fork_choice + .add_new_leaf_slot(bank.slot(), Some(bank.parent_slot())); + if let Some(sender) = bank_notification_sender { + sender + .send(BankNotification::Frozen(bank.clone())) + .unwrap_or_else(|err| { + warn!("bank_notification_sender failed: {:?}", err) + }); + } - Self::record_rewards(&bank, &rewards_recorder_sender); + Self::record_rewards(&bank, &rewards_recorder_sender); + } else { + Self::mark_dead_slot( + blockstore, + bank_progress, + bank.slot(), + &BlockstoreProcessorError::InvalidBlock(BlockError::DuplicateBlock), + true, + ); + warn!( + "{} duplicate shreds detected, not freezing bank {}", + my_pubkey, + bank.slot() + ); + } } else { trace!( "bank {} not completed tick_height: {}, max_tick_height: {}", diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index f4cc2cb173578e..781e0c90950c94 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -305,7 +305,7 @@ mod tests { assert!(!packet.meta.discard); let coding = - solana_ledger::shred::Shredder::generate_coding_shreds(slot, 1.0f32, &[shred], 10); + solana_ledger::shred::Shredder::generate_coding_shreds(slot, 1.0f32, &[shred], 10, 1); coding[0].copy_to_packet(&mut packet); ShredFetchStage::process_packet( &mut packet, diff --git a/ledger/src/block_error.rs b/ledger/src/block_error.rs index 89505ab3fa34f4..523d0ac8370548 100644 --- a/ledger/src/block_error.rs +++ b/ledger/src/block_error.rs @@ -26,4 +26,7 @@ pub enum BlockError { /// that each block has the same number of hashes #[error("trailing entry")] TrailingEntry, + + #[error("duplicate block")] + DuplicateBlock, } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 65d15c2f730e2e..f46219c2695ec9 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -809,6 +809,7 @@ impl Blockstore { &mut index_working_set, &mut just_inserted_coding_shreds, &mut index_meta_time, + handle_duplicate, is_trusted, ); } else { @@ -1012,15 +1013,24 @@ impl Blockstore { .is_ok() } - fn check_cache_coding_shred( + fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool { + shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds + || shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds + } + + fn check_cache_coding_shred( &self, shred: Shred, erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, index_working_set: &mut HashMap, just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>, index_meta_time: &mut u64, + handle_duplicate: &F, is_trusted: bool, - ) -> bool { + ) -> bool + where + F: Fn(Shred), + { let slot = shred.slot(); let shred_index = u64::from(shred.index()); @@ -1028,52 +1038,109 @@ impl Blockstore { get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time); let index_meta = &mut index_meta_working_set_entry.index; + // This gives the index of first coding shred in this FEC block // So, all coding shreds in a given FEC block will have the same set index - if is_trusted - || Blockstore::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) - { - let set_index = u64::from(shred.common_header.fec_set_index); - let erasure_config = ErasureConfig::new( - shred.coding_header.num_data_shreds as usize, - shred.coding_header.num_coding_shreds as usize, - ); - let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { - let first_coding_index = - u64::from(shred.index()) - u64::from(shred.coding_header.position); - self.erasure_meta_cf - .get((slot, set_index)) - .expect("Expect database get to succeed") - .unwrap_or_else(|| { - ErasureMeta::new(set_index, first_coding_index, &erasure_config) - }) - }); + if !is_trusted { + if index_meta.coding().is_present(shred_index) { + handle_duplicate(shred); + return false; + } - if erasure_config != erasure_meta.config { - // ToDo: This is a potential slashing condition - warn!("Received multiple erasure configs for the same erasure set!!!"); - warn!( - "Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}", - slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config, erasure_config - ); + if !Blockstore::should_insert_coding_shred(&shred, &self.last_root) { + return false; } + } - // Should be safe to modify index_meta here. Two cases - // 1) Recovery happens: Then all inserted erasure metas are removed - // from just_received_coding_shreds, and nothing will be committed by - // `check_insert_coding_shred`, so the coding index meta will not be - // committed - index_meta.coding_mut().set_present(shred_index, true); + let set_index = u64::from(shred.common_header.fec_set_index); + let erasure_config = ErasureConfig::new( + shred.coding_header.num_data_shreds as usize, + shred.coding_header.num_coding_shreds as usize, + ); - just_received_coding_shreds - .entry((slot, shred_index)) - .or_insert_with(|| shred); + let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| { + let first_coding_index = + u64::from(shred.index()) - u64::from(shred.coding_header.position); + self.erasure_meta_cf + .get((slot, set_index)) + .expect("Expect database get to succeed") + .unwrap_or_else(|| ErasureMeta::new(set_index, first_coding_index, &erasure_config)) + }); - true - } else { - false + if erasure_config != erasure_meta.config { + let conflicting_shred = self.find_conflicting_coding_shred( + &shred, + slot, + erasure_meta, + just_received_coding_shreds, + ); + if let Some(conflicting_shred) = conflicting_shred { + if self + .store_duplicate_if_not_existing(slot, conflicting_shred, shred.payload.clone()) + .is_err() + { + warn!("bad duplicate store.."); + } + } else { + datapoint_info!("bad-conflict-shred", ("slot", slot, i64)); + } + + // ToDo: This is a potential slashing condition + warn!("Received multiple erasure configs for the same erasure set!!!"); + warn!( + "Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}", + slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config, erasure_config + ); + + return false; } + + // Should be safe to modify index_meta here. Two cases + // 1) Recovery happens: Then all inserted erasure metas are removed + // from just_received_coding_shreds, and nothing will be committed by + // `check_insert_coding_shred`, so the coding index meta will not be + // committed + index_meta.coding_mut().set_present(shred_index, true); + + just_received_coding_shreds + .entry((slot, shred_index)) + .or_insert_with(|| shred); + + true + } + + fn find_conflicting_coding_shred( + &self, + shred: &Shred, + slot: Slot, + erasure_meta: &ErasureMeta, + just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>, + ) -> Option> { + // Search for the shred which set the initial erasure config, either inserted, + // or in the current batch in just_received_coding_shreds. + let coding_start = erasure_meta.first_coding_index; + let coding_end = coding_start + erasure_meta.config.num_coding() as u64; + let mut conflicting_shred = None; + for coding_index in coding_start..coding_end { + let maybe_shred = self.get_coding_shred(slot, coding_index); + if let Ok(Some(shred_data)) = maybe_shred { + let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap(); + if Self::erasure_mismatch(&potential_shred, &shred) { + conflicting_shred = Some(potential_shred.payload); + } + break; + } else if let Some(potential_shred) = + just_received_coding_shreds.get(&(slot, coding_index)) + { + if Self::erasure_mismatch(&potential_shred, &shred) { + conflicting_shred = Some(potential_shred.payload.clone()); + } + break; + } + } + + conflicting_shred } #[allow(clippy::too_many_arguments)] @@ -1110,7 +1177,7 @@ impl Blockstore { if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) { handle_duplicate(shred); return Err(InsertDataShredError::Exists); - } else if !Blockstore::should_insert_data_shred( + } else if !self.should_insert_data_shred( &shred, slot_meta, &self.last_root, @@ -1139,11 +1206,7 @@ impl Blockstore { Ok(newly_completed_data_sets) } - fn should_insert_coding_shred( - shred: &Shred, - coding_index: &ShredIndex, - last_root: &RwLock, - ) -> bool { + fn should_insert_coding_shred(shred: &Shred, last_root: &RwLock) -> bool { let slot = shred.slot(); let shred_index = shred.index(); @@ -1152,11 +1215,13 @@ impl Blockstore { } let set_index = shred.common_header.fec_set_index; + !(shred.coding_header.num_coding_shreds == 0 || shred.coding_header.position >= shred.coding_header.num_coding_shreds || std::u32::MAX - set_index < u32::from(shred.coding_header.num_coding_shreds) - 1 - || coding_index.is_present(u64::from(shred_index)) - || slot <= *last_root.read().unwrap()) + || slot <= *last_root.read().unwrap() + || shred.coding_header.num_coding_shreds as u32 + > (8 * crate::shred::MAX_DATA_SHREDS_PER_FEC_BLOCK)) } fn insert_coding_shred( @@ -1187,6 +1252,7 @@ impl Blockstore { } fn should_insert_data_shred( + &self, shred: &Shred, slot_meta: &SlotMeta, last_root: &RwLock, @@ -1209,6 +1275,15 @@ impl Blockstore { let leader_pubkey = leader_schedule .map(|leader_schedule| leader_schedule.slot_leader_at(slot, None)) .unwrap_or(None); + + let ending_shred = self.get_data_shred(slot, last_index).unwrap().unwrap(); + if self + .store_duplicate_if_not_existing(slot, ending_shred, shred.payload.clone()) + .is_err() + { + warn!("store duplicate error"); + } + datapoint_error!( "blockstore_error", ( @@ -1228,6 +1303,18 @@ impl Blockstore { let leader_pubkey = leader_schedule .map(|leader_schedule| leader_schedule.slot_leader_at(slot, None)) .unwrap_or(None); + + let ending_shred = self + .get_data_shred(slot, slot_meta.received - 1) + .unwrap() + .unwrap(); + if self + .store_duplicate_if_not_existing(slot, ending_shred, shred.payload.clone()) + .is_err() + { + warn!("store duplicate error"); + } + datapoint_error!( "blockstore_error", ( @@ -2599,6 +2686,19 @@ impl Blockstore { self.dead_slots_cf.put(slot, &true) } + pub fn store_duplicate_if_not_existing( + &self, + slot: Slot, + shred1: Vec, + shred2: Vec, + ) -> Result<()> { + if !self.has_duplicate_shreds_in_slot(slot) { + self.store_duplicate_slot(slot, shred1, shred2) + } else { + Ok(()) + } + } + pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec, shred2: Vec) -> Result<()> { let duplicate_slot_proof = DuplicateSlotProof::new(shred1, shred2); self.duplicate_slots_cf.put(slot, &duplicate_slot_proof) @@ -5067,6 +5167,7 @@ pub mod tests { #[test] pub fn test_should_insert_data_shred() { + solana_logger::setup(); let (mut shreds, _) = make_slot_entries(0, 0, 200); let blockstore_path = get_tmp_ledger_path!(); { @@ -5094,9 +5195,10 @@ pub mod tests { } }; assert_eq!( - Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false), + blockstore.should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false), false ); + assert!(blockstore.has_duplicate_shreds_in_slot(0)); // Insert all pending shreds let mut shred8 = shreds[8].clone(); @@ -5110,7 +5212,7 @@ pub mod tests { panic!("Shred in unexpected format") } assert_eq!( - Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false), + blockstore.should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false), false ); } @@ -5154,12 +5256,56 @@ pub mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } + #[test] + pub fn test_check_cache_coding_shred() { + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + + let slot = 1; + let (shred, coding) = Shredder::new_coding_shred_header(slot, 11, 11, 11, 11, 10, 0); + let coding_shred = + Shred::new_empty_from_header(shred, DataShredHeader::default(), coding); + + let mut erasure_metas = HashMap::new(); + let mut index_working_set = HashMap::new(); + let mut just_received_coding_shreds = HashMap::new(); + let mut index_meta_time = 0; + assert!(blockstore.check_cache_coding_shred( + coding_shred.clone(), + &mut erasure_metas, + &mut index_working_set, + &mut just_received_coding_shreds, + &mut index_meta_time, + &|_shred| { + panic!("no dupes"); + }, + false, + )); + + // insert again fails on dupe + use std::sync::atomic::{AtomicUsize, Ordering}; + let counter = AtomicUsize::new(0); + assert!(!blockstore.check_cache_coding_shred( + coding_shred, + &mut erasure_metas, + &mut index_working_set, + &mut just_received_coding_shreds, + &mut index_meta_time, + &|_shred| { + counter.fetch_add(1, Ordering::Relaxed); + }, + false, + )); + assert_eq!(counter.load(Ordering::Relaxed), 1); + } + } + #[test] pub fn test_should_insert_coding_shred() { let blockstore_path = get_tmp_ledger_path!(); { let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let index_cf = blockstore.db.column::(); let last_root = RwLock::new(0); let slot = 1; @@ -5174,7 +5320,6 @@ pub mod tests { // Insert a good coding shred assert!(Blockstore::should_insert_coding_shred( &coding_shred, - Index::new(slot).coding(), &last_root )); @@ -5183,12 +5328,11 @@ pub mod tests { .insert_shreds(vec![coding_shred.clone()], None, false) .unwrap(); - // Trying to insert the same shred again should fail + // Trying to insert the same shred again should pass since this doesn't check for + // duplicate index { - let index = index_cf.get(shred.slot).unwrap().unwrap(); - assert!(!Blockstore::should_insert_coding_shred( + assert!(Blockstore::should_insert_coding_shred( &coding_shred, - index.coding(), &last_root )); } @@ -5202,10 +5346,8 @@ pub mod tests { DataShredHeader::default(), coding.clone(), ); - let index = index_cf.get(shred.slot).unwrap().unwrap(); assert!(Blockstore::should_insert_coding_shred( &coding_shred, - index.coding(), &last_root )); } @@ -5220,10 +5362,8 @@ pub mod tests { let index = coding_shred.coding_header.position - 1; coding_shred.set_index(index as u32); - let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blockstore::should_insert_coding_shred( &coding_shred, - index.coding(), &last_root )); } @@ -5236,10 +5376,8 @@ pub mod tests { coding.clone(), ); coding_shred.coding_header.num_coding_shreds = 0; - let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blockstore::should_insert_coding_shred( &coding_shred, - index.coding(), &last_root )); } @@ -5252,10 +5390,8 @@ pub mod tests { coding.clone(), ); coding_shred.coding_header.num_coding_shreds = coding_shred.coding_header.position; - let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blockstore::should_insert_coding_shred( &coding_shred, - index.coding(), &last_root )); } @@ -5272,10 +5408,14 @@ pub mod tests { coding_shred.coding_header.num_coding_shreds = 3; coding_shred.common_header.index = std::u32::MAX - 1; coding_shred.coding_header.position = 0; - let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blockstore::should_insert_coding_shred( &coding_shred, - index.coding(), + &last_root + )); + + coding_shred.coding_header.num_coding_shreds = 2000; + assert!(!Blockstore::should_insert_coding_shred( + &coding_shred, &last_root )); @@ -5283,7 +5423,6 @@ pub mod tests { coding_shred.coding_header.num_coding_shreds = 2; assert!(Blockstore::should_insert_coding_shred( &coding_shred, - index.coding(), &last_root )); @@ -5297,11 +5436,9 @@ pub mod tests { { let mut coding_shred = Shred::new_empty_from_header(shred, DataShredHeader::default(), coding); - let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); coding_shred.set_slot(*last_root.read().unwrap()); assert!(!Blockstore::should_insert_coding_shred( &coding_shred, - index.coding(), &last_root )); } @@ -5312,6 +5449,7 @@ pub mod tests { #[test] pub fn test_insert_multiple_is_last() { + solana_logger::setup(); let (shreds, _) = make_slot_entries(0, 0, 20); let num_shreds = shreds.len() as u64; let blockstore_path = get_tmp_ledger_path!(); @@ -5334,6 +5472,8 @@ pub mod tests { assert_eq!(slot_meta.last_index, num_shreds - 1); assert!(slot_meta.is_full()); + assert!(blockstore.has_duplicate_shreds_in_slot(0)); + drop(blockstore); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } @@ -7327,4 +7467,75 @@ pub mod tests { assert!(stored_shred.last_in_slot()); assert_eq!(entries, ledger.get_any_valid_slot_entries(0, 0)); } + + fn make_large_tx_entry(num_txs: usize) -> Entry { + let txs: Vec<_> = (0..num_txs) + .into_iter() + .map(|_| { + let keypair0 = Keypair::new(); + let to = solana_sdk::pubkey::new_rand(); + solana_sdk::system_transaction::transfer(&keypair0, &to, 1, Hash::default()) + }) + .collect(); + + Entry::new(&Hash::default(), 1, txs) + } + + #[test] + fn erasure_multiple_config() { + solana_logger::setup(); + let slot = 1; + let parent = 0; + let num_txs = 20; + let entry = make_large_tx_entry(num_txs); + let shreds = entries_to_test_shreds(vec![entry], slot, parent, true, 0); + assert!(shreds.len() > 1); + + let ledger_path = get_tmp_ledger_path!(); + let ledger = Blockstore::open(&ledger_path).unwrap(); + + let coding1 = Shredder::generate_coding_shreds(slot, 0.5f32, &shreds, 0x42, usize::MAX); + let coding2 = Shredder::generate_coding_shreds(slot, 1.0f32, &shreds, 0x42, usize::MAX); + for shred in &shreds { + info!("shred {:?}", shred); + } + for shred in &coding1 { + info!("coding1 {:?}", shred); + } + for shred in &coding2 { + info!("coding2 {:?}", shred); + } + ledger + .insert_shreds(shreds[..shreds.len() - 2].to_vec(), None, false) + .unwrap(); + ledger + .insert_shreds(vec![coding1[0].clone(), coding2[1].clone()], None, false) + .unwrap(); + assert!(ledger.has_duplicate_shreds_in_slot(slot)); + } + + #[test] + fn test_large_num_coding() { + solana_logger::setup(); + let slot = 1; + let (_data_shreds, mut coding_shreds, leader_schedule_cache) = + setup_erasure_shreds(slot, 0, 100, 1.0); + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + coding_shreds[1].coding_header.num_coding_shreds = u16::MAX; + blockstore + .insert_shreds( + vec![coding_shreds[1].clone()], + Some(&leader_schedule_cache), + false, + ) + .unwrap(); + + // Check no coding shreds are inserted + let res = blockstore.get_coding_shreds_for_slot(slot, 0).unwrap(); + assert!(res.is_empty()); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 547afe16e7cf7a..0e756b3b27589c 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -581,6 +581,7 @@ impl Shredder { self.fec_rate, shred_data_batch, self.version, + shred_data_batch.len(), ) }) .collect() @@ -648,12 +649,14 @@ impl Shredder { fec_rate: f32, data_shred_batch: &[Shred], version: u16, + max_coding_shreds: usize, ) -> Vec { assert!(!data_shred_batch.is_empty()); if fec_rate != 0.0 { let num_data = data_shred_batch.len(); // always generate at least 1 coding shred even if the fec_rate doesn't allow it - let num_coding = Self::calculate_num_coding_shreds(num_data, fec_rate); + let num_coding = + Self::calculate_num_coding_shreds(num_data, fec_rate, max_coding_shreds); let session = Session::new(num_data, num_coding).expect("Failed to create erasure session"); let start_index = data_shred_batch[0].common_header.index; @@ -721,11 +724,15 @@ impl Shredder { } } - fn calculate_num_coding_shreds(num_data_shreds: usize, fec_rate: f32) -> usize { + fn calculate_num_coding_shreds( + num_data_shreds: usize, + fec_rate: f32, + max_coding_shreds: usize, + ) -> usize { if num_data_shreds == 0 { 0 } else { - num_data_shreds.min(1.max((fec_rate * num_data_shreds as f32) as usize)) + max_coding_shreds.min(1.max((fec_rate * num_data_shreds as f32) as usize)) } } @@ -1068,8 +1075,11 @@ pub mod tests { let size = serialized_size(&entries).unwrap(); let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD as u64; let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size; - let num_expected_coding_shreds = - Shredder::calculate_num_coding_shreds(num_expected_data_shreds as usize, fec_rate); + let num_expected_coding_shreds = Shredder::calculate_num_coding_shreds( + num_expected_data_shreds as usize, + fec_rate, + num_expected_data_shreds as usize, + ); let start_index = 0; let (data_shreds, coding_shreds, next_index) =