replay: only vote on blocks with >= 32 data shreds in last fec set (solana-labs#1002)

* replay: only vote on blocks with >= 32 data shreds in last fec set

* pr feedback: pub(crate), inspect_err

* pr feedback: error variants, collapse function, dedup

* pr feedback: remove set_last_in_slot, rework test

* pr feedback: add metric, perform check regardless of ff

* pr feedback: mark block as dead rather than duplicate

* pr feedback: self.meta, const_assert, no collect

* pr feedback: cfg(test) assertion, remove expect and collect, error fmt

* Keep the collect to preserve error

* pr feedback: do not hold bank_forks lock for mark_dead_slot
AshwinSekar authored May 17, 2024
1 parent 9a7bada commit 8c67696
Showing 5 changed files with 267 additions and 2 deletions.
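
For orientation, here is a minimal standalone sketch (hypothetical names, not the repository's actual API) of the invariant this commit enforces: the last DATA_SHREDS_PER_FEC_BLOCK data shreds of a block must all carry the same merkle root, which proves the final FEC set contains at least that many data shreds.

// Hypothetical sketch, not repository code: a shred's merkle root commits to
// its fec_set_index, so if the last 32 data shreds share one root, the final
// FEC set holds at least 32 data shreds.
const DATA_SHREDS_PER_FEC_BLOCK: usize = 32;

fn last_fec_set_is_full(last_merkle_roots: &[[u8; 32]]) -> bool {
    // Caller passes the merkle roots of the block's last 32 data shreds, in order.
    last_merkle_roots.len() == DATA_SHREDS_PER_FEC_BLOCK
        && last_merkle_roots.windows(2).all(|pair| pair[0] == pair[1])
}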
48 changes: 48 additions & 0 deletions core/src/replay_stage.rs
@@ -2963,6 +2963,7 @@ impl ReplayStage {
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
replay_result_vec: &[ReplaySlotFromBlockstore],
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
my_pubkey: &Pubkey,
) -> bool {
// TODO: See if processing of blockstore replay results and bank completion can be made thread safe.
let mut did_complete_bank = false;
@@ -3046,6 +3047,52 @@ impl ReplayStage {
}
}

if bank.collector_id() != my_pubkey {
// If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK shreds in the last FEC set,
// mark it dead. No reason to perform this check on our leader block.
if !blockstore
.is_last_fec_set_full(bank.slot())
.inspect_err(|e| {
warn!(
"Unable to determine if last fec set is full for slot {} {},
marking as dead: {e:?}",
bank.slot(),
bank.hash()
)
})
.unwrap_or(false)
{
// Update metric regardless of feature flag
datapoint_warn!(
"incomplete_final_fec_set",
("slot", bank_slot, i64),
("hash", bank.hash().to_string(), String)
);
if bank
.feature_set
.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id())
{
let root = bank_forks.read().unwrap().root();
Self::mark_dead_slot(
blockstore,
bank,
root,
&BlockstoreProcessorError::IncompleteFinalFecSet,
rpc_subscriptions,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
);
continue;
}
}
}

let r_replay_stats = replay_stats.read().unwrap();
let replay_progress = bank_progress.replay_progress.clone();
let r_replay_progress = replay_progress.read().unwrap();
@@ -3300,6 +3347,7 @@ impl ReplayStage {
block_metadata_notifier,
&replay_result_vec,
purge_repair_slot_counter,
my_pubkey,
)
}

207 changes: 205 additions & 2 deletions ledger/src/blockstore.rs
@@ -19,7 +19,7 @@ use {
next_slots_iterator::NextSlotsIterator,
shred::{
self, max_ticks_per_n_shreds, ErasureSetId, ProcessShredsStats, ReedSolomonCache,
Shred, ShredData, ShredId, ShredType, Shredder,
Shred, ShredData, ShredId, ShredType, Shredder, DATA_SHREDS_PER_FEC_BLOCK,
},
slot_stats::{ShredSource, SlotsStats},
transaction_address_lookup_table_scanner::scan_transaction,
@@ -85,6 +85,8 @@ use {
trees::{Tree, TreeWalk},
};
pub mod blockstore_purge;
#[cfg(test)]
use static_assertions::const_assert_eq;
pub use {
crate::{
blockstore_db::BlockstoreError,
@@ -3681,6 +3683,57 @@ impl Blockstore {
self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta)
}

/// Returns true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of a
/// slot have the same merkle root, indicating they are a part of the same
/// FEC set.
/// Will fail if:
/// - Slot meta is missing
/// - LAST_SHRED_IN_SLOT flag has not been received
/// - There are missing shreds in the last fec set
/// - The block contains legacy shreds
pub fn is_last_fec_set_full(&self, slot: Slot) -> Result<bool> {
// We need to check if the last FEC set index contains at least `DATA_SHREDS_PER_FEC_BLOCK` data shreds.
// We compare the merkle roots of the last `DATA_SHREDS_PER_FEC_BLOCK` shreds in this block.
// Since the merkle root contains the fec_set_index, if all of them match, we know that the last fec set has
// at least `DATA_SHREDS_PER_FEC_BLOCK` shreds.
let slot_meta = self.meta(slot)?.ok_or(BlockstoreError::SlotUnavailable)?;
let last_shred_index = slot_meta
.last_index
.ok_or(BlockstoreError::UnknownLastIndex(slot))?;

const MINIMUM_INDEX: u64 = DATA_SHREDS_PER_FEC_BLOCK as u64 - 1;
#[cfg(test)]
const_assert_eq!(MINIMUM_INDEX, 31);
let Some(start_index) = last_shred_index.checked_sub(MINIMUM_INDEX) else {
warn!("Slot {slot} has only {} shreds, fewer than the {DATA_SHREDS_PER_FEC_BLOCK} required", last_shred_index + 1);
return Ok(false);
};
let keys = (start_index..=last_shred_index).map(|index| (slot, index));

let last_merkle_roots: Vec<Hash> = self
.data_shred_cf
.multi_get_bytes(keys)
.into_iter()
.enumerate()
.map(|(offset, shred_bytes)| {
let shred_bytes = shred_bytes.ok().flatten().ok_or_else(|| {
let shred_index = start_index + u64::try_from(offset).unwrap();
warn!("Missing shred for {slot} index {shred_index}");
BlockstoreError::MissingShred(slot, shred_index)
})?;
shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| {
let shred_index = start_index + u64::try_from(offset).unwrap();
warn!("Found legacy shred for {slot}, index {shred_index}");
BlockstoreError::LegacyShred(slot, shred_index)
})
})
.dedup_by(|res1, res2| res1.as_ref().ok() == res2.as_ref().ok())
.collect::<Result<Vec<Hash>>>()?;

// After the dedup there should be exactly one Hash left if the shreds were part of the same FEC set.
Ok(last_merkle_roots.len() == 1)
}
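
The dedup-then-collect chain above is the subtle part: consecutive shreds whose merkle roots compare equal collapse into a single entry, while any lookup error still short-circuits the collect and is surfaced intact (the "Keep the collect to preserve error" feedback above). A standalone sketch of the same pattern, assuming the itertools crate for dedup_by on iterators:

// Illustration of the pattern only, not repository code.
use itertools::Itertools;

fn all_equal_or_err(results: Vec<Result<u64, String>>) -> Result<bool, String> {
    let deduped = results
        .into_iter()
        // Consecutive items whose Ok values compare equal collapse to one;
        // an Err compares as None, so it never merges with an Ok value.
        .dedup_by(|a, b| a.as_ref().ok() == b.as_ref().ok())
        // The first surviving Err aborts the collect and is returned as-is.
        .collect::<Result<Vec<u64>, String>>()?;
    Ok(deduped.len() == 1)
}

fn main() {
    assert_eq!(all_equal_or_err(vec![Ok(7), Ok(7), Ok(7)]), Ok(true));
    assert_eq!(all_equal_or_err(vec![Ok(7), Ok(8)]), Ok(false));
    assert!(all_equal_or_err(vec![Ok(7), Err("missing".into())]).is_err());
}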

/// Returns a mapping from each elements of `slots` to a list of the
/// element's children slots.
pub fn get_slots_since(&self, slots: &[Slot]) -> Result<HashMap<Slot, Vec<Slot>>> {
@@ -10253,14 +10306,32 @@ pub mod tests {
num_entries: u64,
fec_set_index: u32,
chained_merkle_root: Option<Hash>,
) -> (Vec<Shred>, Vec<Shred>, Arc<LeaderScheduleCache>) {
setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot,
parent_slot,
num_entries,
fec_set_index,
chained_merkle_root,
true,
)
}

fn setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot: u64,
parent_slot: u64,
num_entries: u64,
fec_set_index: u32,
chained_merkle_root: Option<Hash>,
is_last_in_slot: bool,
) -> (Vec<Shred>, Vec<Shred>, Arc<LeaderScheduleCache>) {
let entries = make_slot_entries_with_transactions(num_entries);
let leader_keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap();
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&leader_keypair,
&entries,
true, // is_last_in_slot
is_last_in_slot,
chained_merkle_root,
fec_set_index, // next_shred_index
fec_set_index, // next_code_index
@@ -11784,4 +11855,136 @@ pub mod tests {
.insert_shred_return_duplicate(coding_shred, &leader_schedule)
.is_empty());
}

#[test]
fn test_is_last_fec_set_full() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

let parent_slot = 0;
let slot = 1;

let fec_set_index = 30;
let (data_shreds, _, _) =
setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index);
let total_shreds = fec_set_index as u64 + data_shreds.len() as u64;

// FEC set should be padded
assert_eq!(data_shreds.len(), DATA_SHREDS_PER_FEC_BLOCK);

// Missing slot meta
assert_matches!(
blockstore.is_last_fec_set_full(0),
Err(BlockstoreError::SlotUnavailable)
);

// Incomplete slot
blockstore
.insert_shreds(
data_shreds[0..DATA_SHREDS_PER_FEC_BLOCK - 1].to_vec(),
None,
false,
)
.unwrap();
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(meta.last_index.is_none());
assert_matches!(
blockstore.is_last_fec_set_full(slot),
Err(BlockstoreError::UnknownLastIndex(_))
);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Missing shreds
blockstore
.insert_shreds(data_shreds[1..].to_vec(), None, false)
.unwrap();
let meta = blockstore.meta(slot).unwrap().unwrap();
assert_eq!(meta.last_index, Some(total_shreds - 1));
assert_matches!(
blockstore.is_last_fec_set_full(slot),
Err(BlockstoreError::MissingShred(_, _))
);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Full slot
blockstore.insert_shreds(data_shreds, None, false).unwrap();
assert!(blockstore.is_last_fec_set_full(slot).unwrap());
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Slot has less than DATA_SHREDS_PER_FEC_BLOCK shreds in total
let mut fec_set_index = 0;
let (first_data_shreds, _, _) =
setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot,
parent_slot,
10,
fec_set_index,
None,
false,
);
let merkle_root = first_data_shreds[0].merkle_root().unwrap();
fec_set_index += first_data_shreds.len() as u32;
let (last_data_shreds, _, _) =
setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot,
parent_slot,
40,
fec_set_index,
Some(merkle_root),
false, // No padding
);
let last_index = last_data_shreds.last().unwrap().index();
let total_shreds = first_data_shreds.len() + last_data_shreds.len();
assert!(total_shreds < DATA_SHREDS_PER_FEC_BLOCK);
blockstore
.insert_shreds(first_data_shreds, None, false)
.unwrap();
blockstore
.insert_shreds(last_data_shreds, None, false)
.unwrap();
// Manually update last index flag
let mut slot_meta = blockstore.meta(slot).unwrap().unwrap();
slot_meta.last_index = Some(last_index as u64);
blockstore.put_meta(slot, &slot_meta).unwrap();
assert!(!blockstore.is_last_fec_set_full(slot).unwrap());
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Slot has more than DATA_SHREDS_PER_FEC_BLOCK in total, but last FEC set has less
let mut fec_set_index = 0;
let (first_data_shreds, _, _) =
setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot,
parent_slot,
100,
fec_set_index,
None,
false,
);
let merkle_root = first_data_shreds[0].merkle_root().unwrap();
fec_set_index += first_data_shreds.len() as u32;
let (last_data_shreds, _, _) =
setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot,
parent_slot,
100,
fec_set_index,
Some(merkle_root),
false, // No padding
);
let last_index = last_data_shreds.last().unwrap().index();
let total_shreds = first_data_shreds.len() + last_data_shreds.len();
assert!(last_data_shreds.len() < DATA_SHREDS_PER_FEC_BLOCK);
assert!(total_shreds > DATA_SHREDS_PER_FEC_BLOCK);
blockstore
.insert_shreds(first_data_shreds, None, false)
.unwrap();
blockstore
.insert_shreds(last_data_shreds, None, false)
.unwrap();
// Manually update last index flag
let mut slot_meta = blockstore.meta(slot).unwrap().unwrap();
slot_meta.last_index = Some(last_index as u64);
blockstore.put_meta(slot, &slot_meta).unwrap();
assert!(!blockstore.is_last_fec_set_full(slot).unwrap());
}
}
6 changes: 6 additions & 0 deletions ledger/src/blockstore_db.rs
@@ -153,6 +153,12 @@ pub enum BlockstoreError {
TransactionIndexOverflow,
#[error("invalid erasure config")]
InvalidErasureConfig,
#[error("last shred index missing slot {0}")]
UnknownLastIndex(Slot),
#[error("missing shred slot {0}, index {1}")]
MissingShred(Slot, u64),
#[error("legacy shred slot {0}, index {1}")]
LegacyShred(Slot, u64),
}
pub type Result<T> = std::result::Result<T, BlockstoreError>;

3 changes: 3 additions & 0 deletions ledger/src/blockstore_processor.rs
@@ -692,6 +692,9 @@ pub enum BlockstoreProcessorError {

#[error("set root error {0}")]
SetRootError(#[from] SetRootError),

#[error("incomplete final fec set")]
IncompleteFinalFecSet,
}

/// Callback for accessing bank state after each slot is confirmed while
5 changes: 5 additions & 0 deletions sdk/src/feature_set.rs
@@ -809,6 +809,10 @@ pub mod migrate_feature_gate_program_to_core_bpf {
solana_sdk::declare_id!("4eohviozzEeivk1y9UbrnekbAFMDQyJz5JjA9Y6gyvky");
}

pub mod vote_only_full_fec_sets {
solana_sdk::declare_id!("ffecLRhhakKSGhMuc6Fz2Lnfq4uT9q3iu9ZsNaPLxPc");
}

lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -1006,6 +1010,7 @@ lazy_static! {
(abort_on_invalid_curve::id(), "Abort when elliptic curve syscalls invoked on invalid curve id SIMD-0137"),
(get_sysvar_syscall_enabled::id(), "Enable syscall for fetching Sysvar bytes #615"),
(migrate_feature_gate_program_to_core_bpf::id(), "Migrate Feature Gate program to Core BPF (programify) #1003"),
(vote_only_full_fec_sets::id(), "vote only full fec sets"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()
