Skip to content

Commit

Permalink
blockstore: populate duplicate shred proofs for merkle root conflicts (
Browse files Browse the repository at this point in the history
…#34270)

* blockstore: populate duplicate shred proofs for merkle root conflicts

* pr feedback: check test case

* pr feedback: comment

* pr feedback: match statement, shred_id, comment

* add feature flag

* pr feedback: rename ff var and perform_merkle_check

* pr feedback: move panic to callers in get_shred_from_just_inserted_or_db

* avoid unecessary write if proof is already present
  • Loading branch information
AshwinSekar authored Jan 3, 2024
1 parent 8148cb5 commit 1908841
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 22 deletions.
23 changes: 23 additions & 0 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ fn run_check_duplicate(
shred_slot,
&root_bank,
);
let merkle_conflict_duplicate_proofs = cluster_nodes::check_feature_activation(
&feature_set::merkle_conflict_duplicate_proofs::id(),
shred_slot,
&root_bank,
);
let (shred1, shred2) = match shred {
PossibleDuplicateShred::LastIndexConflict(shred, conflict)
| PossibleDuplicateShred::ErasureConflict(shred, conflict) => {
Expand All @@ -173,6 +178,24 @@ fn run_check_duplicate(
return Ok(());
}
}
PossibleDuplicateShred::MerkleRootConflict(shred, conflict) => {
if merkle_conflict_duplicate_proofs {
// Although this proof can be immediately stored on detection, we wait until
// here in order to check the feature flag, as storage in blockstore can
// preclude the detection of other duplicate proofs in this slot
if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
return Ok(());
}
blockstore.store_duplicate_slot(
shred_slot,
conflict.clone(),
shred.clone().into_payload(),
)?;
(shred, conflict)
} else {
return Ok(());
}
}
PossibleDuplicateShred::Exists(shred) => {
// Unlike the other cases we have to wait until here to decide to handle the duplicate and store
// in blockstore. This is because the duplicate could have been part of the same insert batch,
Expand Down
155 changes: 134 additions & 21 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ pub enum PossibleDuplicateShred {
Exists(Shred), // Blockstore has another shred in its spot
LastIndexConflict(/* original */ Shred, /* conflict */ Vec<u8>), // The index of this shred conflicts with `slot_meta.last_index`
ErasureConflict(/* original */ Shred, /* conflict */ Vec<u8>), // The coding shred has a conflict in the erasure_meta
MerkleRootConflict(/* original */ Shred, /* conflict */ Vec<u8>), // Merkle root conflict in the same fec set
}

impl PossibleDuplicateShred {
Expand All @@ -152,6 +153,7 @@ impl PossibleDuplicateShred {
Self::Exists(shred) => shred.slot(),
Self::LastIndexConflict(shred, _) => shred.slot(),
Self::ErasureConflict(shred, _) => shred.slot(),
Self::MerkleRootConflict(shred, _) => shred.slot(),
}
}
}
Expand Down Expand Up @@ -1257,6 +1259,21 @@ impl Blockstore {
metrics.num_coding_shreds_invalid += 1;
return false;
}

if let Some(merkle_root_meta) = merkle_root_metas.get(&erasure_set) {
// A previous shred has been inserted in this batch or in blockstore
// Compare our current shred against the previous shred for potential
// conflicts
if !self.check_merkle_root_consistency(
just_received_shreds,
slot,
merkle_root_meta.as_ref(),
&shred,
duplicate_shreds,
) {
return false;
}
}
}

let erasure_meta_entry = erasure_metas.entry(erasure_set).or_insert_with(|| {
Expand Down Expand Up @@ -1476,6 +1493,21 @@ impl Blockstore {
) {
return Err(InsertDataShredError::InvalidShred);
}

if let Some(merkle_root_meta) = merkle_root_metas.get(&erasure_set) {
// A previous shred has been inserted in this batch or in blockstore
// Compare our current shred against the previous shred for potential
// conflicts
if !self.check_merkle_root_consistency(
just_inserted_shreds,
slot,
merkle_root_meta.as_ref(),
&shred,
duplicate_shreds,
) {
return Err(InsertDataShredError::InvalidShred);
}
}
}

let newly_completed_data_sets = self.insert_data_shred(
Expand Down Expand Up @@ -1532,22 +1564,81 @@ impl Blockstore {
shred_index < slot_meta.consumed || data_index.contains(shred_index)
}

fn get_data_shred_from_just_inserted_or_db<'a>(
/// Finds the corresponding shred at `shred_id` in the just inserted
/// shreds or the backing store. Returns None if there is no shred.
fn get_shred_from_just_inserted_or_db<'a>(
&'a self,
just_inserted_shreds: &'a HashMap<ShredId, Shred>,
slot: Slot,
index: u64,
) -> Cow<'a, Vec<u8>> {
let key = ShredId::new(slot, u32::try_from(index).unwrap(), ShredType::Data);
if let Some(shred) = just_inserted_shreds.get(&key) {
Cow::Borrowed(shred.payload())
} else {
shred_id: ShredId,
) -> Option<Cow<'a, Vec<u8>>> {
let (slot, index, shred_type) = shred_id.unpack();
match (just_inserted_shreds.get(&shred_id), shred_type) {
(Some(shred), _) => Some(Cow::Borrowed(shred.payload())),
// If it doesn't exist in the just inserted set, it must exist in
// the backing store
Cow::Owned(self.get_data_shred(slot, index).unwrap().unwrap())
(_, ShredType::Data) => self
.get_data_shred(slot, u64::from(index))
.unwrap()
.map(Cow::Owned),
(_, ShredType::Code) => self
.get_coding_shred(slot, u64::from(index))
.unwrap()
.map(Cow::Owned),
}
}

/// Returns true if there is no merkle root conflict between
/// the existing `merkle_root_meta` and `shred`
///
/// Otherwise return false and if not already present, add duplicate proof to
/// `duplicate_shreds`.
fn check_merkle_root_consistency(
&self,
just_inserted_shreds: &HashMap<ShredId, Shred>,
slot: Slot,
merkle_root_meta: &MerkleRootMeta,
shred: &Shred,
duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
) -> bool {
let new_merkle_root = shred.merkle_root().ok();
if merkle_root_meta.merkle_root() == new_merkle_root {
// No conflict, either both merkle shreds with same merkle root
// or both legacy shreds with merkle_root `None`
return true;
}

warn!(
"Received conflicting merkle roots for slot: {}, erasure_set: {:?}
original merkle root meta {:?} vs
conflicting merkle root {:?} shred index {} type {:?}. Reporting as duplicate",
slot,
shred.erasure_set(),
merkle_root_meta,
new_merkle_root,
shred.index(),
shred.shred_type(),
);

if !self.has_duplicate_shreds_in_slot(slot) {
let shred_id = ShredId::new(
slot,
merkle_root_meta.first_received_shred_index(),
merkle_root_meta.first_received_shred_type(),
);
let conflicting_shred = self
.get_shred_from_just_inserted_or_db(just_inserted_shreds, shred_id)
.unwrap_or_else(|| {
panic!("First received shred indicated by merkle root meta {:?} is missing from blockstore. This inconsistency may cause duplicate block detection to fail", merkle_root_meta);
})
.into_owned();
duplicate_shreds.push(PossibleDuplicateShred::MerkleRootConflict(
shred.clone(),
conflicting_shred,
));
}
false
}

fn should_insert_data_shred(
&self,
shred: &Shred,
Expand Down Expand Up @@ -1575,12 +1666,16 @@ impl Blockstore {
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));

if !self.has_duplicate_shreds_in_slot(slot) {
let shred_id = ShredId::new(
slot,
u32::try_from(last_index.unwrap()).unwrap(),
ShredType::Data,
);
let ending_shred: Vec<u8> = self
.get_data_shred_from_just_inserted_or_db(
just_inserted_shreds,
slot,
last_index.unwrap(),
)
.get_shred_from_just_inserted_or_db(just_inserted_shreds, shred_id)
.unwrap_or_else(|| {
panic!("Last index data shred indicated by slot meta {:?} is missing from blockstore. This inconsistency may cause duplicate block detection to fail", slot_meta)
})
.into_owned();

if self
Expand Down Expand Up @@ -1614,12 +1709,16 @@ impl Blockstore {
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));

if !self.has_duplicate_shreds_in_slot(slot) {
let shred_id = ShredId::new(
slot,
u32::try_from(slot_meta.received - 1).unwrap(),
ShredType::Data,
);
let ending_shred: Vec<u8> = self
.get_data_shred_from_just_inserted_or_db(
just_inserted_shreds,
slot,
slot_meta.received - 1,
)
.get_shred_from_just_inserted_or_db(just_inserted_shreds, shred_id)
.unwrap_or_else(|| {
panic!("Last received data shred indicated by slot meta {:?} is missing from blockstore. This inconsistency may cause duplicate block detection to fail", slot_meta)
})
.into_owned();

if self
Expand Down Expand Up @@ -6887,7 +6986,7 @@ pub mod tests {
let mut write_batch = blockstore.db.batch().unwrap();
let mut duplicates = vec![];

assert!(blockstore.check_insert_coding_shred(
assert!(!blockstore.check_insert_coding_shred(
new_coding_shred.clone(),
&mut erasure_metas,
&mut merkle_root_metas,
Expand All @@ -6901,6 +7000,13 @@ pub mod tests {
&mut BlockstoreInsertionMetrics::default(),
));

// No insert, notify duplicate
assert_eq!(duplicates.len(), 1);
match &duplicates[0] {
PossibleDuplicateShred::MerkleRootConflict(shred, _) if shred.slot() == slot => (),
_ => panic!("No merkle root conflict"),
}

// Verify that we still have the merkle root meta from the original shred
assert_eq!(merkle_root_metas.len(), 1);
assert_eq!(
Expand Down Expand Up @@ -7095,7 +7201,14 @@ pub mod tests {
None,
ShredSource::Turbine,
)
.is_ok());
.is_err());

// No insert, notify duplicate
assert_eq!(duplicates.len(), 1);
assert_matches!(
duplicates[0],
PossibleDuplicateShred::MerkleRootConflict(_, _)
);

// Verify that we still have the merkle root meta from the original shred
assert_eq!(merkle_root_metas.len(), 1);
Expand Down
1 change: 0 additions & 1 deletion ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ impl ErasureMeta {
}
}

#[allow(dead_code)]
impl MerkleRootMeta {
pub(crate) fn from_shred(shred: &Shred) -> Self {
Self {
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/feature_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,10 @@ pub mod index_erasure_conflict_duplicate_proofs {
solana_sdk::declare_id!("dupPajaLy2SSn8ko42aZz4mHANDNrLe8Nw8VQgFecLa");
}

pub mod merkle_conflict_duplicate_proofs {
solana_sdk::declare_id!("mrkPjRg79B2oK2ZLgd7S3AfEJaX9B6gAF3H9aEykRUS");
}

lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
Expand Down Expand Up @@ -930,6 +934,7 @@ lazy_static! {
(allow_commission_decrease_at_any_time::id(), "Allow commission decrease at any time in epoch #33843"),
(consume_blockstore_duplicate_proofs::id(), "consume duplicate proofs from blockstore in consensus #34372"),
(index_erasure_conflict_duplicate_proofs::id(), "generate duplicate proofs for index and erasure conflicts #34360"),
(merkle_conflict_duplicate_proofs::id(), "generate duplicate proofs for merkle root conflicts #34270"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()
Expand Down

0 comments on commit 1908841

Please sign in to comment.