diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index ef24ccd6f2d809..40aa8f23dadf37 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -2847,6 +2847,7 @@ impl ReplayStage {
         block_metadata_notifier: Option<BlockMetadataNotifierArc>,
         replay_result_vec: &[ReplaySlotFromBlockstore],
         purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
+        my_pubkey: &Pubkey,
     ) -> bool {
         // TODO: See if processing of blockstore replay results and bank completion can be made thread safe.
         let mut did_complete_bank = false;
@@ -2930,6 +2931,52 @@ impl ReplayStage {
                 }
             }

+            if bank.collector_id() != my_pubkey {
+                // If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK shreds in the last FEC set,
+                // mark it dead. No reason to perform this check on our leader block.
+                let is_last_fec_set_full = blockstore
+                    .is_last_fec_set_full(bank.slot())
+                    .unwrap_or_else(|e| {
+                        warn!(
+                            "Unable to determine if last fec set is full for slot {} {},
+                            marking as dead: {e:?}",
+                            bank.slot(),
+                            bank.hash()
+                        );
+                        false
+                    });
+                if !is_last_fec_set_full {
+                    // Update metric regardless of feature flag
+                    datapoint_warn!(
+                        "incomplete_final_fec_set",
+                        ("slot", bank_slot, i64),
+                        ("hash", bank.hash().to_string(), String)
+                    );
+                    if bank
+                        .feature_set
+                        .is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id())
+                    {
+                        let root = bank_forks.read().unwrap().root();
+                        Self::mark_dead_slot(
+                            blockstore,
+                            bank,
+                            root,
+                            &BlockstoreProcessorError::IncompleteFinalFecSet,
+                            rpc_subscriptions,
+                            duplicate_slots_tracker,
+                            duplicate_confirmed_slots,
+                            epoch_slots_frozen_slots,
+                            progress,
+                            heaviest_subtree_fork_choice,
+                            duplicate_slots_to_repair,
+                            ancestor_hashes_replay_update_sender,
+                            purge_repair_slot_counter,
+                        );
+                        continue;
+                    }
+                }
+            }
+
             let r_replay_stats = replay_stats.read().unwrap();
             let replay_progress = bank_progress.replay_progress.clone();
             let r_replay_progress = replay_progress.read().unwrap();
@@ -3178,6 +3225,7 @@ impl ReplayStage {
                 block_metadata_notifier,
                 &replay_result_vec,
                 purge_repair_slot_counter,
+                my_pubkey,
             )
         } else {
             false
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 45e1e9893b0d2e..2e069192d75018 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -19,7 +19,7 @@ use {
         next_slots_iterator::NextSlotsIterator,
         shred::{
             self, max_ticks_per_n_shreds, ErasureSetId, ProcessShredsStats, ReedSolomonCache,
-            Shred, ShredData, ShredId, ShredType, Shredder,
+            Shred, ShredData, ShredId, ShredType, Shredder, DATA_SHREDS_PER_FEC_BLOCK,
         },
         slot_stats::{ShredSource, SlotsStats},
         transaction_address_lookup_table_scanner::scan_transaction,
@@ -28,6 +28,7 @@ use {
     bincode::{deserialize, serialize},
     crossbeam_channel::{bounded, Receiver, Sender, TrySendError},
     dashmap::DashSet,
+    itertools::Itertools,
     log::*,
     rand::Rng,
     rayon::{
@@ -88,6 +89,8 @@ use {
     trees::{Tree, TreeWalk},
 };
 pub mod blockstore_purge;
+#[cfg(test)]
+use static_assertions::const_assert_eq;
 pub use {
     crate::{
         blockstore_db::BlockstoreError,
@@ -3690,6 +3693,59 @@ impl Blockstore {
         entries.into_iter().flatten().collect()
     }

+    /// Returns true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of a
+    /// slot have the same merkle root, indicating they are a part of the same
+    /// FEC set.
+    /// Will fail if:
+    ///     - Slot meta is missing
+    ///     - LAST_SHRED_IN_SLOT flag has not been received
+    ///     - There are missing shreds in the last fec set
+    ///     - The block contains legacy shreds
+    pub fn is_last_fec_set_full(&self, slot: Slot) -> Result<bool> {
+        // We need to check if the last FEC set index contains at least `DATA_SHREDS_PER_FEC_BLOCK` data shreds.
+        // We compare the merkle roots of the last `DATA_SHREDS_PER_FEC_BLOCK` shreds in this block.
+        // Since the merkle root contains the fec_set_index, if all of them match, we know that the last fec set has
+        // at least `DATA_SHREDS_PER_FEC_BLOCK` shreds.
+        let slot_meta = self.meta(slot)?.ok_or(BlockstoreError::SlotUnavailable)?;
+        let last_shred_index = slot_meta
+            .last_index
+            .ok_or(BlockstoreError::UnknownLastIndex(slot))?;
+
+        const MINIMUM_INDEX: u64 = DATA_SHREDS_PER_FEC_BLOCK as u64 - 1;
+        #[cfg(test)]
+        const_assert_eq!(MINIMUM_INDEX, 31);
+        let Some(start_index) = last_shred_index.checked_sub(MINIMUM_INDEX) else {
+            warn!("Slot {slot} has only {} shreds, fewer than the {DATA_SHREDS_PER_FEC_BLOCK} required", last_shred_index + 1);
+            return Ok(false);
+        };
+        let keys = (start_index..=last_shred_index)
+            .map(|index| (slot, index))
+            .collect();
+
+        let last_merkle_roots: Vec<Hash> = self
+            .data_shred_cf
+            .multi_get_bytes(keys)
+            .into_iter()
+            .enumerate()
+            .map(|(offset, shred_bytes)| {
+                let shred_bytes = shred_bytes.ok().flatten().ok_or_else(|| {
+                    let shred_index = start_index + u64::try_from(offset).unwrap();
+                    warn!("Missing shred for {slot} index {shred_index}");
+                    BlockstoreError::MissingShred(slot, shred_index)
+                })?;
+                shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| {
+                    let shred_index = start_index + u64::try_from(offset).unwrap();
+                    warn!("Found legacy shred for {slot}, index {shred_index}");
+                    BlockstoreError::LegacyShred(slot, shred_index)
+                })
+            })
+            .dedup_by(|res1, res2| res1.as_ref().ok() == res2.as_ref().ok())
+            .collect::<Result<Vec<Hash>>>()?;
+
+        // After the dedup there should be exactly one Hash left if the shreds were part of the same FEC set.
+        Ok(last_merkle_roots.len() == 1)
+    }
+
     /// Returns a mapping from each elements of `slots` to a list of the
     /// element's children slots.
     pub fn get_slots_since(&self, slots: &[Slot]) -> Result<HashMap<Slot, Vec<Slot>>> {
@@ -10254,6 +10310,24 @@ pub mod tests {
         num_entries: u64,
         fec_set_index: u32,
         chained_merkle_root: Option<Hash>,
+    ) -> (Vec<Shred>, Vec<Shred>, Arc<Keypair>) {
+        setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
+            slot,
+            parent_slot,
+            num_entries,
+            fec_set_index,
+            chained_merkle_root,
+            true,
+        )
+    }
+
+    fn setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
+        slot: u64,
+        parent_slot: u64,
+        num_entries: u64,
+        fec_set_index: u32,
+        chained_merkle_root: Option<Hash>,
+        is_last_in_slot: bool,
     ) -> (Vec<Shred>, Vec<Shred>, Arc<Keypair>) {
         let entries = make_slot_entries_with_transactions(num_entries);
         let leader_keypair = Arc::new(Keypair::new());
@@ -10261,7 +10335,7 @@ pub mod tests {
         let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
             &leader_keypair,
             &entries,
-            true, // is_last_in_slot
+            is_last_in_slot,
             chained_merkle_root,
             fec_set_index, // next_shred_index
             fec_set_index, // next_code_index
@@ -11781,4 +11855,136 @@ pub mod tests {
             .insert_shred_return_duplicate(coding_shred, &leader_schedule)
             .is_empty());
     }
+
+    #[test]
+    fn test_is_last_fec_set_full() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        let parent_slot = 0;
+        let slot = 1;
+
+        let fec_set_index = 30;
+        let (data_shreds, _, _) =
+            setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index);
+        let total_shreds = fec_set_index as u64 + data_shreds.len() as u64;
+
+        // FEC set should be padded
+        assert_eq!(data_shreds.len(), DATA_SHREDS_PER_FEC_BLOCK);
+
+        // Missing slot meta
+        assert_matches!(
+            blockstore.is_last_fec_set_full(0),
+            Err(BlockstoreError::SlotUnavailable)
+        );
+
+        // Incomplete slot
+        blockstore
+            .insert_shreds(
+                data_shreds[0..DATA_SHREDS_PER_FEC_BLOCK - 1].to_vec(),
+                None,
+                false,
+            )
+            .unwrap();
+        let meta = blockstore.meta(slot).unwrap().unwrap();
+        assert!(meta.last_index.is_none());
+        assert_matches!(
+            blockstore.is_last_fec_set_full(slot),
+            Err(BlockstoreError::UnknownLastIndex(_))
+        );
+        blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();
+
+        // Missing shreds
+        blockstore
+            .insert_shreds(data_shreds[1..].to_vec(), None, false)
+            .unwrap();
+        let meta = blockstore.meta(slot).unwrap().unwrap();
+        assert_eq!(meta.last_index, Some(total_shreds - 1));
+        assert_matches!(
+            blockstore.is_last_fec_set_full(slot),
+            Err(BlockstoreError::MissingShred(_, _))
+        );
+        blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();
+
+        // Full slot
+        blockstore.insert_shreds(data_shreds, None, false).unwrap();
+        assert!(blockstore.is_last_fec_set_full(slot).unwrap());
+        blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();
+
+        // Slot has less than DATA_SHREDS_PER_FEC_BLOCK shreds in total
+        let mut fec_set_index = 0;
+        let (first_data_shreds, _, _) =
+            setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
+                slot,
+                parent_slot,
+                10,
+                fec_set_index,
+                None,
+                false,
+            );
+        let merkle_root = first_data_shreds[0].merkle_root().unwrap();
+        fec_set_index += first_data_shreds.len() as u32;
+        let (last_data_shreds, _, _) =
+            setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
+                slot,
+                parent_slot,
+                40,
+                fec_set_index,
+                Some(merkle_root),
+                false, // No padding
+            );
+        let last_index = last_data_shreds.last().unwrap().index();
+        let total_shreds = first_data_shreds.len() + last_data_shreds.len();
+        assert!(total_shreds < DATA_SHREDS_PER_FEC_BLOCK);
+        blockstore
+            .insert_shreds(first_data_shreds, None, false)
.unwrap(); + blockstore + .insert_shreds(last_data_shreds, None, false) + .unwrap(); + // Manually update last index flag + let mut slot_meta = blockstore.meta(slot).unwrap().unwrap(); + slot_meta.last_index = Some(last_index as u64); + blockstore.put_meta(slot, &slot_meta).unwrap(); + assert!(!blockstore.is_last_fec_set_full(slot).unwrap()); + blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); + + // Slot has more than DATA_SHREDS_PER_FEC_BLOCK in total, but last FEC set has less + let mut fec_set_index = 0; + let (first_data_shreds, _, _) = + setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( + slot, + parent_slot, + 100, + fec_set_index, + None, + false, + ); + let merkle_root = first_data_shreds[0].merkle_root().unwrap(); + fec_set_index += first_data_shreds.len() as u32; + let (last_data_shreds, _, _) = + setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( + slot, + parent_slot, + 100, + fec_set_index, + Some(merkle_root), + false, // No padding + ); + let last_index = last_data_shreds.last().unwrap().index(); + let total_shreds = first_data_shreds.len() + last_data_shreds.len(); + assert!(last_data_shreds.len() < DATA_SHREDS_PER_FEC_BLOCK); + assert!(total_shreds > DATA_SHREDS_PER_FEC_BLOCK); + blockstore + .insert_shreds(first_data_shreds, None, false) + .unwrap(); + blockstore + .insert_shreds(last_data_shreds, None, false) + .unwrap(); + // Manually update last index flag + let mut slot_meta = blockstore.meta(slot).unwrap().unwrap(); + slot_meta.last_index = Some(last_index as u64); + blockstore.put_meta(slot, &slot_meta).unwrap(); + assert!(!blockstore.is_last_fec_set_full(slot).unwrap()); + } } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index ab7517453584a2..3a6beb9c26e919 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -153,6 +153,12 @@ pub enum BlockstoreError { TransactionIndexOverflow, #[error("invalid erasure config")] InvalidErasureConfig, + #[error("last shred index missing slot {0}")] + UnknownLastIndex(Slot), + #[error("missing shred slot {0}, index {1}")] + MissingShred(Slot, u64), + #[error("legacy shred slot {0}, index {1}")] + LegacyShred(Slot, u64), } pub type Result = std::result::Result; diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 4fa5fa6f3aa808..36c40b612f1f45 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -672,6 +672,9 @@ pub enum BlockstoreProcessorError { #[error("root bank with mismatched capitalization at {0}")] RootBankWithMismatchedCapitalization(Slot), + + #[error("incomplete final fec set")] + IncompleteFinalFecSet, } /// Callback for accessing bank state while processing the blockstore diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 32c9973c2eb946..824657871aa9f5 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -789,6 +789,10 @@ pub mod deprecate_unused_legacy_vote_plumbing { solana_sdk::declare_id!("6Uf8S75PVh91MYgPQSHnjRAPQq6an5BDv9vomrCwDqLe"); } +pub mod vote_only_full_fec_sets { + solana_sdk::declare_id!("ffecLRhhakKSGhMuc6Fz2Lnfq4uT9q3iu9ZsNaPLxPc"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -981,6 +985,7 @@ lazy_static! 
         (enable_chained_merkle_shreds::id(), "Enable chained Merkle shreds #34916"),
         (deprecate_unused_legacy_vote_plumbing::id(), "Deprecate unused legacy vote tx plumbing"),
         (chained_merkle_conflict_duplicate_proofs::id(), "generate duplicate proofs for chained merkle root conflicts"),
+        (vote_only_full_fec_sets::id(), "vote only full fec sets"),
         /*************** ADD NEW FEATURES HERE ***************/
     ]
     .iter()
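
Reviewer note (not part of the patch): the new Blockstore::is_last_fec_set_full boils down to "do the merkle roots of the last DATA_SHREDS_PER_FEC_BLOCK (32, per the const_assert above) data shreds dedup to a single value?". The sketch below shows that reduction in isolation; the Hash alias, the sample values, and the helper name last_fec_set_is_full are illustrative only, while the real method reads shred bytes from the data-shred column and surfaces the new BlockstoreError variants instead of returning a bare bool.

// Illustrative sketch only; compiles standalone with no external crates.
const DATA_SHREDS_PER_FEC_BLOCK: usize = 32;

// Stand-in for solana_sdk::hash::Hash; any Eq-comparable type works for the sketch.
type Hash = [u8; 32];

// Returns true when the trailing DATA_SHREDS_PER_FEC_BLOCK roots are all equal,
// i.e. the last shreds of the slot all belong to one FEC set.
fn last_fec_set_is_full(last_roots: &[Hash]) -> bool {
    if last_roots.len() < DATA_SHREDS_PER_FEC_BLOCK {
        // Mirrors the checked_sub(MINIMUM_INDEX) early return: too few shreds in the slot.
        return false;
    }
    let tail = &last_roots[last_roots.len() - DATA_SHREDS_PER_FEC_BLOCK..];
    // Equivalent to dedup-ing the roots and checking that exactly one remains.
    tail.windows(2).all(|pair| pair[0] == pair[1])
}

fn main() {
    // A properly padded final FEC set: 32 shreds sharing one merkle root.
    let full = vec![[7u8; 32]; DATA_SHREDS_PER_FEC_BLOCK];
    assert!(last_fec_set_is_full(&full));

    // The last 32 shreds straddle two FEC sets (two distinct roots), so such a
    // block would be marked dead once vote_only_full_fec_sets is active.
    let mut straddling = vec![[1u8; 32]; 40];
    straddling.extend(std::iter::repeat([2u8; 32]).take(16));
    assert!(!last_fec_set_is_full(&straddling));
}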