From 196b7e1d7438c735b3a85cf932b7b7dc036c427b Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Fri, 27 Oct 2023 03:11:04 +0000 Subject: [PATCH] send duplicate shred proofs for merkle root conflicts --- core/src/window_service.rs | 1 + gossip/src/cluster_info.rs | 2 +- gossip/src/duplicate_shred.rs | 242 ++++++++++++-- ledger/src/blockstore.rs | 582 +++++++++++++++++++++++++++++++-- ledger/src/blockstore_meta.rs | 22 ++ ledger/src/shred.rs | 6 + ledger/src/shred/merkle.rs | 4 +- ledger/src/shred/shred_code.rs | 9 +- ledger/src/shred/shred_data.rs | 9 +- 9 files changed, 822 insertions(+), 55 deletions(-) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index a68a20e2078471..ebff8a6ec79382 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -148,6 +148,7 @@ fn run_check_duplicate( let (shred1, shred2) = match shred { PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict), PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict), + PossibleDuplicateShred::MerkleRootConflict(shred, conflict) => (shred, conflict), PossibleDuplicateShred::Exists(shred) => { // Unlike the other cases we have to wait until here to decide to handle the duplicate and store // in blockstore. This is because the duplicate could have been part of the same insert batch, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 67f713676d5d2f..8c91757e047a1f 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -267,7 +267,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "CVvKB495YW6JN4w1rWwajyZmG5wvNhmD97V99rSv9fGw")] +#[frozen_abi(digest = "CroFF8MTW2fxatwv7ALz9Wde4e9L9yE4L59yebM3XuWe")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index b1ceab79b26949..82e09456c53112 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -8,6 +8,7 @@ use { }, solana_sdk::{ clock::Slot, + hash::Hash, pubkey::Pubkey, sanitize::{Sanitize, SanitizeError}, }, @@ -30,7 +31,7 @@ pub struct DuplicateShred { pub(crate) wallclock: u64, pub(crate) slot: Slot, _unused: u32, - shred_type: ShredType, + _unused_shred_type: ShredType, // Serialized DuplicateSlotProof split into chunks. num_chunks: u8, chunk_index: u8, @@ -66,6 +67,8 @@ pub enum Error { InvalidErasureMetaConflict, #[error("invalid last index conflict")] InvalidLastIndexConflict, + #[error("invalid merkle root conflict")] + InvalidMerkleRootConflict, #[error("invalid signature")] InvalidSignature, #[error("invalid size limit")] @@ -78,8 +81,6 @@ pub enum Error { MissingDataChunk, #[error("(de)serialization error")] SerializationError(#[from] bincode::Error), - #[error("shred type mismatch")] - ShredTypeMismatch, #[error("slot mismatch")] SlotMismatch, #[error("type conversion error")] @@ -90,8 +91,8 @@ pub enum Error { /// Check that `shred1` and `shred2` indicate a valid duplicate proof /// - Must be for the same slot -/// - Must have the same `shred_type` /// - Must both sigverify for the correct leader +/// - Must have a merkle root conflict, otherwise `shred1` and `shred2` must have the same `shred_type` /// - If `shred1` and `shred2` share the same index they must be not equal /// - If `shred1` and `shred2` do not share the same index and are data shreds /// verify that they indicate an index conflict. One of them must be the @@ -106,10 +107,6 @@ where return Err(Error::SlotMismatch); } - if shred1.shred_type() != shred2.shred_type() { - return Err(Error::ShredTypeMismatch); - } - if let Some(leader_schedule) = leader_schedule { let slot_leader = leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?; @@ -118,6 +115,23 @@ where } } + // Merkle root conflict check + if let Some((mr1, mr2)) = shred1.merkle_root().zip(shred2.merkle_root()) { + // Hash::default check to exclude legacy shreds + if shred1.fec_set_index() == shred2.fec_set_index() + && mr1 != mr2 + && mr1 != Hash::default() + && mr2 != Hash::default() + { + return Ok(()); + } + } + + if shred1.shred_type() != shred2.shred_type() { + // Only valid proof here is a merkle conflict which was checked above + return Err(Error::InvalidMerkleRootConflict); + } + if shred1.index() == shred2.index() { if shred1.payload() != shred2.payload() { return Ok(()); @@ -164,7 +178,7 @@ where } let other_shred = Shred::new_from_serialized_shred(other_payload)?; check_shreds(leader_schedule, &shred, &other_shred)?; - let (slot, shred_type) = (shred.slot(), shred.shred_type()); + let slot = shred.slot(); let proof = DuplicateSlotProof { shred1: shred.into_payload(), shred2: other_shred.into_payload(), @@ -184,27 +198,21 @@ where from: self_pubkey, wallclock, slot, - shred_type, num_chunks, chunk_index: i as u8, chunk, _unused: 0, + _unused_shred_type: ShredType::Code, }); Ok(chunks) } // Returns a predicate checking if a duplicate-shred chunk matches -// (slot, shred_type) and has valid chunk_index. -fn check_chunk( - slot: Slot, - shred_type: ShredType, - num_chunks: u8, -) -> impl Fn(&DuplicateShred) -> Result<(), Error> { +// the slot and has valid chunk_index. +fn check_chunk(slot: Slot, num_chunks: u8) -> impl Fn(&DuplicateShred) -> Result<(), Error> { move |dup| { if dup.slot != slot { Err(Error::SlotMismatch) - } else if dup.shred_type != shred_type { - Err(Error::ShredTypeMismatch) } else if dup.num_chunks != num_chunks { Err(Error::NumChunksMismatch) } else if dup.chunk_index >= num_chunks { @@ -226,13 +234,12 @@ pub(crate) fn into_shreds( let mut chunks = chunks.into_iter(); let DuplicateShred { slot, - shred_type, num_chunks, chunk_index, chunk, .. } = chunks.next().ok_or(Error::InvalidDuplicateShreds)?; - let check_chunk = check_chunk(slot, shred_type, num_chunks); + let check_chunk = check_chunk(slot, num_chunks); let mut data = HashMap::new(); data.insert(chunk_index, chunk); for chunk in chunks { @@ -260,8 +267,6 @@ pub(crate) fn into_shreds( let shred2 = Shred::new_from_serialized_shred(proof.shred2)?; if shred1.slot() != slot || shred2.slot() != slot { Err(Error::SlotMismatch) - } else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type { - Err(Error::ShredTypeMismatch) } else { check_shreds(Some(|_| Some(slot_leader).copied()), &shred1, &shred2)?; Ok((shred1, shred2)) @@ -300,7 +305,7 @@ pub(crate) mod tests { from: Pubkey::new_unique(), wallclock: u64::MAX, slot: Slot::MAX, - shred_type: ShredType::Data, + _unused_shred_type: ShredType::Data, num_chunks: u8::MAX, chunk_index: u8::MAX, chunk: Vec::default(), @@ -421,7 +426,7 @@ pub(crate) mod tests { wallclock: u64, max_size: usize, // Maximum serialized size of each DuplicateShred. ) -> Result, Error> { - let (slot, shred_type) = (shred.slot(), shred.shred_type()); + let slot = shred.slot(); let proof = DuplicateSlotProof { shred1: shred.into_payload(), shred2: other_shred.into_payload(), @@ -437,11 +442,11 @@ pub(crate) mod tests { from: self_pubkey, wallclock, slot, - shred_type, num_chunks, chunk_index: i as u8, chunk, _unused: 0, + _unused_shred_type: ShredType::Code, }); Ok(chunks) } @@ -815,6 +820,14 @@ pub(crate) mod tests { &leader, merkle_variant, ); + let coding_shreds_different_merkle_root = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 10, + &shredder, + &leader, + merkle_variant, + ); let coding_shreds_bigger = new_rand_coding_shreds( &mut rng, next_shred_index, @@ -833,10 +846,18 @@ pub(crate) mod tests { ); // Same fec-set, different index, different erasure meta - let test_cases = vec![ + let mut test_cases = vec![ (coding_shreds[0].clone(), coding_shreds_bigger[1].clone()), (coding_shreds[0].clone(), coding_shreds_smaller[1].clone()), ]; + if merkle_variant { + // Same erasure config, different merkle root is still different + // erasure meta + test_cases.push(( + coding_shreds[0].clone(), + coding_shreds_different_merkle_root[0].clone(), + )); + } for (shred1, shred2) in test_cases.into_iter() { let chunks: Vec<_> = from_shred( shred1.clone(), @@ -949,4 +970,173 @@ pub(crate) mod tests { ); } } + + #[test] + fn test_merkle_root_conflict_round_trip() { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0..31_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let (coding_shreds, data_shreds) = new_rand_shreds( + &mut rng, + next_shred_index, + next_shred_index, + 10, + true, + &shredder, + &leader, + false, + ); + let (diff_coding_shreds, diff_data_shreds) = new_rand_shreds( + &mut rng, + next_shred_index, + next_shred_index, + 10, + true, + &shredder, + &leader, + false, + ); + + let test_cases = vec![ + (data_shreds[0].clone(), diff_data_shreds[1].clone()), + (coding_shreds[0].clone(), diff_coding_shreds[1].clone()), + (data_shreds[0].clone(), diff_coding_shreds[0].clone()), + (coding_shreds[0].clone(), diff_data_shreds[0].clone()), + ]; + for (shred1, shred2) in test_cases.into_iter() { + let chunks: Vec<_> = from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap(); + assert_eq!(shred1, shred3); + assert_eq!(shred2, shred4); + } + } + + #[test] + fn test_merkle_root_conflict_invalid() { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0..31_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + + let (data_shreds, coding_shreds) = new_rand_shreds( + &mut rng, + next_shred_index, + next_shred_index, + 10, + true, + &shredder, + &leader, + true, + ); + + let (next_data_shreds, next_coding_shreds) = new_rand_shreds( + &mut rng, + next_shred_index + 1, + next_shred_index + 1, + 10, + true, + &shredder, + &leader, + true, + ); + + let (legacy_data_shreds, legacy_coding_shreds) = new_rand_shreds( + &mut rng, + next_shred_index, + next_shred_index, + 10, + false, + &shredder, + &leader, + true, + ); + + let test_cases = vec![ + // Same fec set same merkle root + (coding_shreds[0].clone(), data_shreds[0].clone()), + (data_shreds[0].clone(), coding_shreds[0].clone()), + // Different FEC set different merkle root + (coding_shreds[0].clone(), next_data_shreds[0].clone()), + (next_coding_shreds[0].clone(), data_shreds[0].clone()), + (data_shreds[0].clone(), next_coding_shreds[0].clone()), + (next_data_shreds[0].clone(), coding_shreds[0].clone()), + // Legacy shreds + ( + legacy_coding_shreds[0].clone(), + legacy_data_shreds[0].clone(), + ), + ( + legacy_data_shreds[0].clone(), + legacy_coding_shreds[0].clone(), + ), + // Mix of legacy and merkle + (legacy_coding_shreds[0].clone(), data_shreds[0].clone()), + (coding_shreds[0].clone(), legacy_data_shreds[0].clone()), + (legacy_data_shreds[0].clone(), coding_shreds[0].clone()), + (data_shreds[0].clone(), legacy_coding_shreds[0].clone()), + // Mix of legacy and merkle with different fec index + (legacy_coding_shreds[0].clone(), next_data_shreds[0].clone()), + (next_coding_shreds[0].clone(), legacy_data_shreds[0].clone()), + (legacy_data_shreds[0].clone(), next_coding_shreds[0].clone()), + (next_data_shreds[0].clone(), legacy_coding_shreds[0].clone()), + ]; + for (shred1, shred2) in test_cases.into_iter() { + assert_matches!( + from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .err() + .unwrap(), + Error::InvalidMerkleRootConflict + ); + + let chunks: Vec<_> = from_shred_bypass_checks( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.clone(), + rng.gen(), // wallclock + 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + + assert_matches!( + into_shreds(&leader.pubkey(), chunks).err().unwrap(), + Error::InvalidMerkleRootConflict + ); + } + } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 3010a65be7f90c..1c2bb7cdf3a298 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -143,6 +143,7 @@ pub enum PossibleDuplicateShred { Exists(Shred), // Blockstore has another shred in its spot LastIndexConflict(/* original */ Shred, /* conflict */ Vec), // The index of this shred conflicts with `slot_meta.last_index` ErasureConflict(/* original */ Shred, /* conflict */ Vec), // The coding shred has a conflict in the erasure_meta + MerkleRootConflict(/* original */ Shred, /* conflict */ Vec), } impl PossibleDuplicateShred { @@ -151,6 +152,7 @@ impl PossibleDuplicateShred { Self::Exists(shred) => shred.slot(), Self::LastIndexConflict(shred, _) => shred.slot(), Self::ErasureConflict(shred, _) => shred.slot(), + Self::MerkleRootConflict(shred, _) => shred.slot(), } } } @@ -464,6 +466,10 @@ impl Blockstore { self.erasure_meta_cf.get(erasure_set.store_key()) } + fn merkle_root_meta(&self, erasure_set: ErasureSetId) -> Result> { + self.merkle_root_meta_cf.get(erasure_set.key()) + } + /// Check whether the specified slot is an orphan slot which does not /// have a parent slot. /// @@ -793,6 +799,9 @@ impl Blockstore { /// - [`cf::ErasureMeta`]: the associated ErasureMeta of the coding and data /// shreds inside `shreds` will be updated and committed to /// `cf::ErasureMeta`. + /// - [`cf::MerkleRootMeta`]: the associated MerkleRootMeta of the coding and data + /// shreds inside `shreds` will be updated and committed to + /// `cf::MerkleRootMeta`. /// - [`cf::Index`]: stores (slot id, index to the index_working_set_entry) /// pair to the `cf::Index` column family for each index_working_set_entry /// which insert did occur in this function call. @@ -835,6 +844,7 @@ impl Blockstore { let mut just_inserted_shreds = HashMap::with_capacity(shreds.len()); let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); let mut duplicate_shreds = vec![]; @@ -854,6 +864,7 @@ impl Blockstore { match self.check_insert_data_shred( shred, &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, @@ -891,6 +902,7 @@ impl Blockstore { self.check_insert_coding_shred( shred, &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_inserted_shreds, @@ -937,6 +949,7 @@ impl Blockstore { match self.check_insert_data_shred( shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, @@ -1002,6 +1015,10 @@ impl Blockstore { write_batch.put::(erasure_set.store_key(), &erasure_meta)?; } + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch.put::(erasure_set.key(), &merkle_root_meta)?; + } + for (&slot, index_working_set_entry) in index_working_set.iter() { if index_working_set_entry.did_insert_occur { write_batch.put::(slot, &index_working_set_entry.index)?; @@ -1159,6 +1176,7 @@ impl Blockstore { &self, shred: Shred, erasure_metas: &mut HashMap, + merkle_root_metas: &mut HashMap, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, just_received_shreds: &mut HashMap, @@ -1175,10 +1193,20 @@ impl Blockstore { self.get_index_meta_entry(slot, index_working_set, index_meta_time_us); let index_meta = &mut index_meta_working_set_entry.index; + let erasure_set = shred.erasure_set(); + let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| { + self.erasure_meta(erasure_set) + .expect("Expect database get to succeed") + .unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap()) + }); + if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) { + if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() { + entry.insert(meta); + } + } // This gives the index of first coding shred in this FEC block // So, all coding shreds in a given FEC block will have the same set index - if !is_trusted { if index_meta.coding().contains(shred_index) { metrics.num_coding_shreds_exists += 1; @@ -1190,14 +1218,19 @@ impl Blockstore { metrics.num_coding_shreds_invalid += 1; return false; } - } - let erasure_set = shred.erasure_set(); - let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| { - self.erasure_meta(erasure_set) - .expect("Expect database get to succeed") - .unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap()) - }); + if let Some(merkle_root_meta) = merkle_root_metas.get(&erasure_set) { + if !self.perform_merkle_check( + just_received_shreds, + slot, + merkle_root_meta, + &shred, + duplicate_shreds, + ) { + return false; + } + } + } if !erasure_meta.check_coding_shred(&shred) { metrics.num_coding_shreds_invalid_erasure_config += 1; @@ -1255,6 +1288,12 @@ impl Blockstore { if result { index_meta_working_set_entry.did_insert_occur = true; metrics.num_inserted += 1; + + // Important that we do this only after the insert has succeeded to ensure that MerkleRootMeta is consistent + // with the rest of blockstore + merkle_root_metas + .entry(erasure_set) + .or_insert(MerkleRootMeta::from_shred(&shred)); } if let HashMapEntry::Vacant(entry) = just_received_shreds.entry(shred.id()) { @@ -1303,8 +1342,8 @@ impl Blockstore { /// /// The resulting `write_batch` may include updates to [`cf::DeadSlots`] /// and [`cf::ShredData`]. Note that it will also update the in-memory copy - /// of `erasure_metas` and `index_working_set`, which will later be - /// used to update other column families such as [`cf::ErasureMeta`] and + /// of `erasure_metas`, `merkle_root_metas`, and `index_working_set`, which will + /// later be used to update other column families such as [`cf::ErasureMeta`] and /// [`cf::Index`]. /// /// Arguments: @@ -1312,6 +1351,9 @@ impl Blockstore { /// - `erasure_metas`: the in-memory hash-map that maintains the dirty /// copy of the erasure meta. It will later be written to /// `cf::ErasureMeta` in insert_shreds_handle_duplicate(). + /// - `merkle_root_metas`: the in-memory hash-map that maintains the dirty + /// copy of the merkle root meta. It will later be written to + /// `cf::MerkleRootMeta` in `insert_shreds_handle_duplicate()`. /// - `index_working_set`: the in-memory hash-map that maintains the /// dirty copy of the index meta. It will later be written to /// `cf::Index` in insert_shreds_handle_duplicate(). @@ -1335,6 +1377,7 @@ impl Blockstore { &self, shred: Shred, erasure_metas: &mut HashMap, + merkle_root_metas: &mut HashMap, index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, @@ -1360,6 +1403,12 @@ impl Blockstore { ); let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut(); + let erasure_set = shred.erasure_set(); + if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) { + if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() { + entry.insert(meta); + } + } if !is_trusted { if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) { @@ -1392,9 +1441,20 @@ impl Blockstore { ) { return Err(InsertDataShredError::InvalidShred); } + + if let Some(merkle_root_meta) = merkle_root_metas.get(&erasure_set) { + if !self.perform_merkle_check( + just_inserted_shreds, + slot, + merkle_root_meta, + &shred, + duplicate_shreds, + ) { + return Err(InsertDataShredError::InvalidShred); + } + } } - let erasure_set = shred.erasure_set(); let newly_completed_data_sets = self.insert_data_shred( slot_meta, index_meta.data_mut(), @@ -1402,6 +1462,9 @@ impl Blockstore { write_batch, shred_source, )?; + merkle_root_metas + .entry(erasure_set) + .or_insert(MerkleRootMeta::from_shred(&shred)); just_inserted_shreds.insert(shred.id(), shred); index_meta_working_set_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true; @@ -1446,20 +1509,95 @@ impl Blockstore { shred_index < slot_meta.consumed || data_index.contains(shred_index) } - fn get_data_shred_from_just_inserted_or_db<'a>( + fn get_shred_from_just_inserted_or_db<'a>( &'a self, just_inserted_shreds: &'a HashMap, slot: Slot, - index: u64, + index: u32, + shred_type: ShredType, ) -> Cow<'a, Vec> { - let key = ShredId::new(slot, u32::try_from(index).unwrap(), ShredType::Data); + let key = ShredId::new(slot, index, shred_type); if let Some(shred) = just_inserted_shreds.get(&key) { Cow::Borrowed(shred.payload()) - } else { + } else if shred_type == ShredType::Data { // If it doesn't exist in the just inserted set, it must exist in // the backing store - Cow::Owned(self.get_data_shred(slot, index).unwrap().unwrap()) + Cow::Owned( + self.get_data_shred(slot, u64::from(index)) + .unwrap() + .unwrap_or_else(|| { + panic!("{} {} {:?} must be present!", slot, index, shred_type) + }), + ) + } else { + Cow::Owned( + self.get_coding_shred(slot, u64::from(index)) + .unwrap() + .unwrap_or_else(|| { + panic!("{} {} {:?} must be present!", slot, index, shred_type) + }), + ) + } + } + + /// Returns true if there is no merkle root conflict between + /// the existing `merkle_root_meta` and `shred` + /// + /// Otherwise return false and if not already present, add duplicate proof to + /// blockstore and `duplicate_shreds`. + fn perform_merkle_check( + &self, + just_inserted_shreds: &HashMap, + slot: Slot, + merkle_root_meta: &MerkleRootMeta, + shred: &Shred, + duplicate_shreds: &mut Vec, + ) -> bool { + if merkle_root_meta.merkle_root() == Hash::default() || shred.merkle_root().is_none() { + // Legacy Shreds + return true; + } + + if merkle_root_meta.merkle_root() == shred.merkle_root().unwrap_or_default() { + // No conflict + return true; + } + + warn!( + "Received conflicting merkle roots for slot: {}, erasure_set: {:?} + original merkle root {:?} shred index {} type {:?} vs + conflicting merkle root {:?} shred index {} type {:?}. Reporting as duplicate", + slot, + shred.erasure_set(), + merkle_root_meta.merkle_root(), + merkle_root_meta.first_received_shred_index(), + merkle_root_meta.first_received_shred_type(), + shred.merkle_root(), + shred.index(), + shred.shred_type(), + ); + + if !self.has_duplicate_shreds_in_slot(slot) { + let conflicting_shred = self + .get_shred_from_just_inserted_or_db( + just_inserted_shreds, + slot, + merkle_root_meta.first_received_shred_index(), + merkle_root_meta.first_received_shred_type(), + ) + .into_owned(); + if self + .store_duplicate_slot(slot, conflicting_shred.clone(), shred.payload().clone()) + .is_err() + { + warn!("store duplicate error"); + } + duplicate_shreds.push(PossibleDuplicateShred::MerkleRootConflict( + shred.clone(), + conflicting_shred, + )); } + false } fn should_insert_data_shred( @@ -1490,10 +1628,11 @@ impl Blockstore { if !self.has_duplicate_shreds_in_slot(slot) { let ending_shred: Vec = self - .get_data_shred_from_just_inserted_or_db( + .get_shred_from_just_inserted_or_db( just_inserted_shreds, slot, - last_index.unwrap(), + u32::try_from(last_index.unwrap()).unwrap(), + ShredType::Data, ) .into_owned(); @@ -1529,10 +1668,11 @@ impl Blockstore { if !self.has_duplicate_shreds_in_slot(slot) { let ending_shred: Vec = self - .get_data_shred_from_just_inserted_or_db( + .get_shred_from_just_inserted_or_db( just_inserted_shreds, slot, - slot_meta.received - 1, + u32::try_from(slot_meta.received - 1).unwrap(), + ShredType::Data, ) .into_owned(); @@ -6723,6 +6863,388 @@ pub mod tests { ),); } + #[test] + fn test_merkle_root_metas_coding() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let parent_slot = 0; + let slot = 1; + let index = 0; + let (_, coding_shreds, _) = setup_erasure_shreds(slot, parent_slot, 10); + let coding_shred = coding_shreds[index as usize].clone(); + + let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); + let mut index_working_set = HashMap::new(); + let mut just_received_shreds = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut index_meta_time_us = 0; + assert!(blockstore.check_insert_coding_shred( + coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut vec![], + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_type(), + ShredType::Code, + ); + + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch + .put::(erasure_set.key(), &merkle_root_meta) + .unwrap(); + } + blockstore.db.write(write_batch).unwrap(); + + // Add a shred with different merkle root and index + let (_, coding_shreds, _) = setup_erasure_shreds(slot, parent_slot, 10); + let new_coding_shred = coding_shreds[(index + 1) as usize].clone(); + + erasure_metas.clear(); + index_working_set.clear(); + just_received_shreds.clear(); + let mut merkle_root_metas = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut duplicates = vec![]; + + assert!(!blockstore.check_insert_coding_shred( + new_coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut duplicates, + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + // No insert, notify duplicate + assert_eq!(duplicates.len(), 1); + assert_matches!( + duplicates[0], + PossibleDuplicateShred::MerkleRootConflict(_, _) + ); + + // Verify that we still have the merkle root meta from the original shred + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + + // Blockstore should also have the merkle root meta of the original shred + assert_eq!( + blockstore + .merkle_root_meta(coding_shred.erasure_set()) + .unwrap() + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + blockstore + .merkle_root_meta(coding_shred.erasure_set()) + .unwrap() + .unwrap() + .first_received_shred_index(), + index + ); + + // Add a shred from different fec set + let new_index = index + 31; + let (_, coding_shreds, _) = + setup_erasure_shreds_with_index(slot, parent_slot, 10, new_index); + let new_coding_shred = coding_shreds[0].clone(); + + assert!(blockstore.check_insert_coding_shred( + new_coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut vec![], + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + // Verify that we still have the merkle root meta for the original shred + // and the new shred + assert_eq!(merkle_root_metas.len(), 2); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + assert_eq!( + merkle_root_metas + .get(&new_coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + new_coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&new_coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + new_index + ); + } + + #[test] + fn test_merkle_root_metas_data() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let parent_slot = 0; + let slot = 1; + let index = 11; + let fec_set_index = 11; + let (data_shreds, _, _) = + setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index); + let data_shred = data_shreds[0].clone(); + + let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); + let mut index_working_set = HashMap::new(); + let mut just_received_shreds = HashMap::new(); + let mut slot_meta_working_set = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut index_meta_time_us = 0; + blockstore + .check_insert_data_shred( + data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut vec![], + None, + ShredSource::Turbine, + ) + .unwrap(); + + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_type(), + ShredType::Data, + ); + + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch + .put::(erasure_set.key(), &merkle_root_meta) + .unwrap(); + } + blockstore.db.write(write_batch).unwrap(); + + // Add a shred with different merkle root and index + let (data_shreds, _, _) = + setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index); + let new_data_shred = data_shreds[1].clone(); + + erasure_metas.clear(); + index_working_set.clear(); + just_received_shreds.clear(); + let mut merkle_root_metas = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut duplicates = vec![]; + + assert!(blockstore + .check_insert_data_shred( + new_data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut duplicates, + None, + ShredSource::Turbine, + ) + .is_err()); + + // No insert, notify duplicate + assert_eq!(duplicates.len(), 1); + assert_matches!( + duplicates[0], + PossibleDuplicateShred::MerkleRootConflict(_, _) + ); + + // Verify that we still have the merkle root meta from the original shred + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + + // Blockstore should also have the merkle root meta of the original shred + assert_eq!( + blockstore + .merkle_root_meta(data_shred.erasure_set()) + .unwrap() + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + blockstore + .merkle_root_meta(data_shred.erasure_set()) + .unwrap() + .unwrap() + .first_received_shred_index(), + index + ); + + // Add a shred from different fec set + let new_index = fec_set_index + 31; + let new_data_shred = Shred::new_from_data( + slot, + new_index, + 1, // parent_offset + &[3, 3, 3], // data + ShredFlags::empty(), + 0, // reference_tick, + 0, // version + fec_set_index + 30, + ); + + blockstore + .check_insert_data_shred( + new_data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut vec![], + None, + ShredSource::Turbine, + ) + .unwrap(); + + // Verify that we still have the merkle root meta for the original shred + // and the new shred + assert_eq!(merkle_root_metas.len(), 2); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + assert_eq!( + merkle_root_metas + .get(&new_data_shred.erasure_set()) + .unwrap() + .merkle_root(), + new_data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&new_data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + new_index + ); + } + #[test] fn test_check_insert_coding_shred() { let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -6741,6 +7263,7 @@ pub mod tests { ); let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); let mut index_working_set = HashMap::new(); let mut just_received_shreds = HashMap::new(); let mut write_batch = blockstore.db.batch().unwrap(); @@ -6748,6 +7271,7 @@ pub mod tests { assert!(blockstore.check_insert_coding_shred( coding_shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, @@ -6763,6 +7287,7 @@ pub mod tests { assert!(!blockstore.check_insert_coding_shred( coding_shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, @@ -9250,6 +9775,15 @@ pub mod tests { slot: u64, parent_slot: u64, num_entries: u64, + ) -> (Vec, Vec, Arc) { + setup_erasure_shreds_with_index(slot, parent_slot, num_entries, 0) + } + + fn setup_erasure_shreds_with_index( + slot: u64, + parent_slot: u64, + num_entries: u64, + fec_set_index: u32, ) -> (Vec, Vec, Arc) { let entries = make_slot_entries_with_transactions(num_entries); let leader_keypair = Arc::new(Keypair::new()); @@ -9257,10 +9791,10 @@ pub mod tests { let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &leader_keypair, &entries, - true, // is_last_in_slot - 0, // next_shred_index - 0, // next_code_index - true, // merkle_variant + true, // is_last_in_slot + fec_set_index, // next_shred_index + fec_set_index, // next_code_index + true, // merkle_variant &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 41a16c9ae3fee3..38e1145fbfa5e7 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -406,6 +406,28 @@ impl ErasureMeta { } } +impl MerkleRootMeta { + pub(crate) fn from_shred(shred: &Shred) -> Self { + Self { + merkle_root: shred.merkle_root().unwrap_or_default(), + first_received_shred_index: shred.index(), + first_received_shred_type: shred.shred_type(), + } + } + + pub(crate) fn merkle_root(&self) -> Hash { + self.merkle_root + } + + pub(crate) fn first_received_shred_index(&self) -> u32 { + self.first_received_shred_index + } + + pub(crate) fn first_received_shred_type(&self) -> ShredType { + self.first_received_shred_type + } +} + impl DuplicateSlotProof { pub(crate) fn new(shred1: Vec, shred2: Vec) -> Self { DuplicateSlotProof { shred1, shred2 } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 5fda160e29b976..baddf17dbf0266 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -287,6 +287,10 @@ impl ErasureSetId { pub(crate) fn store_key(&self) -> (Slot, /*fec_set_index:*/ u64) { (self.0, u64::from(self.1)) } + + pub(crate) fn key(&self) -> (Slot, /*fec_set_index:*/ u32) { + (self.0, self.1) + } } macro_rules! dispatch { @@ -337,6 +341,8 @@ impl Shred { dispatch!(pub fn payload(&self) -> &Vec); dispatch!(pub fn sanitize(&self) -> Result<(), Error>); + dispatch!(pub fn merkle_root(&self) -> Option); + // Only for tests. dispatch!(pub fn set_index(&mut self, index: u32)); dispatch!(pub fn set_slot(&mut self, slot: Slot)); diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs index 4f1cd22111e07f..d0d507ae48aba1 100644 --- a/ledger/src/shred/merkle.rs +++ b/ledger/src/shred/merkle.rs @@ -154,7 +154,7 @@ impl ShredData { Ok(Self::SIZE_OF_HEADERS + Self::capacity(proof_size)?) } - fn merkle_root(&self) -> Result { + pub(super) fn merkle_root(&self) -> Result { let proof_size = self.proof_size()?; let index = self.erasure_shard_index()?; let proof_offset = Self::proof_offset(proof_size)?; @@ -266,7 +266,7 @@ impl ShredCode { Ok(Self::SIZE_OF_HEADERS + Self::capacity(proof_size)?) } - fn merkle_root(&self) -> Result { + pub(super) fn merkle_root(&self) -> Result { let proof_size = self.proof_size()?; let index = self.erasure_shard_index()?; let proof_offset = Self::proof_offset(proof_size)?; diff --git a/ledger/src/shred/shred_code.rs b/ledger/src/shred/shred_code.rs index ba85d92af25187..eecb4da6a4a365 100644 --- a/ledger/src/shred/shred_code.rs +++ b/ledger/src/shred/shred_code.rs @@ -6,7 +6,7 @@ use { CodingShredHeader, Error, ShredCommonHeader, ShredType, SignedData, DATA_SHREDS_PER_FEC_BLOCK, MAX_DATA_SHREDS_PER_SLOT, SIZE_OF_NONCE, }, - solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, signature::Signature}, + solana_sdk::{clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, signature::Signature}, static_assertions::const_assert_eq, }; @@ -47,6 +47,13 @@ impl ShredCode { } } + pub(super) fn merkle_root(&self) -> Option { + match self { + Self::Legacy(_) => None, + Self::Merkle(shred) => shred.merkle_root().ok(), + } + } + pub(super) fn new_from_parity_shard( slot: Slot, index: u32, diff --git a/ledger/src/shred/shred_data.rs b/ledger/src/shred/shred_data.rs index 9bf2c0bf05f79e..ff3f78dda2cc10 100644 --- a/ledger/src/shred/shred_data.rs +++ b/ledger/src/shred/shred_data.rs @@ -7,7 +7,7 @@ use { DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredType, ShredVariant, SignedData, MAX_DATA_SHREDS_PER_SLOT, }, - solana_sdk::{clock::Slot, signature::Signature}, + solana_sdk::{clock::Slot, hash::Hash, signature::Signature}, }; #[derive(Clone, Debug, Eq, PartialEq)] @@ -41,6 +41,13 @@ impl ShredData { } } + pub(super) fn merkle_root(&self) -> Option { + match self { + Self::Legacy(_) => None, + Self::Merkle(shred) => shred.merkle_root().ok(), + } + } + pub(super) fn new_from_data( slot: Slot, index: u32,