Skip to content

Commit

Permalink
recovers merkle shreds from erasure codes
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Aug 14, 2022
1 parent a97346a commit d18ba30
Show file tree
Hide file tree
Showing 4 changed files with 458 additions and 27 deletions.
29 changes: 18 additions & 11 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl Blockstore {
index: &mut Index,
erasure_meta: &ErasureMeta,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
recovered_data_shreds: &mut Vec<Shred>,
recovered_shreds: &mut Vec<Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
code_cf: &LedgerColumn<cf::ShredCode>,
) {
Expand All @@ -647,9 +647,9 @@ impl Blockstore {
code_cf,
))
.collect();
if let Ok(mut result) = Shredder::try_recovery(available_shreds) {
if let Ok(mut result) = shred::recover(available_shreds) {
Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len());
recovered_data_shreds.append(&mut result);
recovered_shreds.append(&mut result);
} else {
Self::submit_metrics(slot, erasure_meta, true, "incomplete".into(), 0);
}
Expand Down Expand Up @@ -710,7 +710,7 @@ impl Blockstore {
) -> Vec<Shred> {
let data_cf = db.column::<cf::ShredData>();
let code_cf = db.column::<cf::ShredCode>();
let mut recovered_data_shreds = vec![];
let mut recovered_shreds = vec![];
// Recovery rules:
// 1. Only try recovery around indexes for which new data or coding shreds are received
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
Expand All @@ -726,7 +726,7 @@ impl Blockstore {
index,
erasure_meta,
prev_inserted_shreds,
&mut recovered_data_shreds,
&mut recovered_shreds,
&data_cf,
&code_cf,
);
Expand All @@ -745,7 +745,7 @@ impl Blockstore {
}
};
}
recovered_data_shreds
recovered_shreds
}

/// The main helper function that performs the shred insertion logic
Expand Down Expand Up @@ -889,15 +889,18 @@ impl Blockstore {
metrics.insert_shreds_elapsed_us += start.as_us();
let mut start = Measure::start("Shred recovery");
if let Some(leader_schedule_cache) = leader_schedule {
let recovered_data_shreds = Self::try_shred_recovery(
let recovered_shreds = Self::try_shred_recovery(
db,
&erasure_metas,
&mut index_working_set,
&just_inserted_shreds,
);

metrics.num_recovered += recovered_data_shreds.len();
let recovered_data_shreds: Vec<_> = recovered_data_shreds
metrics.num_recovered += recovered_shreds
.iter()
.filter(|shred| shred.is_data())
.count();
let recovered_shreds: Vec<_> = recovered_shreds
.into_iter()
.filter_map(|shred| {
let leader =
Expand All @@ -906,6 +909,10 @@ impl Blockstore {
metrics.num_recovered_failed_sig += 1;
return None;
}
// TODO: consider inserting coding shreds into blockstore.
if shred.is_code() {
return Some(shred);
}
match self.check_insert_data_shred(
shred.clone(),
&mut erasure_metas,
Expand Down Expand Up @@ -942,10 +949,10 @@ impl Blockstore {
// Always collect recovered-shreds so that above insert code is
// executed even if retransmit-sender is None.
.collect();
if !recovered_data_shreds.is_empty() {
if !recovered_shreds.is_empty() {
if let Some(retransmit_sender) = retransmit_sender {
let _ = retransmit_sender.send(
recovered_data_shreds
recovered_shreds
.into_iter()
.map(Shred::into_payload)
.collect(),
Expand Down
55 changes: 52 additions & 3 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ use {
crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
bitflags::bitflags,
num_enum::{IntoPrimitive, TryFromPrimitive},
reed_solomon_erasure::Error::TooFewShardsPresent,
serde::{Deserialize, Serialize},
solana_entry::entry::{create_ticks, Entry},
solana_perf::packet::Packet,
solana_sdk::{
clock::Slot,
hash::{hashv, Hash},
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
signature::{Keypair, Signature, Signer, SIGNATURE_BYTES},
},
static_assertions::const_assert_eq,
std::fmt::Debug,
Expand All @@ -92,7 +93,7 @@ pub const SIZE_OF_NONCE: usize = 4;
const SIZE_OF_COMMON_SHRED_HEADER: usize = 83;
const SIZE_OF_DATA_SHRED_HEADERS: usize = 88;
const SIZE_OF_CODING_SHRED_HEADERS: usize = 89;
const SIZE_OF_SIGNATURE: usize = 64;
const SIZE_OF_SIGNATURE: usize = SIGNATURE_BYTES;
const SIZE_OF_SHRED_VARIANT: usize = 1;
const SIZE_OF_SHRED_SLOT: usize = 8;
const SIZE_OF_SHRED_INDEX: usize = 4;
Expand Down Expand Up @@ -144,6 +145,8 @@ pub enum Error {
InvalidPayloadSize(/*payload size:*/ usize),
#[error("Invalid proof size: {0}")]
InvalidProofSize(/*proof_size:*/ u8),
#[error("Invalid shard size: {0}")]
InvalidShardSize(/*shard_size:*/ usize),
#[error("Invalid shred flags: {0}")]
InvalidShredFlags(u8),
#[error("Invalid {0:?} shred index: {1}")]
Expand Down Expand Up @@ -211,7 +214,7 @@ struct DataShredHeader {
struct CodingShredHeader {
num_data_shreds: u16,
num_coding_shreds: u16,
position: u16,
position: u16, // [0..num_coding_shreds)
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -294,6 +297,8 @@ macro_rules! dispatch {
}
}

use dispatch;

impl Shred {
dispatch!(fn common_header(&self) -> &ShredCommonHeader);
dispatch!(fn set_signature(&mut self, signature: Signature));
Expand Down Expand Up @@ -494,6 +499,7 @@ impl Shred {
}
}

#[must_use]
pub fn verify(&self, pubkey: &Pubkey) -> bool {
let message = self.signed_message();
self.signature().verify(pubkey.as_ref(), message)
Expand Down Expand Up @@ -642,6 +648,28 @@ impl From<ShredData> for Shred {
}
}

impl From<merkle::Shred> for Shred {
fn from(shred: merkle::Shred) -> Self {
match shred {
merkle::Shred::ShredCode(shred) => Self::ShredCode(ShredCode::Merkle(shred)),
merkle::Shred::ShredData(shred) => Self::ShredData(ShredData::Merkle(shred)),
}
}
}

impl TryFrom<Shred> for merkle::Shred {
type Error = Error;

fn try_from(shred: Shred) -> Result<Self, Self::Error> {
match shred {
Shred::ShredCode(ShredCode::Legacy(_)) => Err(Error::InvalidShredVariant),
Shred::ShredCode(ShredCode::Merkle(shred)) => Ok(Self::ShredCode(shred)),
Shred::ShredData(ShredData::Legacy(_)) => Err(Error::InvalidShredVariant),
Shred::ShredData(ShredData::Merkle(shred)) => Ok(Self::ShredData(shred)),
}
}
}

impl From<ShredVariant> for ShredType {
#[inline]
fn from(shred_variant: ShredVariant) -> Self {
Expand Down Expand Up @@ -682,6 +710,27 @@ impl TryFrom<u8> for ShredVariant {
}
}

pub(crate) fn recover(shreds: Vec<Shred>) -> Result<Vec<Shred>, Error> {
match shreds
.first()
.ok_or(TooFewShardsPresent)?
.common_header()
.shred_variant
{
ShredVariant::LegacyData | ShredVariant::LegacyCode => Shredder::try_recovery(shreds),
ShredVariant::MerkleCode(_) | ShredVariant::MerkleData(_) => {
let shreds = shreds
.into_iter()
.map(merkle::Shred::try_from)
.collect::<Result<_, _>>()?;
Ok(merkle::recover(shreds)?
.into_iter()
.map(Shred::from)
.collect())
}
}
}

// Accepts shreds in the slot range [root + 1, max_slot].
#[must_use]
pub fn should_discard_shred(
Expand Down
Loading

0 comments on commit d18ba30

Please sign in to comment.