Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
adds back ErasureMeta::first_coding_index field (#21623)
Browse files Browse the repository at this point in the history
#16646
removed first_coding_index since the field is currently redundant and
always equal to fec_set_index.
However, with upcoming changes to the erasure coding schema, it will no
longer be the same as fec_set_index, and so a separate field is required
to represent it.

(cherry picked from commit 49ba09b)

# Conflicts:
#	ledger/src/blockstore.rs
  • Loading branch information
behzadnouri authored and mergify-bot committed Feb 3, 2022
1 parent 59dd876 commit b7081ca
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 32 deletions.
126 changes: 116 additions & 10 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ use {
IteratorMode, LedgerColumn, Result, WriteBatch,
},
blockstore_meta::*,
<<<<<<< HEAD
entry::{create_ticks, Entry},
erasure::ErasureConfig,
=======
>>>>>>> 49ba09b33 (adds back ErasureMeta::first_coding_index field (#21623))
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{Result as ShredResult, Shred, ShredType, Shredder, SHRED_PAYLOAD_SIZE},
Expand Down Expand Up @@ -1063,21 +1066,16 @@ impl Blockstore {
}
}

let set_index = u64::from(shred.common_header.fec_set_index);
let erasure_config = ErasureConfig::new(
shred.coding_header.num_data_shreds as usize,
shred.coding_header.num_coding_shreds as usize,
);

let set_index = u64::from(shred.fec_set_index());
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
self.erasure_meta(slot, set_index)
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::new(set_index, erasure_config))
.unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
});

// TODO: handle_duplicate is not invoked and so duplicate shreds are
// not gossiped to the rest of cluster.
if erasure_config != erasure_meta.config() {
if !erasure_meta.check_coding_shred(&shred) {
metrics.num_coding_shreds_invalid_erasure_config += 1;
let conflicting_shred = self.find_conflicting_coding_shred(
&shred,
Expand All @@ -1100,7 +1098,7 @@ impl Blockstore {
warn!("Received multiple erasure configs for the same erasure set!!!");
warn!(
"Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), erasure_config
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header,
);

return false;
Expand Down Expand Up @@ -1230,7 +1228,7 @@ impl Blockstore {
}
}

let set_index = u64::from(shred.common_header.fec_set_index);
let set_index = u64::from(shred.fec_set_index());
let newly_completed_data_sets = self.insert_data_shred(
slot_meta,
index_meta.data_mut(),
Expand Down Expand Up @@ -5466,6 +5464,7 @@ pub mod tests {
ShredSource::Recovered,
));

<<<<<<< HEAD
// Trying to insert another "is_last" shred with index < the received index should fail
// skip over shred 7
blockstore
Expand All @@ -5490,6 +5489,39 @@ pub mod tests {
ShredSource::Repaired,
));
assert!(blockstore.has_duplicate_shreds_in_slot(0));
=======
// Ensure that an empty shred (one with no data) would get inserted. Such shreds
// may be used as signals (broadcast does so to indicate a slot was interrupted)
// Reuse shred5's header values to avoid a false negative result
let mut empty_shred = Shred::new_from_data(
shred5.common_header.slot,
shred5.common_header.index,
shred5.data_header.parent_offset,
None, // data
true, // is_last_data
true, // is_last_in_slot
0, // reference_tick
shred5.common_header.version,
shred5.fec_set_index(),
);
assert!(blockstore.should_insert_data_shred(
&empty_shred,
&slot_meta,
&HashMap::new(),
&last_root,
None,
ShredSource::Repaired,
));
empty_shred.data_header.size = 0;
assert!(!blockstore.should_insert_data_shred(
&empty_shred,
&slot_meta,
&HashMap::new(),
&last_root,
None,
ShredSource::Recovered,
));
>>>>>>> 49ba09b33 (adds back ErasureMeta::first_coding_index field (#21623))

// Insert all pending shreds
let mut shred8 = shreds[8].clone();
Expand Down Expand Up @@ -5632,7 +5664,81 @@ pub mod tests {
coding.clone(),
);

<<<<<<< HEAD
// Insert a good coding shred
=======
// Trying to insert a shred with index < position should fail
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let index = coding_shred.index() - coding_shred.fec_set_index() - 1;
coding_shred.set_index(index as u32);

assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}

// Trying to insert shred with num_coding == 0 should fail
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = 0;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}

// Trying to insert shred with pos >= num_coding should fail
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let num_coding_shreds = coding_shred.index() - coding_shred.fec_set_index();
coding_shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}

// Trying to insert with set_index with num_coding that would imply the last shred
// has index > u32::MAX should fail
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.common_header.fec_set_index = std::u32::MAX - 1;
coding_shred.coding_header.num_data_shreds = 2;
coding_shred.coding_header.num_coding_shreds = 3;
coding_shred.coding_header.position = 1;
coding_shred.common_header.index = std::u32::MAX - 1;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));

coding_shred.coding_header.num_coding_shreds = 2000;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));

// Decreasing the number of num_coding_shreds will put it within the allowed limit
coding_shred.coding_header.num_coding_shreds = 2;
>>>>>>> 49ba09b33 (adds back ErasureMeta::first_coding_index field (#21623))
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
Expand Down
66 changes: 54 additions & 12 deletions ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use {
crate::erasure::ErasureConfig,
crate::{
erasure::ErasureConfig,
shred::{Shred, ShredType},
},
serde::{Deserialize, Serialize},
solana_sdk::{clock::Slot, hash::Hash},
std::{
Expand Down Expand Up @@ -56,9 +59,8 @@ pub struct ShredIndex {
pub struct ErasureMeta {
/// Which erasure set in the slot this is
set_index: u64,
/// Deprecated field.
#[serde(rename = "first_coding_index")]
__unused_first_coding_index: u64,
/// First coding index in the FEC set
first_coding_index: u64,
/// Size of shards in this erasure set
#[serde(rename = "size")]
__unused_size: usize,
Expand Down Expand Up @@ -218,15 +220,41 @@ impl SlotMeta {
}

impl ErasureMeta {
pub(crate) fn new(set_index: u64, config: ErasureConfig) -> ErasureMeta {
ErasureMeta {
set_index,
config,
__unused_first_coding_index: 0,
__unused_size: 0,
/// Builds an `ErasureMeta` from a coding shred's headers.
///
/// Returns `None` for data shreds: only coding shreds carry the erasure
/// configuration (`num_data_shreds` / `num_coding_shreds`) and a
/// first-coding-index in their headers.
pub(crate) fn from_coding_shred(shred: &Shred) -> Option<Self> {
match shred.shred_type() {
// Data shreds carry no erasure-config headers.
ShredType::Data => None,
ShredType::Code => {
let config = ErasureConfig::new(
usize::from(shred.coding_header.num_data_shreds),
usize::from(shred.coding_header.num_coding_shreds),
);
// `?` propagates None if the shred cannot supply a first
// coding index (first_coding_index() returns None for
// non-coding shreds).
let first_coding_index = u64::from(shred.first_coding_index()?);
let erasure_meta = ErasureMeta {
set_index: u64::from(shred.fec_set_index()),
config,
first_coding_index,
// Deprecated on-disk field kept only for serialization
// compatibility; always written as zero here.
__unused_size: 0,
};
Some(erasure_meta)
}
}
}

// Returns true if the erasure fields on the shred are consistent with
// this erasure-meta. A data shred (or any shred from which an
// ErasureMeta cannot be built) is never considered consistent.
pub(crate) fn check_coding_shred(&self, shred: &Shred) -> bool {
// Build a candidate meta from the shred's own headers and compare
// it field-by-field against self.
let mut other = match Self::from_coding_shred(shred) {
Some(erasure_meta) => erasure_meta,
None => return false,
};
// Copy over fields excluded from the comparison so they cannot
// cause a spurious mismatch.
other.__unused_size = self.__unused_size;
// Ignore first_coding_index field for now to be backward compatible.
// TODO remove this once cluster is upgraded to always populate
// first_coding_index field.
other.first_coding_index = self.first_coding_index;
self == &other
}

pub(crate) fn config(&self) -> ErasureConfig {
self.config
}
Expand All @@ -238,7 +266,16 @@ impl ErasureMeta {

pub(crate) fn coding_shreds_indices(&self) -> Range<u64> {
let num_coding = self.config.num_coding() as u64;
self.set_index..self.set_index + num_coding
// first_coding_index == 0 may imply that the field is not populated.
// self.set_index to be backward compatible.
// TODO remove this once cluster is upgraded to always populate
// first_coding_index field.
let first_coding_index = if self.first_coding_index == 0 {
self.set_index
} else {
self.first_coding_index
};
first_coding_index..first_coding_index + num_coding
}

pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus {
Expand Down Expand Up @@ -308,7 +345,12 @@ mod test {
let set_index = 0;
let erasure_config = ErasureConfig::new(8, 16);

let e_meta = ErasureMeta::new(set_index, erasure_config);
let e_meta = ErasureMeta {
set_index,
first_coding_index: set_index,
config: erasure_config,
__unused_size: 0,
};
let mut rng = thread_rng();
let mut index = Index::new(0);

Expand Down
6 changes: 3 additions & 3 deletions ledger/src/erasure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,18 @@ pub struct ErasureConfig {
}

impl ErasureConfig {
pub fn new(num_data: usize, num_coding: usize) -> ErasureConfig {
pub(crate) fn new(num_data: usize, num_coding: usize) -> ErasureConfig {
ErasureConfig {
num_data,
num_coding,
}
}

pub fn num_data(self) -> usize {
pub(crate) fn num_data(self) -> usize {
self.num_data
}

pub fn num_coding(self) -> usize {
pub(crate) fn num_coding(self) -> usize {
self.num_coding
}
}
Expand Down
23 changes: 16 additions & 7 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,15 @@ impl Shred {
self.common_header.fec_set_index
}

/// Returns the index of the first coding shred of this shred's erasure
/// (FEC) set, or `None` for data shreds.
pub(crate) fn first_coding_index(&self) -> Option<u32> {
match self.shred_type() {
ShredType::Data => None,
// TODO should be: self.index() - self.coding_header.position
// once position field is populated.
// Until then, the coding shreds of a set are assumed to start
// at the set's fec_set_index.
ShredType::Code => Some(self.fec_set_index()),
}
}

// Returns true if the shred passes sanity checks.
pub(crate) fn sanitize(&self) -> bool {
self.erasure_block_index().is_some()
Expand Down Expand Up @@ -901,7 +910,7 @@ impl Shredder {
assert_eq!(fec_set_index, index);
assert!(data.iter().all(|shred| shred.common_header.slot == slot
&& shred.common_header.version == version
&& shred.common_header.fec_set_index == fec_set_index));
&& shred.fec_set_index() == fec_set_index));
let num_data = data.len();
let num_coding = if is_last_in_slot {
(2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
Expand Down Expand Up @@ -947,7 +956,7 @@ impl Shredder {
Self::verify_consistent_shred_payload_sizes("try_recovery()", &shreds)?;
let (slot, fec_set_index) = match shreds.first() {
None => return Ok(Vec::default()),
Some(shred) => (shred.slot(), shred.common_header.fec_set_index),
Some(shred) => (shred.slot(), shred.fec_set_index()),
};
let (num_data_shreds, num_coding_shreds) = match shreds.iter().find(|shred| shred.is_code())
{
Expand All @@ -957,9 +966,9 @@ impl Shredder {
shred.coding_header.num_coding_shreds,
),
};
debug_assert!(shreds.iter().all(
|shred| shred.slot() == slot && shred.common_header.fec_set_index == fec_set_index
));
debug_assert!(shreds
.iter()
.all(|shred| shred.slot() == slot && shred.fec_set_index() == fec_set_index));
debug_assert!(shreds
.iter()
.filter(|shred| shred.is_code())
Expand Down Expand Up @@ -1793,15 +1802,15 @@ pub mod tests {
let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
data_shreds.iter().enumerate().for_each(|(i, s)| {
let expected_fec_set_index = start_index + ((i / max_per_block) * max_per_block) as u32;
assert_eq!(s.common_header.fec_set_index, expected_fec_set_index);
assert_eq!(s.fec_set_index(), expected_fec_set_index);
});

coding_shreds.iter().enumerate().for_each(|(i, s)| {
let mut expected_fec_set_index = start_index + (i - i % max_per_block) as u32;
while expected_fec_set_index as usize > data_shreds.len() {
expected_fec_set_index -= max_per_block as u32;
}
assert_eq!(s.common_header.fec_set_index, expected_fec_set_index);
assert_eq!(s.fec_set_index(), expected_fec_set_index);
});
}

Expand Down

0 comments on commit b7081ca

Please sign in to comment.