skips (redundant) signature verification for recovered Merkle shreds (solana-labs#4282)

With Merkle shreds, the leader signs the Merkle root of the erasure batch, and
all shreds within the same erasure batch share that signature. For recovered
shreds, the (unique) signature is copied from shreds that were received from
turbine (or repair) and are already sig-verified. The same signature also
verifies for the recovered shreds, because reconstructing the Merkle tree for
the erasure batch yields the same Merkle root.
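
A toy illustration of the argument above, a minimal sketch assuming solana_sdk's
Keypair/Signer/hashv helpers rather than the actual shred code: one leader
signature over the batch's Merkle root is shared by every shred, and a recovered
shred re-derives the identical root, so the copied signature still verifies.

use solana_sdk::{hash::hashv, signature::Keypair, signer::Signer};

fn main() {
    let leader = Keypair::new();

    // Stand-in for the Merkle root of an erasure batch; the leader signs the
    // root once and that signature is carried by every shred in the batch.
    let merkle_root = hashv(&[b"shred 0".as_slice(), b"shred 1".as_slice()]);
    let signature = leader.sign_message(merkle_root.as_ref());

    // A recovered shred re-derives the identical root when the Merkle tree is
    // rebuilt, so the signature copied from an already-verified sibling shred
    // verifies for it as well.
    let rebuilt_root = hashv(&[b"shred 0".as_slice(), b"shred 1".as_slice()]);
    assert_eq!(merkle_root, rebuilt_root);
    assert!(signature.verify(leader.pubkey().as_ref(), rebuilt_root.as_ref()));
}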
behzadnouri authored Jan 5, 2025
1 parent f0d9547 commit 40be9b6
Showing 4 changed files with 65 additions and 40 deletions.
ledger/src/blockstore.rs (17 changes: 10 additions & 7 deletions)
@@ -825,6 +825,7 @@ impl Blockstore {
         erasure_meta: &ErasureMeta,
         prev_inserted_shreds: &HashMap<ShredId, Shred>,
         recovered_shreds: &mut Vec<Shred>,
+        leader_schedule_cache: &LeaderScheduleCache,
         reed_solomon_cache: &ReedSolomonCache,
     ) {
         // Find shreds for this erasure set and try recovery
@@ -833,7 +834,12 @@ impl Blockstore {
             .get_recovery_data_shreds(index, slot, erasure_meta, prev_inserted_shreds)
             .chain(self.get_recovery_coding_shreds(index, slot, erasure_meta, prev_inserted_shreds))
             .collect();
-        if let Ok(mut result) = shred::recover(available_shreds, reed_solomon_cache) {
+        let get_slot_leader = |slot: Slot| -> Option<Pubkey> {
+            leader_schedule_cache.slot_leader_at(slot, /*bank:*/ None)
+        };
+        if let Ok(mut result) =
+            shred::recover(available_shreds, reed_solomon_cache, get_slot_leader)
+        {
             Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len());
             recovered_shreds.append(&mut result);
         } else {
@@ -966,6 +972,7 @@ impl Blockstore {
         erasure_metas: &BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
         prev_inserted_shreds: &HashMap<ShredId, Shred>,
+        leader_schedule_cache: &LeaderScheduleCache,
         reed_solomon_cache: &ReedSolomonCache,
     ) -> Vec<Shred> {
         let mut recovered_shreds = vec![];
@@ -986,6 +993,7 @@ impl Blockstore {
                 erasure_meta,
                 prev_inserted_shreds,
                 &mut recovered_shreds,
+                leader_schedule_cache,
                 reed_solomon_cache,
             );
         }
@@ -1026,6 +1034,7 @@ impl Blockstore {
            &shred_insertion_tracker.erasure_metas,
            &mut shred_insertion_tracker.index_working_set,
            &shred_insertion_tracker.just_inserted_shreds,
+           leader_schedule_cache,
            reed_solomon_cache,
        );

@@ -1036,12 +1045,6 @@ impl Blockstore {
        let recovered_shreds: Vec<_> = recovered_shreds
            .into_iter()
            .filter_map(|shred| {
-                let leader =
-                    leader_schedule_cache.slot_leader_at(shred.slot(), /*bank=*/ None)?;
-                if !shred.verify(&leader) {
-                    metrics.num_recovered_failed_sig += 1;
-                    return None;
-                }
                // Since the data shreds are fully recovered from the
                // erasure batch, no need to store coding shreds in
                // blockstore.
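
A hypothetical standalone sketch (invented names, not the real Blockstore or
LeaderScheduleCache types) of the design choice above: the blockstore wraps the
leader lookup in a closure, so the recovery code only depends on a plain
`impl Fn(Slot) -> Option<Pubkey>` and tests can stub the leader trivially.

type Slot = u64;
type Pubkey = [u8; 32];

// Stand-in for shred::recover's new signature: it never sees the cache type,
// only a leader-lookup closure.
fn recover_sketch(
    recovered_slots: &[Slot],
    get_slot_leader: impl Fn(Slot) -> Option<Pubkey>,
) -> usize {
    recovered_slots
        .iter()
        .filter(|&&slot| get_slot_leader(slot).is_some())
        .count()
}

fn main() {
    // The caller (standing in for the blockstore) wraps whatever lookup it
    // has in a closure; a test can stub a fixed leader just as easily.
    let fixed_leader = [7u8; 32];
    let kept = recover_sketch(&[3, 3, 3], |_slot| Some(fixed_leader));
    assert_eq!(kept, 3);
}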
ledger/src/blockstore_metrics.rs (6 changes: 0 additions & 6 deletions)
@@ -30,7 +30,6 @@ pub struct BlockstoreInsertionMetrics {
     pub num_recovered: usize,
     pub num_recovered_blockstore_error: usize,
     pub num_recovered_inserted: usize,
-    pub num_recovered_failed_sig: usize,
     pub num_recovered_failed_invalid: usize,
     pub num_recovered_exists: usize,
     pub num_repaired_data_shreds_exists: usize,
@@ -83,11 +82,6 @@ impl BlockstoreInsertionMetrics {
                self.num_recovered_inserted as i64,
                i64
            ),
-            (
-                "num_recovered_failed_sig",
-                self.num_recovered_failed_sig as i64,
-                i64
-            ),
            (
                "num_recovered_failed_invalid",
                self.num_recovered_failed_invalid as i64,
ledger/src/shred.rs (26 changes: 24 additions & 2 deletions)
@@ -1106,6 +1106,7 @@ impl TryFrom<u8> for ShredVariant {
 pub(crate) fn recover(
     shreds: Vec<Shred>,
     reed_solomon_cache: &ReedSolomonCache,
+    get_slot_leader: impl Fn(Slot) -> Option<Pubkey>,
 ) -> Result<Vec<Shred>, Error> {
     match shreds
         .first()
@@ -1114,16 +1115,37 @@ pub(crate) fn recover(
         .shred_variant
     {
         ShredVariant::LegacyData | ShredVariant::LegacyCode => {
-            Shredder::try_recovery(shreds, reed_solomon_cache)
+            let mut shreds = Shredder::try_recovery(shreds, reed_solomon_cache)?;
+            shreds.retain(|shred| {
+                get_slot_leader(shred.slot())
+                    .map(|pubkey| shred.verify(&pubkey))
+                    .unwrap_or_default()
+            });
+            Ok(shreds)
         }
         ShredVariant::MerkleCode { .. } | ShredVariant::MerkleData { .. } => {
             let shreds = shreds
                 .into_iter()
                 .map(merkle::Shred::try_from)
                 .collect::<Result<_, _>>()?;
+            // With Merkle shreds, leader signs the Merkle root of the erasure
+            // batch and all shreds within the same erasure batch have the same
+            // signature.
+            // For recovered shreds, the (unique) signature is copied from
+            // shreds which were received from turbine (or repair) and are
+            // already sig-verified.
+            // The same signature also verifies for recovered shreds because
+            // when reconstructing the Merkle tree for the erasure batch, we
+            // will obtain the same Merkle root.
             Ok(merkle::recover(shreds, reed_solomon_cache)?
                 .into_iter()
-                .map(Shred::from)
+                .map(|shred| {
+                    let shred = Shred::from(shred);
+                    debug_assert!(get_slot_leader(shred.slot())
+                        .map(|pubkey| shred.verify(&pubkey))
+                        .unwrap_or_default());
+                    shred
+                })
                 .collect())
         }
     }
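
A minimal std-only sketch of the filtering rule added on the legacy path above
(hypothetical helper, not the real Shred types): a recovered legacy shred is
kept only when the slot leader is known and the signature verifies; an unknown
leader yields None, which unwrap_or_default() maps to false. On the Merkle path
the same condition is only debug_assert!-ed, since the copied signature is
already known to verify.

fn keep_recovered(leader: Option<&str>, verify: impl Fn(&str) -> bool) -> bool {
    // Unknown leader => None => unwrap_or_default() => false (shred dropped).
    leader.map(|pubkey| verify(pubkey)).unwrap_or_default()
}

fn main() {
    assert!(!keep_recovered(None, |_| true)); // leader unknown: dropped
    assert!(!keep_recovered(Some("leader"), |_| false)); // bad signature: dropped
    assert!(keep_recovered(Some("leader"), |_| true)); // verified: kept
}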
ledger/src/shred/merkle.rs (56 changes: 31 additions & 25 deletions)
@@ -787,31 +787,34 @@ pub(super) fn recover(
     // Incoming shreds are resigned immediately after signature verification,
     // so we can just grab the retransmitter signature from one of the
     // available shreds and attach it to the recovered shreds.
-    let (common_header, coding_header, chained_merkle_root, retransmitter_signature) = shreds
-        .iter()
-        .find_map(|shred| {
-            let Shred::ShredCode(shred) = shred else {
-                return None;
-            };
-            let chained_merkle_root = shred.chained_merkle_root().ok();
-            let retransmitter_signature = shred.retransmitter_signature().ok();
-            let position = u32::from(shred.coding_header.position);
-            let common_header = ShredCommonHeader {
-                index: shred.common_header.index.checked_sub(position)?,
-                ..shred.common_header
-            };
-            let coding_header = CodingShredHeader {
-                position: 0u16,
-                ..shred.coding_header
-            };
-            Some((
-                common_header,
-                coding_header,
-                chained_merkle_root,
-                retransmitter_signature,
-            ))
-        })
-        .ok_or(TooFewParityShards)?;
+    let (common_header, coding_header, merkle_root, chained_merkle_root, retransmitter_signature) =
+        shreds
+            .iter()
+            .find_map(|shred| {
+                let Shred::ShredCode(shred) = shred else {
+                    return None;
+                };
+                let merkle_root = shred.merkle_root().ok()?;
+                let chained_merkle_root = shred.chained_merkle_root().ok();
+                let retransmitter_signature = shred.retransmitter_signature().ok();
+                let position = u32::from(shred.coding_header.position);
+                let common_header = ShredCommonHeader {
+                    index: shred.common_header.index.checked_sub(position)?,
+                    ..shred.common_header
+                };
+                let coding_header = CodingShredHeader {
+                    position: 0u16,
+                    ..shred.coding_header
+                };
+                Some((
+                    common_header,
+                    coding_header,
+                    merkle_root,
+                    chained_merkle_root,
+                    retransmitter_signature,
+                ))
+            })
+            .ok_or(TooFewParityShards)?;
     debug_assert_matches!(common_header.shred_variant, ShredVariant::MerkleCode { .. });
     let (proof_size, chained, resigned) = match common_header.shred_variant {
         ShredVariant::MerkleCode {
@@ -953,6 +956,9 @@ pub(super) fn recover(
         .map(Shred::merkle_node)
         .collect::<Result<_, _>>()?;
     let tree = make_merkle_tree(nodes);
+    if tree.last() != Some(&merkle_root) {
+        return Err(Error::InvalidMerkleRoot);
+    }
     for (index, (shred, mask)) in shreds.iter_mut().zip(&mask).enumerate() {
         let proof = make_merkle_proof(index, num_shards, &tree).ok_or(Error::InvalidMerkleProof)?;
         if proof.len() != usize::from(proof_size) {
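
A std-only toy sketch of the new consistency check above (DefaultHasher stands
in for agave's SHA-256 Merkle hashing, and the helper names are invented): after
erasure recovery, the rebuilt tree's root must match the merkle_root taken from
an already sig-verified received shred, otherwise recovery is rejected with
Error::InvalidMerkleRoot instead of relying on the copied signature alone.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn leaf(data: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    data.hash(&mut h);
    h.finish()
}

fn node(left: u64, right: u64) -> u64 {
    let mut h = DefaultHasher::new();
    (left, right).hash(&mut h);
    h.finish()
}

fn merkle_root(leaves: &[u64]) -> u64 {
    // Pair up nodes level by level; duplicate the last node on odd levels.
    let mut level = leaves.to_vec();
    while level.len() > 1 {
        level = level
            .chunks(2)
            .map(|pair| node(pair[0], *pair.last().unwrap()))
            .collect();
    }
    level[0]
}

fn main() {
    let received_root = merkle_root(&[leaf(b"s0"), leaf(b"s1"), leaf(b"s2"), leaf(b"s3")]);
    // Suppose s2 was erasure-recovered: rebuilding the tree with the correct
    // payload reproduces the root, while a corrupted recovery does not.
    let rebuilt_ok = merkle_root(&[leaf(b"s0"), leaf(b"s1"), leaf(b"s2"), leaf(b"s3")]);
    let rebuilt_bad = merkle_root(&[leaf(b"s0"), leaf(b"s1"), leaf(b"bogus"), leaf(b"s3")]);
    assert_eq!(received_root, rebuilt_ok); // check passes: shreds accepted
    assert_ne!(received_root, rebuilt_bad); // check fails: InvalidMerkleRoot
}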
