Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recovers merkle shreds from erasure codes #27136

Merged
merged 1 commit into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ledger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ bs58 = "0.4.0"
matches = "0.1.9"
solana-account-decoder = { path = "../account-decoder", version = "=1.12.0" }
solana-logger = { path = "../logger", version = "=1.12.0" }
test-case = "2.1.0"

[build-dependencies]
rustc_version = "0.4"
Expand Down
31 changes: 20 additions & 11 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl Blockstore {
index: &mut Index,
erasure_meta: &ErasureMeta,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
recovered_data_shreds: &mut Vec<Shred>,
recovered_shreds: &mut Vec<Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
code_cf: &LedgerColumn<cf::ShredCode>,
) {
Expand All @@ -647,9 +647,9 @@ impl Blockstore {
code_cf,
))
.collect();
if let Ok(mut result) = Shredder::try_recovery(available_shreds) {
if let Ok(mut result) = shred::recover(available_shreds) {
Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len());
recovered_data_shreds.append(&mut result);
recovered_shreds.append(&mut result);
} else {
Self::submit_metrics(slot, erasure_meta, true, "incomplete".into(), 0);
}
Expand Down Expand Up @@ -710,7 +710,7 @@ impl Blockstore {
) -> Vec<Shred> {
let data_cf = db.column::<cf::ShredData>();
let code_cf = db.column::<cf::ShredCode>();
let mut recovered_data_shreds = vec![];
let mut recovered_shreds = vec![];
// Recovery rules:
// 1. Only try recovery around indexes for which new data or coding shreds are received
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
Expand All @@ -726,7 +726,7 @@ impl Blockstore {
index,
erasure_meta,
prev_inserted_shreds,
&mut recovered_data_shreds,
&mut recovered_shreds,
&data_cf,
&code_cf,
);
Expand All @@ -745,7 +745,7 @@ impl Blockstore {
}
};
}
recovered_data_shreds
recovered_shreds
}

/// The main helper function that performs the shred insertion logic
Expand Down Expand Up @@ -889,15 +889,18 @@ impl Blockstore {
metrics.insert_shreds_elapsed_us += start.as_us();
let mut start = Measure::start("Shred recovery");
if let Some(leader_schedule_cache) = leader_schedule {
let recovered_data_shreds = Self::try_shred_recovery(
let recovered_shreds = Self::try_shred_recovery(
db,
&erasure_metas,
&mut index_working_set,
&just_inserted_shreds,
);

metrics.num_recovered += recovered_data_shreds.len();
let recovered_data_shreds: Vec<_> = recovered_data_shreds
metrics.num_recovered += recovered_shreds
.iter()
.filter(|shred| shred.is_data())
.count();
let recovered_shreds: Vec<_> = recovered_shreds
.into_iter()
.filter_map(|shred| {
let leader =
Expand All @@ -906,6 +909,12 @@ impl Blockstore {
metrics.num_recovered_failed_sig += 1;
return None;
}
// Since the data shreds are fully recovered from the
// erasure batch, no need to store coding shreds in
// blockstore.
if shred.is_code() {
return Some(shred);
}
match self.check_insert_data_shred(
shred.clone(),
&mut erasure_metas,
Expand Down Expand Up @@ -942,10 +951,10 @@ impl Blockstore {
// Always collect recovered-shreds so that above insert code is
// executed even if retransmit-sender is None.
.collect();
if !recovered_data_shreds.is_empty() {
if !recovered_shreds.is_empty() {
if let Some(retransmit_sender) = retransmit_sender {
let _ = retransmit_sender.send(
recovered_data_shreds
recovered_shreds
.into_iter()
.map(Shred::into_payload)
.collect(),
Expand Down
53 changes: 52 additions & 1 deletion ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use {
crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
bitflags::bitflags,
num_enum::{IntoPrimitive, TryFromPrimitive},
reed_solomon_erasure::Error::TooFewShardsPresent,
serde::{Deserialize, Serialize},
solana_entry::entry::{create_ticks, Entry},
solana_perf::packet::Packet,
Expand Down Expand Up @@ -144,6 +145,10 @@ pub enum Error {
InvalidPayloadSize(/*payload size:*/ usize),
#[error("Invalid proof size: {0}")]
InvalidProofSize(/*proof_size:*/ u8),
#[error("Invalid recovered shred")]
InvalidRecoveredShred,
#[error("Invalid shard size: {0}")]
InvalidShardSize(/*shard_size:*/ usize),
#[error("Invalid shred flags: {0}")]
InvalidShredFlags(u8),
#[error("Invalid {0:?} shred index: {1}")]
Expand Down Expand Up @@ -211,7 +216,7 @@ struct DataShredHeader {
struct CodingShredHeader {
num_data_shreds: u16,
num_coding_shreds: u16,
position: u16,
position: u16, // [0..num_coding_shreds)
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -294,6 +299,8 @@ macro_rules! dispatch {
}
}

use dispatch;

impl Shred {
dispatch!(fn common_header(&self) -> &ShredCommonHeader);
dispatch!(fn set_signature(&mut self, signature: Signature));
Expand Down Expand Up @@ -494,6 +501,7 @@ impl Shred {
}
}

#[must_use]
pub fn verify(&self, pubkey: &Pubkey) -> bool {
let message = self.signed_message();
self.signature().verify(pubkey.as_ref(), message)
Expand Down Expand Up @@ -642,6 +650,28 @@ impl From<ShredData> for Shred {
}
}

impl From<merkle::Shred> for Shred {
fn from(shred: merkle::Shred) -> Self {
match shred {
merkle::Shred::ShredCode(shred) => Self::ShredCode(ShredCode::Merkle(shred)),
merkle::Shred::ShredData(shred) => Self::ShredData(ShredData::Merkle(shred)),
}
}
}

impl TryFrom<Shred> for merkle::Shred {
type Error = Error;

fn try_from(shred: Shred) -> Result<Self, Self::Error> {
match shred {
Shred::ShredCode(ShredCode::Legacy(_)) => Err(Error::InvalidShredVariant),
Shred::ShredCode(ShredCode::Merkle(shred)) => Ok(Self::ShredCode(shred)),
Shred::ShredData(ShredData::Legacy(_)) => Err(Error::InvalidShredVariant),
Shred::ShredData(ShredData::Merkle(shred)) => Ok(Self::ShredData(shred)),
}
}
}

impl From<ShredVariant> for ShredType {
#[inline]
fn from(shred_variant: ShredVariant) -> Self {
Expand Down Expand Up @@ -682,6 +712,27 @@ impl TryFrom<u8> for ShredVariant {
}
}

pub(crate) fn recover(shreds: Vec<Shred>) -> Result<Vec<Shred>, Error> {
match shreds
.first()
.ok_or(TooFewShardsPresent)?
.common_header()
.shred_variant
{
ShredVariant::LegacyData | ShredVariant::LegacyCode => Shredder::try_recovery(shreds),
ShredVariant::MerkleCode(_) | ShredVariant::MerkleData(_) => {
let shreds = shreds
.into_iter()
.map(merkle::Shred::try_from)
.collect::<Result<_, _>>()?;
Ok(merkle::recover(shreds)?
.into_iter()
.map(Shred::from)
.collect())
}
}
}

// Accepts shreds in the slot range [root + 1, max_slot].
#[must_use]
pub fn should_discard_shred(
Expand Down
Loading