From fe4c2b33ead086f8161686ce61098bf26998be9c Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 18 Jul 2024 16:41:02 -0400 Subject: [PATCH] replay: extend last fec set check for 32+ retransmitter signed shreds (#2101) * replay: extend last fec set check for 32+ retransmitter signed shreds * pr feedback: use separate feature flag * pr feedback: is_retransmitter_signed -> is_retransmitter_signed_variant, false for legacy * pr feedback: update doc comment fail -> error * pr feedback: hash -> bank_hash for report metrics * refactor metrics inside blockstore fn, return block_id for future use * pr feedback: gate metrics reporting * pr feedback: do not distinguish impossible combos, simplify check code * pr feedback: remove report_metrics helper fn * pr feedback: remove metric * pr feedback: block_id -> last_fec_set_merkle_root --- core/src/replay_stage.rs | 41 ++--- ledger/src/blockstore.rs | 252 ++++++++++++++++++++++++++--- ledger/src/blockstore_db.rs | 2 + ledger/src/blockstore_processor.rs | 3 + ledger/src/shred.rs | 16 ++ sdk/src/feature_set.rs | 5 + 6 files changed, 270 insertions(+), 49 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 4a057b11e3bfe1..a83af4c3c28403 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3086,37 +3086,22 @@ impl ReplayStage { } } - if bank.collector_id() != my_pubkey { - // If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK shreds in the last FEC set, - // mark it dead. No reason to perform this check on our leader block. - if !blockstore - .is_last_fec_set_full(bank.slot()) - .inspect_err(|e| { - warn!( - "Unable to determine if last fec set is full for slot {} {}, - marking as dead: {e:?}", - bank.slot(), - bank.hash() - ) - }) - .unwrap_or(false) - { - // Update metric regardless of feature flag - datapoint_warn!( - "incomplete_final_fec_set", - ("slot", bank_slot, i64), - ("hash", bank.hash().to_string(), String) - ); - if bank - .feature_set - .is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) - { + let _block_id = if bank.collector_id() != my_pubkey { + // If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK correctly retransmitted + // shreds in the last FEC set, mark it dead. No reason to perform this check on our leader block. + match blockstore.check_last_fec_set_and_get_block_id( + bank.slot(), + bank.hash(), + &bank.feature_set, + ) { + Ok(block_id) => block_id, + Err(result_err) => { let root = bank_forks.read().unwrap().root(); Self::mark_dead_slot( blockstore, bank, root, - &BlockstoreProcessorError::IncompleteFinalFecSet, + &result_err, rpc_subscriptions, duplicate_slots_tracker, duplicate_confirmed_slots, @@ -3130,7 +3115,9 @@ impl ReplayStage { continue; } } - } + } else { + None + }; let r_replay_stats = replay_stats.read().unwrap(); let replay_progress = bank_progress.replay_progress.clone(); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index afa339483986b1..acb630bebb3633 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -15,6 +15,7 @@ use { AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_FIFO, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, }, + blockstore_processor::BlockstoreProcessorError, leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, shred::{ @@ -47,6 +48,7 @@ use { account::ReadableAccount, address_lookup_table::state::AddressLookupTable, clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND}, + feature_set::FeatureSet, genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE}, hash::Hash, pubkey::Pubkey, @@ -170,6 +172,31 @@ impl AsRef for WorkingEntry { } } +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct LastFECSetCheckResults { + last_fec_set_merkle_root: Option, + is_retransmitter_signed: bool, +} + +impl LastFECSetCheckResults { + fn get_last_fec_set_merkle_root( + &self, + feature_set: &FeatureSet, + ) -> std::result::Result, BlockstoreProcessorError> { + if feature_set.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) + && self.last_fec_set_merkle_root.is_none() + { + return Err(BlockstoreProcessorError::IncompleteFinalFecSet); + } else if feature_set + .is_active(&solana_sdk::feature_set::vote_only_retransmitter_signed_fec_sets::id()) + && !self.is_retransmitter_signed + { + return Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet); + } + Ok(self.last_fec_set_merkle_root) + } +} + pub struct InsertResults { completed_data_set_infos: Vec, duplicate_shreds: Vec, @@ -3680,15 +3707,53 @@ impl Blockstore { self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta) } - /// Returns true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of a - /// slot have the same merkle root, indicating they are a part of the same - /// FEC set. - /// Will fail if: + /// Performs checks on the last fec set of a replayed slot, and returns the block_id. + /// Returns: + /// - BlockstoreProcessorError::IncompleteFinalFecSet + /// if the last fec set is not full + /// - BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet + /// if the last fec set is not signed by retransmitters + pub fn check_last_fec_set_and_get_block_id( + &self, + slot: Slot, + bank_hash: Hash, + feature_set: &FeatureSet, + ) -> std::result::Result, BlockstoreProcessorError> { + let results = self.check_last_fec_set(slot); + let Ok(results) = results else { + warn!( + "Unable to check the last fec set for slot {} {}, + marking as dead: {results:?}", + slot, bank_hash, + ); + if feature_set.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) { + return Err(BlockstoreProcessorError::IncompleteFinalFecSet); + } + return Ok(None); + }; + // Update metrics + if results.last_fec_set_merkle_root.is_none() { + datapoint_warn!("incomplete_final_fec_set", ("slot", slot, i64),); + } + // Return block id / error based on feature flags + results.get_last_fec_set_merkle_root(feature_set) + } + + /// Performs checks on the last FEC set for this slot. + /// - `block_id` will be `Some(mr)` if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of + /// `slot` have the same merkle root of `mr`, indicating they are a part of the same FEC set. + /// This indicates that the last FEC set is sufficiently sized. + /// - `is_retransmitter_signed` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK` + /// data shreds of `slot` are of the retransmitter variant. Since we already discard + /// invalid signatures on ingestion, this indicates that the last FEC set is properly + /// signed by retransmitters. + /// + /// Will error if: /// - Slot meta is missing /// - LAST_SHRED_IN_SLOT flag has not been received /// - There are missing shreds in the last fec set /// - The block contains legacy shreds - pub fn is_last_fec_set_full(&self, slot: Slot) -> Result { + fn check_last_fec_set(&self, slot: Slot) -> Result { // We need to check if the last FEC set index contains at least `DATA_SHREDS_PER_FEC_BLOCK` data shreds. // We compare the merkle roots of the last `DATA_SHREDS_PER_FEC_BLOCK` shreds in this block. // Since the merkle root contains the fec_set_index, if all of them match, we know that the last fec set has @@ -3703,11 +3768,14 @@ impl Blockstore { const_assert_eq!(MINIMUM_INDEX, 31); let Some(start_index) = last_shred_index.checked_sub(MINIMUM_INDEX) else { warn!("Slot {slot} has only {} shreds, fewer than the {DATA_SHREDS_PER_FEC_BLOCK} required", last_shred_index + 1); - return Ok(false); + return Ok(LastFECSetCheckResults { + last_fec_set_merkle_root: None, + is_retransmitter_signed: false, + }); }; let keys = (start_index..=last_shred_index).map(|index| (slot, index)); - let last_merkle_roots: Vec = self + let deduped_shred_checks: Vec<(Hash, bool)> = self .data_shred_cf .multi_get_bytes(keys) .into_iter() @@ -3718,17 +3786,34 @@ impl Blockstore { warn!("Missing shred for {slot} index {shred_index}"); BlockstoreError::MissingShred(slot, shred_index) })?; - shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| { - let shred_index = start_index + u64::try_from(offset).unwrap(); - warn!("Found legacy shred for {slot}, index {shred_index}"); - BlockstoreError::LegacyShred(slot, shred_index) - }) + let is_retransmitter_signed = + shred::layout::is_retransmitter_signed_variant(&shred_bytes).map_err(|_| { + let shred_index = start_index + u64::try_from(offset).unwrap(); + warn!("Found legacy shred for {slot}, index {shred_index}"); + BlockstoreError::LegacyShred(slot, shred_index) + })?; + let merkle_root = + shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| { + let shred_index = start_index + u64::try_from(offset).unwrap(); + warn!("Unable to read merkle root for {slot}, index {shred_index}"); + BlockstoreError::MissingMerkleRoot(slot, shred_index) + })?; + Ok((merkle_root, is_retransmitter_signed)) }) .dedup_by(|res1, res2| res1.as_ref().ok() == res2.as_ref().ok()) - .collect::>>()?; + .collect::>>()?; - // After the dedup there should be exactly one Hash left if the shreds were part of the same FEC set. - Ok(last_merkle_roots.len() == 1) + // After the dedup there should be exactly one Hash left and one true value + let &[(block_id, is_retransmitter_signed)] = deduped_shred_checks.as_slice() else { + return Ok(LastFECSetCheckResults { + last_fec_set_merkle_root: None, + is_retransmitter_signed: false, + }); + }; + Ok(LastFECSetCheckResults { + last_fec_set_merkle_root: Some(block_id), + is_retransmitter_signed, + }) } /// Returns a mapping from each elements of `slots` to a list of the @@ -5231,6 +5316,7 @@ pub mod tests { solana_runtime::bank::{Bank, RewardType}, solana_sdk::{ clock::{DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT}, + feature_set::{vote_only_full_fec_sets, vote_only_retransmitter_signed_fec_sets}, hash::{self, hash, Hash}, instruction::CompiledInstruction, message::v0::LoadedAddresses, @@ -11877,7 +11963,7 @@ pub mod tests { } #[test] - fn test_is_last_fec_set_full() { + fn test_check_last_fec_set() { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); @@ -11894,7 +11980,7 @@ pub mod tests { // Missing slot meta assert_matches!( - blockstore.is_last_fec_set_full(0), + blockstore.check_last_fec_set(0), Err(BlockstoreError::SlotUnavailable) ); @@ -11909,7 +11995,7 @@ pub mod tests { let meta = blockstore.meta(slot).unwrap().unwrap(); assert!(meta.last_index.is_none()); assert_matches!( - blockstore.is_last_fec_set_full(slot), + blockstore.check_last_fec_set(slot), Err(BlockstoreError::UnknownLastIndex(_)) ); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); @@ -11921,14 +12007,17 @@ pub mod tests { let meta = blockstore.meta(slot).unwrap().unwrap(); assert_eq!(meta.last_index, Some(total_shreds - 1)); assert_matches!( - blockstore.is_last_fec_set_full(slot), + blockstore.check_last_fec_set(slot), Err(BlockstoreError::MissingShred(_, _)) ); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); // Full slot + let block_id = data_shreds[0].merkle_root().unwrap(); blockstore.insert_shreds(data_shreds, None, false).unwrap(); - assert!(blockstore.is_last_fec_set_full(slot).unwrap()); + let results = blockstore.check_last_fec_set(slot).unwrap(); + assert_eq!(results.last_fec_set_merkle_root, Some(block_id)); + assert!(results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); // Slot has less than DATA_SHREDS_PER_FEC_BLOCK shreds in total @@ -11966,7 +12055,9 @@ pub mod tests { let mut slot_meta = blockstore.meta(slot).unwrap().unwrap(); slot_meta.last_index = Some(last_index as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); - assert!(!blockstore.is_last_fec_set_full(slot).unwrap()); + let results = blockstore.check_last_fec_set(slot).unwrap(); + assert!(results.last_fec_set_merkle_root.is_none()); + assert!(!results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); // Slot has more than DATA_SHREDS_PER_FEC_BLOCK in total, but last FEC set has less @@ -12005,6 +12096,123 @@ pub mod tests { let mut slot_meta = blockstore.meta(slot).unwrap().unwrap(); slot_meta.last_index = Some(last_index as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); - assert!(!blockstore.is_last_fec_set_full(slot).unwrap()); + let results = blockstore.check_last_fec_set(slot).unwrap(); + assert!(results.last_fec_set_merkle_root.is_none()); + assert!(!results.is_retransmitter_signed); + blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); + + // Slot is full, but does not contain retransmitter shreds + let fec_set_index = 0; + let (first_data_shreds, _, _) = + setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( + slot, + parent_slot, + 200, + fec_set_index, + // Do not set merkle root, so shreds are not signed + None, + true, + ); + assert!(first_data_shreds.len() > DATA_SHREDS_PER_FEC_BLOCK); + let block_id = first_data_shreds[0].merkle_root().unwrap(); + blockstore + .insert_shreds(first_data_shreds, None, false) + .unwrap(); + let results = blockstore.check_last_fec_set(slot).unwrap(); + assert_eq!(results.last_fec_set_merkle_root, Some(block_id)); + assert!(!results.is_retransmitter_signed); + } + + #[test] + fn test_last_fec_set_check_results() { + let enabled_feature_set = FeatureSet::all_enabled(); + let disabled_feature_set = FeatureSet::default(); + let mut full_only = FeatureSet::default(); + full_only.activate(&vote_only_full_fec_sets::id(), 0); + let mut retransmitter_only = FeatureSet::default(); + retransmitter_only.activate(&vote_only_retransmitter_signed_fec_sets::id(), 0); + + let results = LastFECSetCheckResults { + last_fec_set_merkle_root: None, + is_retransmitter_signed: false, + }; + assert_matches!( + results.get_last_fec_set_merkle_root(&enabled_feature_set), + Err(BlockstoreProcessorError::IncompleteFinalFecSet) + ); + assert_matches!( + results.get_last_fec_set_merkle_root(&full_only), + Err(BlockstoreProcessorError::IncompleteFinalFecSet) + ); + assert_matches!( + results.get_last_fec_set_merkle_root(&retransmitter_only), + Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) + ); + assert!(results + .get_last_fec_set_merkle_root(&disabled_feature_set) + .unwrap() + .is_none()); + + let block_id = Hash::new_unique(); + let results = LastFECSetCheckResults { + last_fec_set_merkle_root: Some(block_id), + is_retransmitter_signed: false, + }; + assert_matches!( + results.get_last_fec_set_merkle_root(&enabled_feature_set), + Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) + ); + assert_eq!( + results.get_last_fec_set_merkle_root(&full_only).unwrap(), + Some(block_id) + ); + assert_matches!( + results.get_last_fec_set_merkle_root(&retransmitter_only), + Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) + ); + assert_eq!( + results + .get_last_fec_set_merkle_root(&disabled_feature_set) + .unwrap(), + Some(block_id) + ); + + let results = LastFECSetCheckResults { + last_fec_set_merkle_root: None, + is_retransmitter_signed: true, + }; + assert_matches!( + results.get_last_fec_set_merkle_root(&enabled_feature_set), + Err(BlockstoreProcessorError::IncompleteFinalFecSet) + ); + assert_matches!( + results.get_last_fec_set_merkle_root(&full_only), + Err(BlockstoreProcessorError::IncompleteFinalFecSet) + ); + assert!(results + .get_last_fec_set_merkle_root(&retransmitter_only) + .unwrap() + .is_none()); + assert!(results + .get_last_fec_set_merkle_root(&disabled_feature_set) + .unwrap() + .is_none()); + + let block_id = Hash::new_unique(); + let results = LastFECSetCheckResults { + last_fec_set_merkle_root: Some(block_id), + is_retransmitter_signed: true, + }; + for feature_set in [ + enabled_feature_set, + disabled_feature_set, + full_only, + retransmitter_only, + ] { + assert_eq!( + results.get_last_fec_set_merkle_root(&feature_set).unwrap(), + Some(block_id) + ); + } } } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index dca8d9b524c20e..00eea6f811ebcb 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -159,6 +159,8 @@ pub enum BlockstoreError { MissingShred(Slot, u64), #[error("legacy shred slot {0}, index {1}")] LegacyShred(Slot, u64), + #[error("unable to read merkle root slot {0}, index {1}")] + MissingMerkleRoot(Slot, u64), } pub type Result = std::result::Result; diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index b9beedf1a24d5a..67330d5a949cbb 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -771,6 +771,9 @@ pub enum BlockstoreProcessorError { #[error("incomplete final fec set")] IncompleteFinalFecSet, + + #[error("invalid retransmitter signature final fec set")] + InvalidRetransmitterSignatureFinalFecSet, } /// Callback for accessing bank state after each slot is confirmed while diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index f2b7a84ed1e2ca..814ec2b5bf303a 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -786,6 +786,22 @@ pub mod layout { .ok_or(Error::InvalidPayloadSize(shred.len())) } + pub fn is_retransmitter_signed_variant(shred: &[u8]) -> Result { + match get_shred_variant(shred)? { + ShredVariant::LegacyCode | ShredVariant::LegacyData => Ok(false), + ShredVariant::MerkleCode { + proof_size: _, + chained: _, + resigned, + } => Ok(resigned), + ShredVariant::MerkleData { + proof_size: _, + chained: _, + resigned, + } => Ok(resigned), + } + } + pub(crate) fn set_retransmitter_signature( shred: &mut [u8], signature: &Signature, diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 21c48305498923..b9972d3240be08 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -841,6 +841,10 @@ pub mod verify_retransmitter_signature { solana_sdk::declare_id!("BZ5g4hRbu5hLQQBdPyo2z9icGyJ8Khiyj3QS6dhWijTb"); } +pub mod vote_only_retransmitter_signed_fec_sets { + solana_sdk::declare_id!("RfEcA95xnhuwooVAhUUksEJLZBF7xKCLuqrJoqk4Zph"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -1046,6 +1050,7 @@ lazy_static! { (move_stake_and_move_lamports_ixs::id(), "Enable MoveStake and MoveLamports stake program instructions #1610"), (ed25519_precompile_verify_strict::id(), "Use strict verification in ed25519 precompile SIMD-0152"), (verify_retransmitter_signature::id(), "Verify retransmitter signature #1840"), + (vote_only_retransmitter_signed_fec_sets::id(), "vote only on retransmitter signed fec sets"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter()