Skip to content

Commit

Permalink
recovers Merkle shreds from erasure codes (#27136)
Browse files Browse the repository at this point in the history
The commit
* Identifies Merkle shreds when recovering from erasure codes and
  dispatches specialized code to reconstruct shreds.
* Coding shred headers are added to recovered erasure shards.
* Merkle tree is reconstructed for the erasure batch and added to
  recovered shreds.
* The common signature (for the root of Merkle tree) is attached to all
  recovered shreds.
  • Loading branch information
behzadnouri authored Aug 19, 2022
1 parent a54ea4d commit c0b6335
Show file tree
Hide file tree
Showing 6 changed files with 552 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ledger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ bs58 = "0.4.0"
matches = "0.1.9"
solana-account-decoder = { path = "../account-decoder", version = "=1.12.0" }
solana-logger = { path = "../logger", version = "=1.12.0" }
test-case = "2.1.0"

[build-dependencies]
rustc_version = "0.4"
Expand Down
31 changes: 20 additions & 11 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ impl Blockstore {
index: &mut Index,
erasure_meta: &ErasureMeta,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
recovered_data_shreds: &mut Vec<Shred>,
recovered_shreds: &mut Vec<Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
code_cf: &LedgerColumn<cf::ShredCode>,
) {
Expand All @@ -646,9 +646,9 @@ impl Blockstore {
code_cf,
))
.collect();
if let Ok(mut result) = Shredder::try_recovery(available_shreds) {
if let Ok(mut result) = shred::recover(available_shreds) {
Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len());
recovered_data_shreds.append(&mut result);
recovered_shreds.append(&mut result);
} else {
Self::submit_metrics(slot, erasure_meta, true, "incomplete".into(), 0);
}
Expand Down Expand Up @@ -709,7 +709,7 @@ impl Blockstore {
) -> Vec<Shred> {
let data_cf = db.column::<cf::ShredData>();
let code_cf = db.column::<cf::ShredCode>();
let mut recovered_data_shreds = vec![];
let mut recovered_shreds = vec![];
// Recovery rules:
// 1. Only try recovery around indexes for which new data or coding shreds are received
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
Expand All @@ -725,7 +725,7 @@ impl Blockstore {
index,
erasure_meta,
prev_inserted_shreds,
&mut recovered_data_shreds,
&mut recovered_shreds,
&data_cf,
&code_cf,
);
Expand All @@ -744,7 +744,7 @@ impl Blockstore {
}
};
}
recovered_data_shreds
recovered_shreds
}

/// The main helper function that performs the shred insertion logic
Expand Down Expand Up @@ -888,15 +888,18 @@ impl Blockstore {
metrics.insert_shreds_elapsed_us += start.as_us();
let mut start = Measure::start("Shred recovery");
if let Some(leader_schedule_cache) = leader_schedule {
let recovered_data_shreds = Self::try_shred_recovery(
let recovered_shreds = Self::try_shred_recovery(
db,
&erasure_metas,
&mut index_working_set,
&just_inserted_shreds,
);

metrics.num_recovered += recovered_data_shreds.len();
let recovered_data_shreds: Vec<_> = recovered_data_shreds
metrics.num_recovered += recovered_shreds
.iter()
.filter(|shred| shred.is_data())
.count();
let recovered_shreds: Vec<_> = recovered_shreds
.into_iter()
.filter_map(|shred| {
let leader =
Expand All @@ -905,6 +908,12 @@ impl Blockstore {
metrics.num_recovered_failed_sig += 1;
return None;
}
// Since the data shreds are fully recovered from the
// erasure batch, no need to store coding shreds in
// blockstore.
if shred.is_code() {
return Some(shred);
}
match self.check_insert_data_shred(
shred.clone(),
&mut erasure_metas,
Expand Down Expand Up @@ -941,10 +950,10 @@ impl Blockstore {
// Always collect recovered-shreds so that above insert code is
// executed even if retransmit-sender is None.
.collect();
if !recovered_data_shreds.is_empty() {
if !recovered_shreds.is_empty() {
if let Some(retransmit_sender) = retransmit_sender {
let _ = retransmit_sender.send(
recovered_data_shreds
recovered_shreds
.into_iter()
.map(Shred::into_payload)
.collect(),
Expand Down
53 changes: 52 additions & 1 deletion ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use {
crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
bitflags::bitflags,
num_enum::{IntoPrimitive, TryFromPrimitive},
reed_solomon_erasure::Error::TooFewShardsPresent,
serde::{Deserialize, Serialize},
solana_entry::entry::{create_ticks, Entry},
solana_perf::packet::Packet,
Expand Down Expand Up @@ -144,6 +145,10 @@ pub enum Error {
InvalidPayloadSize(/*payload size:*/ usize),
#[error("Invalid proof size: {0}")]
InvalidProofSize(/*proof_size:*/ u8),
#[error("Invalid recovered shred")]
InvalidRecoveredShred,
#[error("Invalid shard size: {0}")]
InvalidShardSize(/*shard_size:*/ usize),
#[error("Invalid shred flags: {0}")]
InvalidShredFlags(u8),
#[error("Invalid {0:?} shred index: {1}")]
Expand Down Expand Up @@ -211,7 +216,7 @@ struct DataShredHeader {
struct CodingShredHeader {
num_data_shreds: u16,
num_coding_shreds: u16,
position: u16,
position: u16, // [0..num_coding_shreds)
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -294,6 +299,8 @@ macro_rules! dispatch {
}
}

use dispatch;

impl Shred {
dispatch!(fn common_header(&self) -> &ShredCommonHeader);
dispatch!(fn set_signature(&mut self, signature: Signature));
Expand Down Expand Up @@ -494,6 +501,7 @@ impl Shred {
}
}

#[must_use]
pub fn verify(&self, pubkey: &Pubkey) -> bool {
let message = self.signed_message();
self.signature().verify(pubkey.as_ref(), message)
Expand Down Expand Up @@ -642,6 +650,28 @@ impl From<ShredData> for Shred {
}
}

impl From<merkle::Shred> for Shred {
    /// Wraps a Merkle shred in the generic [`Shred`] enum, keeping its
    /// data/coding discriminant intact.
    fn from(shred: merkle::Shred) -> Self {
        use merkle::Shred::{ShredCode as Code, ShredData as Data};
        match shred {
            Code(inner) => Self::ShredCode(ShredCode::Merkle(inner)),
            Data(inner) => Self::ShredData(ShredData::Merkle(inner)),
        }
    }
}

impl TryFrom<Shred> for merkle::Shred {
type Error = Error;

fn try_from(shred: Shred) -> Result<Self, Self::Error> {
match shred {
Shred::ShredCode(ShredCode::Legacy(_)) => Err(Error::InvalidShredVariant),
Shred::ShredCode(ShredCode::Merkle(shred)) => Ok(Self::ShredCode(shred)),
Shred::ShredData(ShredData::Legacy(_)) => Err(Error::InvalidShredVariant),
Shred::ShredData(ShredData::Merkle(shred)) => Ok(Self::ShredData(shred)),
}
}
}

impl From<ShredVariant> for ShredType {
#[inline]
fn from(shred_variant: ShredVariant) -> Self {
Expand Down Expand Up @@ -682,6 +712,27 @@ impl TryFrom<u8> for ShredVariant {
}
}

/// Recovers missing shreds of an erasure batch from the shreds present,
/// dispatching on the shred variant of the first shred in the batch.
///
/// Legacy shreds are recovered through `Shredder::try_recovery`; Merkle
/// shreds are converted to [`merkle::Shred`] (erroring on any legacy shred
/// mixed into the batch) and recovered through [`merkle::recover`].
///
/// Returns an error if `shreds` is empty or if the underlying recovery
/// fails.
pub(crate) fn recover(shreds: Vec<Shred>) -> Result<Vec<Shred>, Error> {
    // The first shred's variant decides which recovery path to take.
    let variant = shreds
        .first()
        .ok_or(TooFewShardsPresent)?
        .common_header()
        .shred_variant;
    match variant {
        ShredVariant::LegacyData | ShredVariant::LegacyCode => Shredder::try_recovery(shreds),
        ShredVariant::MerkleCode(_) | ShredVariant::MerkleData(_) => {
            // Fails with InvalidShredVariant if any shred is not Merkle.
            let merkle_shreds: Vec<merkle::Shred> = shreds
                .into_iter()
                .map(merkle::Shred::try_from)
                .collect::<Result<_, _>>()?;
            let recovered = merkle::recover(merkle_shreds)?;
            Ok(recovered.into_iter().map(Shred::from).collect())
        }
    }
}

// Accepts shreds in the slot range [root + 1, max_slot].
#[must_use]
pub fn should_discard_shred(
Expand Down
Loading

0 comments on commit c0b6335

Please sign in to comment.