Skip to content

Commit

Permalink
blockstore: always send a coding shred for chained merkle root confli…
Browse files Browse the repository at this point in the history
…cts (#1353)

* blockstore: always send a coding shred for chained merkle root conflicts

* pr feedback: return ErasureMeta

* fix format string indentation
  • Loading branch information
AshwinSekar authored May 28, 2024
1 parent 802b74a commit ecb491c
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 55 deletions.
109 changes: 54 additions & 55 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,11 @@ impl Blockstore {
///
/// Checks the map `erasure_metas`, if not present scans blockstore. Returns None
/// if the previous consecutive erasure set is not present in either.
fn previous_erasure_set(
&self,
fn previous_erasure_set<'a>(
&'a self,
erasure_set: ErasureSetId,
erasure_metas: &BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
) -> Result<Option<ErasureSetId>> {
erasure_metas: &'a BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
) -> Result<Option<(ErasureSetId, Cow<ErasureMeta>)>> {
let (slot, fec_set_index) = erasure_set.store_key();

// Check the previous entry from the in memory map to see if it is the consecutive
Expand All @@ -513,13 +513,15 @@ impl Blockstore {
Bound::Excluded(erasure_set),
))
.next_back();
let candidate_erasure_set = candidate_erasure_entry
let candidate_erasure_set_and_meta = candidate_erasure_entry
.filter(|(_, candidate_erasure_meta)| {
candidate_erasure_meta.as_ref().next_fec_set_index() == Some(fec_set_index)
})
.map(|(candidate_erasure_set, _)| *candidate_erasure_set);
if candidate_erasure_set.is_some() {
return Ok(candidate_erasure_set);
.map(|(erasure_set, erasure_meta)| {
(*erasure_set, Cow::Borrowed(erasure_meta.as_ref()))
});
if candidate_erasure_set_and_meta.is_some() {
return Ok(candidate_erasure_set_and_meta);
}

// Consecutive set was not found in memory, scan blockstore for a potential candidate
Expand Down Expand Up @@ -549,7 +551,10 @@ impl Blockstore {
return Err(BlockstoreError::InvalidErasureConfig);
};
if next_fec_set_index == fec_set_index {
return Ok(Some(candidate_erasure_set));
return Ok(Some((
candidate_erasure_set,
Cow::Owned(candidate_erasure_meta),
)));
}
Ok(None)
}
Expand Down Expand Up @@ -1168,7 +1173,6 @@ impl Blockstore {
shred,
&just_inserted_shreds,
&erasure_metas,
&merkle_root_metas,
&mut duplicate_shreds,
);
}
Expand Down Expand Up @@ -1887,7 +1891,6 @@ impl Blockstore {
shred: &Shred,
just_inserted_shreds: &HashMap<ShredId, Shred>,
erasure_metas: &BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
merkle_root_metas: &HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
) -> bool {
let slot = shred.slot();
Expand All @@ -1905,7 +1908,7 @@ impl Blockstore {
// If a shred from the previous fec set has already been inserted, check the chaining.
// Since we cannot compute the previous fec set index, we check the in memory map, otherwise
// check the previous key from blockstore to see if it is consecutive with our current set.
let Some(prev_erasure_set) = self
let Some((prev_erasure_set, prev_erasure_meta)) = self
.previous_erasure_set(erasure_set, erasure_metas)
.expect("Expect database operations to succeed")
else {
Expand All @@ -1915,37 +1918,22 @@ impl Blockstore {
return true;
};

let Some(prev_merkle_root_meta) = merkle_root_metas
.get(&prev_erasure_set)
.map(WorkingEntry::as_ref)
.map(Cow::Borrowed)
.or_else(|| {
self.merkle_root_meta(prev_erasure_set)
.unwrap()
.map(Cow::Owned)
})
else {
warn!(
"The merkle root meta for the previous erasure set {prev_erasure_set:?} does not \
exist. This should only happen if you have recently upgraded from a version < \
v1.18.13. Skipping the backwards chained merkle root for {erasure_set:?}"
);
return true;
};
let prev_shred_id = ShredId::new(
slot,
prev_merkle_root_meta.first_received_shred_index(),
prev_merkle_root_meta.first_received_shred_type(),
prev_erasure_meta
.first_received_coding_shred_index()
.expect("First received coding index must fit in u32"),
ShredType::Code,
);
let Some(prev_shred) =
Self::get_shred_from_just_inserted_or_db(self, just_inserted_shreds, prev_shred_id)
.map(Cow::into_owned)
else {
error!(
"Shred {prev_shred_id:?} indicated by merkle root meta {prev_merkle_root_meta:?} \
is missing from blockstore. This should only happen in extreme cases where \
blockstore cleanup has caught up to the root. Skipping the backwards chained \
merkle root consistency check"
warn!(
"Shred {prev_shred_id:?} indicated by the erasure meta {prev_erasure_meta:?} \
is missing from blockstore. This can happen if you have recently upgraded \
from a version < v1.18.13, or if blockstore cleanup has caught up to the root. \
Skipping the backwards chained merkle root consistency check"
);
return true;
};
Expand All @@ -1955,12 +1943,10 @@ impl Blockstore {
if !self.check_chaining(merkle_root, chained_merkle_root) {
warn!(
"Received conflicting chained merkle roots for slot: {slot}, shred {:?} type {:?} \
chains to merkle root {chained_merkle_root:?}, however previous fec set shred \
{prev_erasure_set:?} type {:?} has merkle root {merkle_root:?}. Reporting as \
duplicate",
chains to merkle root {chained_merkle_root:?}, however previous fec set coding \
shred {prev_erasure_set:?} has merkle root {merkle_root:?}. Reporting as duplicate",
shred.erasure_set(),
shred.shred_type(),
prev_merkle_root_meta.first_received_shred_type(),
);

if !self.has_duplicate_shreds_in_slot(shred.slot()) {
Expand Down Expand Up @@ -11322,8 +11308,9 @@ pub mod tests {
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
Some(erasure_set_prev)
.unwrap()
.map(|(erasure_set, erasure_meta)| (erasure_set, erasure_meta.into_owned())),
Some((erasure_set_prev, erasure_meta_prev))
);

// Erasure meta only contains the older, blockstore has the previous fec set
Expand All @@ -11334,32 +11321,36 @@ pub mod tests {
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
Some(erasure_set_prev)
.unwrap()
.map(|(erasure_set, erasure_meta)| (erasure_set, erasure_meta.into_owned())),
Some((erasure_set_prev, erasure_meta_prev))
);

// Both contain the previous fec set
erasure_metas.insert(erasure_set_prev, WorkingEntry::Clean(erasure_meta_prev));
assert_eq!(
blockstore
.previous_erasure_set(erasure_set, &erasure_metas)
.unwrap(),
Some(erasure_set_prev)
.unwrap()
.map(|(erasure_set, erasure_meta)| (erasure_set, erasure_meta.into_owned())),
Some((erasure_set_prev, erasure_meta_prev))
);

// Works even if the previous fec set has index 0
assert_eq!(
blockstore
.previous_erasure_set(erasure_set_prev, &erasure_metas)
.unwrap(),
Some(erasure_set_0)
.unwrap()
.map(|(erasure_set, erasure_meta)| (erasure_set, erasure_meta.into_owned())),
Some((erasure_set_0, erasure_meta_0))
);
erasure_metas.remove(&erasure_set_0);
assert_eq!(
blockstore
.previous_erasure_set(erasure_set_prev, &erasure_metas)
.unwrap(),
Some(erasure_set_0)
.unwrap()
.map(|(erasure_set, erasure_meta)| (erasure_set, erasure_meta.into_owned())),
Some((erasure_set_0, erasure_meta_0))
);

// Does not cross slot boundary
Expand Down Expand Up @@ -11764,7 +11755,7 @@ pub mod tests {

#[test]
fn test_chained_merkle_root_upgrade_inconsistency_backwards() {
// Insert a coding shred (without a merkle meta) then inconsistent shreds from the next FEC set
// Insert a coding shred (with an old erasure meta and no merkle root meta) then inconsistent shreds from the next FEC set
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

Expand All @@ -11773,15 +11764,23 @@ pub mod tests {
let fec_set_index = 0;
let (data_shreds, coding_shreds, leader_schedule) =
setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index);
let coding_shred_previous = coding_shreds[0].clone();
let coding_shred_previous = coding_shreds[1].clone();
let next_fec_set_index = fec_set_index + data_shreds.len() as u32;

assert!(blockstore
.insert_shred_return_duplicate(coding_shred_previous.clone(), &leader_schedule,)
.is_empty());

// Remove the merkle root meta in order to simulate this blockstore originating from
// an older version.
// Set the first received coding shred index to 0 and remove merkle root meta to simulate this insertion coming from an
// older version.
let mut erasure_meta = blockstore
.erasure_meta(coding_shred_previous.erasure_set())
.unwrap()
.unwrap();
erasure_meta.clear_first_received_coding_shred_index();
blockstore
.put_erasure_meta(coding_shred_previous.erasure_set(), &erasure_meta)
.unwrap();
let mut write_batch = blockstore.db.batch().unwrap();
blockstore
.db
Expand All @@ -11794,7 +11793,7 @@ pub mod tests {
.is_none());

// Add an incorrectly chained merkle from the next set. Although incorrectly chained
// we skip the duplicate check as the merkle root meta is missing.
// we skip the duplicate check as the first received coding shred index shred is missing
let merkle_root = Hash::new_unique();
assert!(merkle_root != coding_shred_previous.merkle_root().unwrap());
let (data_shreds, coding_shreds, leader_schedule) =
Expand Down
5 changes: 5 additions & 0 deletions ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,11 @@ impl ErasureMeta {
StillNeed(num_needed)
}
}

#[cfg(test)]
pub(crate) fn clear_first_received_coding_shred_index(&mut self) {
self.first_received_coding_index = 0;
}
}

impl MerkleRootMeta {
Expand Down

0 comments on commit ecb491c

Please sign in to comment.