From 9b3728d621b5e9b1ebdc050efe2df8e072d4c071 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 11 Jul 2024 21:31:48 +0000 Subject: [PATCH] replay: extend last fec set check for 32+ retransmitter signed shreds --- core/src/replay_stage.rs | 74 +++++++++--------- ledger/src/blockstore.rs | 116 +++++++++++++++++++++++------ ledger/src/blockstore_db.rs | 2 + ledger/src/blockstore_processor.rs | 3 + ledger/src/shred.rs | 16 ++++ 5 files changed, 154 insertions(+), 57 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 627e0175c89e71..29c9a9925e9e1a 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3071,48 +3071,50 @@ impl ReplayStage { } if bank.collector_id() != my_pubkey { - // If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK shreds in the last FEC set, - // mark it dead. No reason to perform this check on our leader block. - if !blockstore - .is_last_fec_set_full(bank.slot()) - .inspect_err(|e| { + // If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK correctly retransmitted + // shreds in the last FEC set, mark it dead. No reason to perform this check on our leader block. + let check_result = match blockstore.check_last_fec_set(bank.slot()) { + Ok(last_fec_set_check_results) => { + // Update metrics regardless of feature flag + last_fec_set_check_results.report_metrics(bank_slot, bank.hash()); + // Get a final result based on the feature flags + last_fec_set_check_results.get_result(&bank.feature_set) + } + Err(e) => { warn!( - "Unable to determine if last fec set is full for slot {} {}, + "Unable to check the last fec set for slot {} {}, marking as dead: {e:?}", bank.slot(), bank.hash() - ) - }) - .unwrap_or(false) - { - // Update metric regardless of feature flag - datapoint_warn!( - "incomplete_final_fec_set", - ("slot", bank_slot, i64), - ("hash", bank.hash().to_string(), String) - ); - if bank - .feature_set - .is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) - { - let root = bank_forks.read().unwrap().root(); - Self::mark_dead_slot( - blockstore, - bank, - root, - &BlockstoreProcessorError::IncompleteFinalFecSet, - rpc_subscriptions, - duplicate_slots_tracker, - duplicate_confirmed_slots, - epoch_slots_frozen_slots, - progress, - heaviest_subtree_fork_choice, - duplicate_slots_to_repair, - ancestor_hashes_replay_update_sender, - purge_repair_slot_counter, ); - continue; + if bank.feature_set + .is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) + { + Err(BlockstoreProcessorError::IncompleteFinalFecSet) + } else { + Ok(()) + } } + }; + + if let Err(result_err) = check_result { + let root = bank_forks.read().unwrap().root(); + Self::mark_dead_slot( + blockstore, + bank, + root, + &result_err, + rpc_subscriptions, + duplicate_slots_tracker, + duplicate_confirmed_slots, + epoch_slots_frozen_slots, + progress, + heaviest_subtree_fork_choice, + duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, + purge_repair_slot_counter, + ); + continue; } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 0342a323905876..b2f7a1d7b39678 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -15,6 +15,7 @@ use { AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_FIFO, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, }, + blockstore_processor::BlockstoreProcessorError, leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, shred::{ @@ -47,6 +48,7 @@ use { account::ReadableAccount, address_lookup_table::state::AddressLookupTable, clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND}, + feature_set::FeatureSet, genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE}, hash::Hash, pubkey::Pubkey, @@ -170,6 +172,48 @@ impl AsRef for WorkingEntry { } } +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct LastFECSetCheckResults { + is_full: bool, + is_retransmitter_signed: bool, +} + +impl LastFECSetCheckResults { + pub fn report_metrics(&self, slot: Slot, hash: Hash) { + if !self.is_full { + datapoint_warn!( + "incomplete_final_fec_set", + ("slot", slot, i64), + ("hash", hash.to_string(), String) + ); + } + if !self.is_retransmitter_signed { + datapoint_warn!( + "invalid_retransmitter_signature_final_fec_set", + ("slot", slot, i64), + ("hash", hash.to_string(), String) + ); + } + } + + pub fn get_result( + &self, + feature_set: &FeatureSet, + ) -> std::result::Result<(), BlockstoreProcessorError> { + if feature_set.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) + && !self.is_full + { + return Err(BlockstoreProcessorError::IncompleteFinalFecSet); + } else if feature_set + .is_active(&solana_sdk::feature_set::verify_retransmitter_signature::id()) + && !self.is_retransmitter_signed + { + return Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet); + } + Ok(()) + } +} + pub struct InsertResults { completed_data_set_infos: Vec, duplicate_shreds: Vec, @@ -3680,15 +3724,21 @@ impl Blockstore { self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta) } - /// Returns true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of a - /// slot have the same merkle root, indicating they are a part of the same - /// FEC set. + /// Performs checks on the last FEC set for this slot. + /// - `is_full` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of + /// `slot` have the same merkle root, indicating they are a part of the same FEC set. + /// This indicates that the last FEC set is sufficiently sized. + /// - `is_retransmitter_signed` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK` + /// data shreds of `slot` are of the retransmitter variant. Since we already discard + /// invalid signatures on ingestion, this indicates that the last FEC set is properly + /// signed by retransmitters. + /// /// Will fail if: /// - Slot meta is missing /// - LAST_SHRED_IN_SLOT flag has not been received /// - There are missing shreds in the last fec set /// - The block contains legacy shreds - pub fn is_last_fec_set_full(&self, slot: Slot) -> Result { + pub fn check_last_fec_set(&self, slot: Slot) -> Result { // We need to check if the last FEC set index contains at least `DATA_SHREDS_PER_FEC_BLOCK` data shreds. // We compare the merkle roots of the last `DATA_SHREDS_PER_FEC_BLOCK` shreds in this block. // Since the merkle root contains the fec_set_index, if all of them match, we know that the last fec set has @@ -3703,11 +3753,14 @@ impl Blockstore { const_assert_eq!(MINIMUM_INDEX, 31); let Some(start_index) = last_shred_index.checked_sub(MINIMUM_INDEX) else { warn!("Slot {slot} has only {} shreds, fewer than the {DATA_SHREDS_PER_FEC_BLOCK} required", last_shred_index + 1); - return Ok(false); + return Ok(LastFECSetCheckResults { + is_full: false, + is_retransmitter_signed: false, + }); }; let keys = (start_index..=last_shred_index).map(|index| (slot, index)); - let last_merkle_roots: Vec = self + let (merkle_root, is_retransmitter_signed) = self .data_shred_cf .multi_get_bytes(keys) .into_iter() @@ -3718,17 +3771,38 @@ impl Blockstore { warn!("Missing shred for {slot} index {shred_index}"); BlockstoreError::MissingShred(slot, shred_index) })?; - shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| { - let shred_index = start_index + u64::try_from(offset).unwrap(); - warn!("Found legacy shred for {slot}, index {shred_index}"); - BlockstoreError::LegacyShred(slot, shred_index) - }) + let is_retransmitter_signed = shred::layout::is_retransmitter_signed(&shred_bytes) + .map_err(|_| { + let shred_index = start_index + u64::try_from(offset).unwrap(); + warn!("Found legacy shred for {slot}, index {shred_index}"); + BlockstoreError::LegacyShred(slot, shred_index) + })?; + let merkle_root = + shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| { + let shred_index = start_index + u64::try_from(offset).unwrap(); + warn!("Unable to read merkle root for {slot}, index {shred_index}"); + BlockstoreError::MissingMerkleRoot(slot, shred_index) + })?; + Ok((merkle_root, is_retransmitter_signed)) }) - .dedup_by(|res1, res2| res1.as_ref().ok() == res2.as_ref().ok()) - .collect::>>()?; + .process_results::<_, _, BlockstoreError, _>(|iter| { + // lift and dedup merkle root and retransmitter individually to avoid allocation + // and still be able to report metrics accurately for each check. + iter.map(|(mr, rs)| (Some(mr), Some(rs))) + .reduce(|(mr1, rs1), (mr2, rs2)| { + ( + if mr1 == mr2 { mr1 } else { None }, + if rs1 == rs2 { rs1 } else { None }, + ) + }) + .expect("There must be at least 1 shred in order to reach the above reduce without early error") + })?; - // After the dedup there should be exactly one Hash left if the shreds were part of the same FEC set. - Ok(last_merkle_roots.len() == 1) + // After the dedup there should be exactly one Hash left and one true value + Ok(LastFECSetCheckResults { + is_full: merkle_root.is_some(), + is_retransmitter_signed: is_retransmitter_signed.unwrap_or(false), + }) } /// Returns a mapping from each elements of `slots` to a list of the @@ -11887,7 +11961,7 @@ pub mod tests { // Missing slot meta assert_matches!( - blockstore.is_last_fec_set_full(0), + blockstore.check_last_fec_set(0), Err(BlockstoreError::SlotUnavailable) ); @@ -11902,7 +11976,7 @@ pub mod tests { let meta = blockstore.meta(slot).unwrap().unwrap(); assert!(meta.last_index.is_none()); assert_matches!( - blockstore.is_last_fec_set_full(slot), + blockstore.check_last_fec_set(slot), Err(BlockstoreError::UnknownLastIndex(_)) ); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); @@ -11914,14 +11988,14 @@ pub mod tests { let meta = blockstore.meta(slot).unwrap().unwrap(); assert_eq!(meta.last_index, Some(total_shreds - 1)); assert_matches!( - blockstore.is_last_fec_set_full(slot), + blockstore.check_last_fec_set(slot), Err(BlockstoreError::MissingShred(_, _)) ); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); // Full slot blockstore.insert_shreds(data_shreds, None, false).unwrap(); - assert!(blockstore.is_last_fec_set_full(slot).unwrap()); + assert!(blockstore.check_last_fec_set(slot).unwrap().is_full); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); // Slot has less than DATA_SHREDS_PER_FEC_BLOCK shreds in total @@ -11959,7 +12033,7 @@ pub mod tests { let mut slot_meta = blockstore.meta(slot).unwrap().unwrap(); slot_meta.last_index = Some(last_index as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); - assert!(!blockstore.is_last_fec_set_full(slot).unwrap()); + assert!(!blockstore.check_last_fec_set(slot).unwrap().is_full); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); // Slot has more than DATA_SHREDS_PER_FEC_BLOCK in total, but last FEC set has less @@ -11998,6 +12072,6 @@ pub mod tests { let mut slot_meta = blockstore.meta(slot).unwrap().unwrap(); slot_meta.last_index = Some(last_index as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); - assert!(!blockstore.is_last_fec_set_full(slot).unwrap()); + assert!(!blockstore.check_last_fec_set(slot).unwrap().is_full); } } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index dca8d9b524c20e..00eea6f811ebcb 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -159,6 +159,8 @@ pub enum BlockstoreError { MissingShred(Slot, u64), #[error("legacy shred slot {0}, index {1}")] LegacyShred(Slot, u64), + #[error("unable to read merkle root slot {0}, index {1}")] + MissingMerkleRoot(Slot, u64), } pub type Result = std::result::Result; diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 9866ad2291f928..a97c5d7cc275da 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -769,6 +769,9 @@ pub enum BlockstoreProcessorError { #[error("incomplete final fec set")] IncompleteFinalFecSet, + + #[error("invalid retransmitter signature final fec set")] + InvalidRetransmitterSignatureFinalFecSet, } /// Callback for accessing bank state after each slot is confirmed while diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index f2b7a84ed1e2ca..e6d18ef16ae7ea 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -786,6 +786,22 @@ pub mod layout { .ok_or(Error::InvalidPayloadSize(shred.len())) } + pub fn is_retransmitter_signed(shred: &[u8]) -> Result { + match get_shred_variant(shred)? { + ShredVariant::LegacyCode | ShredVariant::LegacyData => Err(Error::InvalidShredVariant), + ShredVariant::MerkleCode { + proof_size: _, + chained: _, + resigned, + } => Ok(resigned), + ShredVariant::MerkleData { + proof_size: _, + chained: _, + resigned, + } => Ok(resigned), + } + } + pub(crate) fn set_retransmitter_signature( shred: &mut [u8], signature: &Signature,