Skip to content

Commit

Permalink
replay: extend last fec set check for 32+ retransmitter signed shreds
Browse files Browse the repository at this point in the history
  • Loading branch information
AshwinSekar committed Jul 11, 2024
1 parent c338fbc commit 9b3728d
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 57 deletions.
74 changes: 38 additions & 36 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3071,48 +3071,50 @@ impl ReplayStage {
}

if bank.collector_id() != my_pubkey {
// If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK shreds in the last FEC set,
// mark it dead. No reason to perform this check on our leader block.
if !blockstore
.is_last_fec_set_full(bank.slot())
.inspect_err(|e| {
// If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK correctly retransmitted
// shreds in the last FEC set, mark it dead. No reason to perform this check on our leader block.
let check_result = match blockstore.check_last_fec_set(bank.slot()) {
Ok(last_fec_set_check_results) => {
// Update metrics regardless of feature flag
last_fec_set_check_results.report_metrics(bank_slot, bank.hash());
// Get a final result based on the feature flags
last_fec_set_check_results.get_result(&bank.feature_set)
}
Err(e) => {
warn!(
"Unable to determine if last fec set is full for slot {} {},
"Unable to check the last fec set for slot {} {},
marking as dead: {e:?}",
bank.slot(),
bank.hash()
)
})
.unwrap_or(false)
{
// Update metric regardless of feature flag
datapoint_warn!(
"incomplete_final_fec_set",
("slot", bank_slot, i64),
("hash", bank.hash().to_string(), String)
);
if bank
.feature_set
.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id())
{
let root = bank_forks.read().unwrap().root();
Self::mark_dead_slot(
blockstore,
bank,
root,
&BlockstoreProcessorError::IncompleteFinalFecSet,
rpc_subscriptions,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
);
continue;
if bank.feature_set
.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id())
{
Err(BlockstoreProcessorError::IncompleteFinalFecSet)
} else {
Ok(())
}
}
};

if let Err(result_err) = check_result {
let root = bank_forks.read().unwrap().root();
Self::mark_dead_slot(
blockstore,
bank,
root,
&result_err,
rpc_subscriptions,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
);
continue;
}
}

Expand Down
116 changes: 95 additions & 21 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use {
AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_FIFO,
BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
},
blockstore_processor::BlockstoreProcessorError,
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{
Expand Down Expand Up @@ -47,6 +48,7 @@ use {
account::ReadableAccount,
address_lookup_table::state::AddressLookupTable,
clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND},
feature_set::FeatureSet,
genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE},
hash::Hash,
pubkey::Pubkey,
Expand Down Expand Up @@ -170,6 +172,48 @@ impl<T> AsRef<T> for WorkingEntry<T> {
}
}

/// Outcome of the checks performed on the last FEC set of a slot, as produced
/// by `Blockstore::check_last_fec_set`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct LastFECSetCheckResults {
/// True when the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of the slot share
/// a single merkle root, i.e. the last FEC set is sufficiently sized.
is_full: bool,
/// True when every one of the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of
/// the slot is of the retransmitter-signed (resigned) variant.
is_retransmitter_signed: bool,
}

impl LastFECSetCheckResults {
    /// Emits one warning datapoint per failed check for the given `slot` and
    /// bank `hash`. Nothing is reported for a check that passed.
    pub fn report_metrics(&self, slot: Slot, hash: Hash) {
        let Self {
            is_full,
            is_retransmitter_signed,
        } = *self;

        if !is_full {
            datapoint_warn!(
                "incomplete_final_fec_set",
                ("slot", slot, i64),
                ("hash", hash.to_string(), String)
            );
        }
        if !is_retransmitter_signed {
            datapoint_warn!(
                "invalid_retransmitter_signature_final_fec_set",
                ("slot", slot, i64),
                ("hash", hash.to_string(), String)
            );
        }
    }

    /// Collapses the individual check outcomes into a single verdict, taking
    /// into account which features are active in `feature_set`.
    ///
    /// # Errors
    /// - `IncompleteFinalFecSet` when `vote_only_full_fec_sets` is active and
    ///   the last FEC set is not full. This check takes precedence.
    /// - `InvalidRetransmitterSignatureFinalFecSet` when
    ///   `verify_retransmitter_signature` is active and the last FEC set is not
    ///   retransmitter signed.
    pub fn get_result(
        &self,
        feature_set: &FeatureSet,
    ) -> std::result::Result<(), BlockstoreProcessorError> {
        match *self {
            // The fullness check is evaluated first, mirroring its priority.
            Self { is_full: false, .. }
                if feature_set
                    .is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) =>
            {
                Err(BlockstoreProcessorError::IncompleteFinalFecSet)
            }
            Self {
                is_retransmitter_signed: false,
                ..
            } if feature_set
                .is_active(&solana_sdk::feature_set::verify_retransmitter_signature::id()) =>
            {
                Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet)
            }
            // Either every check passed or the failing check is not yet enforced.
            _ => Ok(()),
        }
    }
}

pub struct InsertResults {
completed_data_set_infos: Vec<CompletedDataSetInfo>,
duplicate_shreds: Vec<PossibleDuplicateShred>,
Expand Down Expand Up @@ -3680,15 +3724,21 @@ impl Blockstore {
self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta)
}

/// Returns true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of a
/// slot have the same merkle root, indicating they are a part of the same
/// FEC set.
/// Performs checks on the last FEC set for this slot.
/// - `is_full` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of
/// `slot` have the same merkle root, indicating they are a part of the same FEC set.
/// This indicates that the last FEC set is sufficiently sized.
/// - `is_retransmitter_signed` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK`
/// data shreds of `slot` are of the retransmitter variant. Since we already discard
/// invalid signatures on ingestion, this indicates that the last FEC set is properly
/// signed by retransmitters.
///
/// Will fail if:
/// - Slot meta is missing
/// - LAST_SHRED_IN_SLOT flag has not been received
/// - There are missing shreds in the last fec set
/// - The block contains legacy shreds
pub fn is_last_fec_set_full(&self, slot: Slot) -> Result<bool> {
pub fn check_last_fec_set(&self, slot: Slot) -> Result<LastFECSetCheckResults> {
// We need to check if the last FEC set index contains at least `DATA_SHREDS_PER_FEC_BLOCK` data shreds.
// We compare the merkle roots of the last `DATA_SHREDS_PER_FEC_BLOCK` shreds in this block.
// Since the merkle root contains the fec_set_index, if all of them match, we know that the last fec set has
Expand All @@ -3703,11 +3753,14 @@ impl Blockstore {
const_assert_eq!(MINIMUM_INDEX, 31);
let Some(start_index) = last_shred_index.checked_sub(MINIMUM_INDEX) else {
warn!("Slot {slot} has only {} shreds, fewer than the {DATA_SHREDS_PER_FEC_BLOCK} required", last_shred_index + 1);
return Ok(false);
return Ok(LastFECSetCheckResults {
is_full: false,
is_retransmitter_signed: false,
});
};
let keys = (start_index..=last_shred_index).map(|index| (slot, index));

let last_merkle_roots: Vec<Hash> = self
let (merkle_root, is_retransmitter_signed) = self
.data_shred_cf
.multi_get_bytes(keys)
.into_iter()
Expand All @@ -3718,17 +3771,38 @@ impl Blockstore {
warn!("Missing shred for {slot} index {shred_index}");
BlockstoreError::MissingShred(slot, shred_index)
})?;
shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| {
let shred_index = start_index + u64::try_from(offset).unwrap();
warn!("Found legacy shred for {slot}, index {shred_index}");
BlockstoreError::LegacyShred(slot, shred_index)
})
let is_retransmitter_signed = shred::layout::is_retransmitter_signed(&shred_bytes)
.map_err(|_| {
let shred_index = start_index + u64::try_from(offset).unwrap();
warn!("Found legacy shred for {slot}, index {shred_index}");
BlockstoreError::LegacyShred(slot, shred_index)
})?;
let merkle_root =
shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| {
let shred_index = start_index + u64::try_from(offset).unwrap();
warn!("Unable to read merkle root for {slot}, index {shred_index}");
BlockstoreError::MissingMerkleRoot(slot, shred_index)
})?;
Ok((merkle_root, is_retransmitter_signed))
})
.dedup_by(|res1, res2| res1.as_ref().ok() == res2.as_ref().ok())
.collect::<Result<Vec<Hash>>>()?;
.process_results::<_, _, BlockstoreError, _>(|iter| {
// lift and dedup merkle root and retransmitter individually to avoid allocation
// and still be able to report metrics accurately for each check.
iter.map(|(mr, rs)| (Some(mr), Some(rs)))
.reduce(|(mr1, rs1), (mr2, rs2)| {
(
if mr1 == mr2 { mr1 } else { None },
if rs1 == rs2 { rs1 } else { None },
)
})
.expect("There must be at least 1 shred in order to reach the above reduce without early error")
})?;

// After the dedup there should be exactly one Hash left if the shreds were part of the same FEC set.
Ok(last_merkle_roots.len() == 1)
// After the dedup there should be exactly one Hash left and one true value
Ok(LastFECSetCheckResults {
is_full: merkle_root.is_some(),
is_retransmitter_signed: is_retransmitter_signed.unwrap_or(false),
})
}

/// Returns a mapping from each elements of `slots` to a list of the
Expand Down Expand Up @@ -11887,7 +11961,7 @@ pub mod tests {

// Missing slot meta
assert_matches!(
blockstore.is_last_fec_set_full(0),
blockstore.check_last_fec_set(0),
Err(BlockstoreError::SlotUnavailable)
);

Expand All @@ -11902,7 +11976,7 @@ pub mod tests {
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(meta.last_index.is_none());
assert_matches!(
blockstore.is_last_fec_set_full(slot),
blockstore.check_last_fec_set(slot),
Err(BlockstoreError::UnknownLastIndex(_))
);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();
Expand All @@ -11914,14 +11988,14 @@ pub mod tests {
let meta = blockstore.meta(slot).unwrap().unwrap();
assert_eq!(meta.last_index, Some(total_shreds - 1));
assert_matches!(
blockstore.is_last_fec_set_full(slot),
blockstore.check_last_fec_set(slot),
Err(BlockstoreError::MissingShred(_, _))
);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Full slot
blockstore.insert_shreds(data_shreds, None, false).unwrap();
assert!(blockstore.is_last_fec_set_full(slot).unwrap());
assert!(blockstore.check_last_fec_set(slot).unwrap().is_full);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Slot has less than DATA_SHREDS_PER_FEC_BLOCK shreds in total
Expand Down Expand Up @@ -11959,7 +12033,7 @@ pub mod tests {
let mut slot_meta = blockstore.meta(slot).unwrap().unwrap();
slot_meta.last_index = Some(last_index as u64);
blockstore.put_meta(slot, &slot_meta).unwrap();
assert!(!blockstore.is_last_fec_set_full(slot).unwrap());
assert!(!blockstore.check_last_fec_set(slot).unwrap().is_full);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Slot has more than DATA_SHREDS_PER_FEC_BLOCK in total, but last FEC set has less
Expand Down Expand Up @@ -11998,6 +12072,6 @@ pub mod tests {
let mut slot_meta = blockstore.meta(slot).unwrap().unwrap();
slot_meta.last_index = Some(last_index as u64);
blockstore.put_meta(slot, &slot_meta).unwrap();
assert!(!blockstore.is_last_fec_set_full(slot).unwrap());
assert!(!blockstore.check_last_fec_set(slot).unwrap().is_full);
}
}
2 changes: 2 additions & 0 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ pub enum BlockstoreError {
MissingShred(Slot, u64),
#[error("legacy shred slot {0}, index {1}")]
LegacyShred(Slot, u64),
#[error("unable to read merkle root slot {0}, index {1}")]
MissingMerkleRoot(Slot, u64),
}
pub type Result<T> = std::result::Result<T, BlockstoreError>;

Expand Down
3 changes: 3 additions & 0 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,9 @@ pub enum BlockstoreProcessorError {

#[error("incomplete final fec set")]
IncompleteFinalFecSet,

#[error("invalid retransmitter signature final fec set")]
InvalidRetransmitterSignatureFinalFecSet,
}

/// Callback for accessing bank state after each slot is confirmed while
Expand Down
16 changes: 16 additions & 0 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,22 @@ pub mod layout {
.ok_or(Error::InvalidPayloadSize(shred.len()))
}

/// Reports whether the shred's variant carries the `resigned` flag.
///
/// # Errors
/// - Propagates any error from `get_shred_variant`.
/// - Returns `Error::InvalidShredVariant` for legacy code or data shreds.
pub fn is_retransmitter_signed(shred: &[u8]) -> Result<bool, Error> {
    let variant = get_shred_variant(shred)?;
    match variant {
        // Both merkle variants expose the same flag; only that field matters here.
        ShredVariant::MerkleCode { resigned, .. }
        | ShredVariant::MerkleData { resigned, .. } => Ok(resigned),
        ShredVariant::LegacyCode | ShredVariant::LegacyData => Err(Error::InvalidShredVariant),
    }
}

pub(crate) fn set_retransmitter_signature(
shred: &mut [u8],
signature: &Signature,
Expand Down

0 comments on commit 9b3728d

Please sign in to comment.