v1.18: replay: only vote on blocks with >= 32 data shreds in last fec set (backport of #1002) #1410

Closed · wants to merge 2 commits
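Summary of the change (condensed from the diff below): during replay, before voting on a block it did not produce, the validator now calls Blockstore::is_last_fec_set_full to check that the last FEC set of the slot contains at least DATA_SHREDS_PER_FEC_BLOCK (32) data shreds, which it determines by verifying that the slot's last 32 data shreds share a merkle root. If the check fails, an incomplete_final_fec_set datapoint is reported, and, when the new vote_only_full_fec_sets feature is active, the block is marked dead with BlockstoreProcessorError::IncompleteFinalFecSet instead of being voted on. A simplified sketch of the gate (not the exact code; see the replay_stage.rs hunk below):

// Simplified sketch only; the real implementation is in the replay_stage.rs diff below.
if bank.collector_id() != my_pubkey {
    // Ok(true) only when the last 32 data shreds of the slot share a merkle root.
    let is_last_fec_set_full = blockstore
        .is_last_fec_set_full(bank.slot())
        .unwrap_or(false);
    if !is_last_fec_set_full {
        datapoint_warn!("incomplete_final_fec_set", ("slot", bank.slot(), i64));
        if bank
            .feature_set
            .is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id())
        {
            // Mark the block dead rather than voting on it.
            Self::mark_dead_slot(/* blockstore, bank, root, &IncompleteFinalFecSet, ... */);
            continue;
        }
    }
}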
48 changes: 48 additions & 0 deletions core/src/replay_stage.rs
@@ -2847,6 +2847,7 @@ impl ReplayStage {
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
replay_result_vec: &[ReplaySlotFromBlockstore],
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
my_pubkey: &Pubkey,
) -> bool {
// TODO: See if processing of blockstore replay results and bank completion can be made thread safe.
let mut did_complete_bank = false;
@@ -2930,6 +2931,52 @@ impl ReplayStage {
}
}

if bank.collector_id() != my_pubkey {
// If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK shreds in the last FEC set,
// mark it dead. No reason to perform this check on our leader block.
let is_last_fec_set_full = blockstore
.is_last_fec_set_full(bank.slot())
.unwrap_or_else(|e| {
warn!(
"Unable to determine if last fec set is full for slot {} {},
marking as dead: {e:?}",
bank.slot(),
bank.hash()
);
false
});
if !is_last_fec_set_full {
// Update metric regardless of feature flag
datapoint_warn!(
"incomplete_final_fec_set",
("slot", bank_slot, i64),
("hash", bank.hash().to_string(), String)
);
if bank
.feature_set
.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id())
{
let root = bank_forks.read().unwrap().root();
Self::mark_dead_slot(
blockstore,
bank,
root,
&BlockstoreProcessorError::IncompleteFinalFecSet,
rpc_subscriptions,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
);
continue;
}
}
}

let r_replay_stats = replay_stats.read().unwrap();
let replay_progress = bank_progress.replay_progress.clone();
let r_replay_progress = replay_progress.read().unwrap();
@@ -3178,6 +3225,7 @@
block_metadata_notifier,
&replay_result_vec,
purge_repair_slot_counter,
my_pubkey,
)
} else {
false
210 changes: 208 additions & 2 deletions ledger/src/blockstore.rs
@@ -19,7 +19,7 @@ use {
next_slots_iterator::NextSlotsIterator,
shred::{
self, max_ticks_per_n_shreds, ErasureSetId, ProcessShredsStats, ReedSolomonCache,
Shred, ShredData, ShredId, ShredType, Shredder,
Shred, ShredData, ShredId, ShredType, Shredder, DATA_SHREDS_PER_FEC_BLOCK,
},
slot_stats::{ShredSource, SlotsStats},
transaction_address_lookup_table_scanner::scan_transaction,
@@ -28,6 +28,7 @@ use {
bincode::{deserialize, serialize},
crossbeam_channel::{bounded, Receiver, Sender, TrySendError},
dashmap::DashSet,
itertools::Itertools,
log::*,
rand::Rng,
rayon::{
@@ -88,6 +89,8 @@ use {
trees::{Tree, TreeWalk},
};
pub mod blockstore_purge;
#[cfg(test)]
use static_assertions::const_assert_eq;
pub use {
crate::{
blockstore_db::BlockstoreError,
@@ -3690,6 +3693,59 @@ impl Blockstore {
entries.into_iter().flatten().collect()
}

/// Returns true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of a
/// slot have the same merkle root, indicating they are a part of the same
/// FEC set.
/// Will fail if:
/// - Slot meta is missing
/// - LAST_SHRED_IN_SLOT flag has not been received
/// - There are missing shreds in the last fec set
/// - The block contains legacy shreds
pub fn is_last_fec_set_full(&self, slot: Slot) -> Result<bool> {
// We need to check if the last FEC set index contains at least `DATA_SHREDS_PER_FEC_BLOCK` data shreds.
// We compare the merkle roots of the last `DATA_SHREDS_PER_FEC_BLOCK` shreds in this block.
// Since the merkle root contains the fec_set_index, if all of them match, we know that the last fec set has
// at least `DATA_SHREDS_PER_FEC_BLOCK` shreds.
let slot_meta = self.meta(slot)?.ok_or(BlockstoreError::SlotUnavailable)?;
let last_shred_index = slot_meta
.last_index
.ok_or(BlockstoreError::UnknownLastIndex(slot))?;

const MINIMUM_INDEX: u64 = DATA_SHREDS_PER_FEC_BLOCK as u64 - 1;
#[cfg(test)]
const_assert_eq!(MINIMUM_INDEX, 31);
let Some(start_index) = last_shred_index.checked_sub(MINIMUM_INDEX) else {
warn!("Slot {slot} has only {} shreds, fewer than the {DATA_SHREDS_PER_FEC_BLOCK} required", last_shred_index + 1);
return Ok(false);
};
let keys = (start_index..=last_shred_index)
.map(|index| (slot, index))
.collect();

let last_merkle_roots: Vec<Hash> = self
.data_shred_cf
.multi_get_bytes(keys)
.into_iter()
.enumerate()
.map(|(offset, shred_bytes)| {
let shred_bytes = shred_bytes.ok().flatten().ok_or_else(|| {
let shred_index = start_index + u64::try_from(offset).unwrap();
warn!("Missing shred for {slot} index {shred_index}");
BlockstoreError::MissingShred(slot, shred_index)
})?;
shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| {
let shred_index = start_index + u64::try_from(offset).unwrap();
warn!("Found legacy shred for {slot}, index {shred_index}");
BlockstoreError::LegacyShred(slot, shred_index)
})
})
.dedup_by(|res1, res2| res1.as_ref().ok() == res2.as_ref().ok())
.collect::<Result<Vec<Hash>>>()?;

// After the dedup there should be exactly one Hash left if the shreds were part of the same FEC set.
Ok(last_merkle_roots.len() == 1)
}

/// Returns a mapping from each elements of `slots` to a list of the
/// element's children slots.
pub fn get_slots_since(&self, slots: &[Slot]) -> Result<HashMap<Slot, Vec<Slot>>> {
@@ -10254,14 +10310,32 @@ pub mod tests {
num_entries: u64,
fec_set_index: u32,
chained_merkle_root: Option<Hash>,
) -> (Vec<Shred>, Vec<Shred>, Arc<LeaderScheduleCache>) {
setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot,
parent_slot,
num_entries,
fec_set_index,
chained_merkle_root,
true,
)
}

fn setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot: u64,
parent_slot: u64,
num_entries: u64,
fec_set_index: u32,
chained_merkle_root: Option<Hash>,
is_last_in_slot: bool,
) -> (Vec<Shred>, Vec<Shred>, Arc<LeaderScheduleCache>) {
let entries = make_slot_entries_with_transactions(num_entries);
let leader_keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap();
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&leader_keypair,
&entries,
true, // is_last_in_slot
is_last_in_slot,
chained_merkle_root,
fec_set_index, // next_shred_index
fec_set_index, // next_code_index
@@ -11781,4 +11855,136 @@ pub mod tests {
.insert_shred_return_duplicate(coding_shred, &leader_schedule)
.is_empty());
}

#[test]
fn test_is_last_fec_set_full() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

let parent_slot = 0;
let slot = 1;

let fec_set_index = 30;
let (data_shreds, _, _) =
setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index);
let total_shreds = fec_set_index as u64 + data_shreds.len() as u64;

// FEC set should be padded
assert_eq!(data_shreds.len(), DATA_SHREDS_PER_FEC_BLOCK);

// Missing slot meta
assert_matches!(
blockstore.is_last_fec_set_full(0),
Err(BlockstoreError::SlotUnavailable)
);

// Incomplete slot
blockstore
.insert_shreds(
data_shreds[0..DATA_SHREDS_PER_FEC_BLOCK - 1].to_vec(),
None,
false,
)
.unwrap();
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(meta.last_index.is_none());
assert_matches!(
blockstore.is_last_fec_set_full(slot),
Err(BlockstoreError::UnknownLastIndex(_))
);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Missing shreds
blockstore
.insert_shreds(data_shreds[1..].to_vec(), None, false)
.unwrap();
let meta = blockstore.meta(slot).unwrap().unwrap();
assert_eq!(meta.last_index, Some(total_shreds - 1));
assert_matches!(
blockstore.is_last_fec_set_full(slot),
Err(BlockstoreError::MissingShred(_, _))
);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Full slot
blockstore.insert_shreds(data_shreds, None, false).unwrap();
assert!(blockstore.is_last_fec_set_full(slot).unwrap());
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Slot has less than DATA_SHREDS_PER_FEC_BLOCK shreds in total
let mut fec_set_index = 0;
let (first_data_shreds, _, _) =
setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot,
parent_slot,
10,
fec_set_index,
None,
false,
);
let merkle_root = first_data_shreds[0].merkle_root().unwrap();
fec_set_index += first_data_shreds.len() as u32;
let (last_data_shreds, _, _) =
setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot,
parent_slot,
40,
fec_set_index,
Some(merkle_root),
false, // No padding
);
let last_index = last_data_shreds.last().unwrap().index();
let total_shreds = first_data_shreds.len() + last_data_shreds.len();
assert!(total_shreds < DATA_SHREDS_PER_FEC_BLOCK);
blockstore
.insert_shreds(first_data_shreds, None, false)
.unwrap();
blockstore
.insert_shreds(last_data_shreds, None, false)
.unwrap();
// Manually update last index flag
let mut slot_meta = blockstore.meta(slot).unwrap().unwrap();
slot_meta.last_index = Some(last_index as u64);
blockstore.put_meta(slot, &slot_meta).unwrap();
assert!(!blockstore.is_last_fec_set_full(slot).unwrap());
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Slot has more than DATA_SHREDS_PER_FEC_BLOCK in total, but last FEC set has less
let mut fec_set_index = 0;
let (first_data_shreds, _, _) =
setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot,
parent_slot,
100,
fec_set_index,
None,
false,
);
let merkle_root = first_data_shreds[0].merkle_root().unwrap();
fec_set_index += first_data_shreds.len() as u32;
let (last_data_shreds, _, _) =
setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot(
slot,
parent_slot,
100,
fec_set_index,
Some(merkle_root),
false, // No padding
);
let last_index = last_data_shreds.last().unwrap().index();
let total_shreds = first_data_shreds.len() + last_data_shreds.len();
assert!(last_data_shreds.len() < DATA_SHREDS_PER_FEC_BLOCK);
assert!(total_shreds > DATA_SHREDS_PER_FEC_BLOCK);
blockstore
.insert_shreds(first_data_shreds, None, false)
.unwrap();
blockstore
.insert_shreds(last_data_shreds, None, false)
.unwrap();
// Manually update last index flag
let mut slot_meta = blockstore.meta(slot).unwrap().unwrap();
slot_meta.last_index = Some(last_index as u64);
blockstore.put_meta(slot, &slot_meta).unwrap();
assert!(!blockstore.is_last_fec_set_full(slot).unwrap());
}
}
6 changes: 6 additions & 0 deletions ledger/src/blockstore_db.rs
@@ -153,6 +153,12 @@ pub enum BlockstoreError {
TransactionIndexOverflow,
#[error("invalid erasure config")]
InvalidErasureConfig,
#[error("last shred index missing slot {0}")]
UnknownLastIndex(Slot),
#[error("missing shred slot {0}, index {1}")]
MissingShred(Slot, u64),
#[error("legacy shred slot {0}, index {1}")]
LegacyShred(Slot, u64),
}
pub type Result<T> = std::result::Result<T, BlockstoreError>;

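Note (illustrative only, not part of this diff): a hypothetical caller could distinguish the error variants added above from a plain Ok(false) roughly like this:

match blockstore.is_last_fec_set_full(slot) {
    // All of the last DATA_SHREDS_PER_FEC_BLOCK data shreds share a merkle root.
    Ok(true) => { /* the final FEC set is full; the block may be voted on */ }
    // The last FEC set contains fewer than DATA_SHREDS_PER_FEC_BLOCK data shreds.
    Ok(false) => { /* treat the block as having an incomplete final FEC set */ }
    // LAST_SHRED_IN_SLOT has not been received, so the slot's last index is unknown.
    Err(BlockstoreError::UnknownLastIndex(s)) => warn!("slot {s}: last index unknown"),
    // One of the slot's last data shreds is missing from the blockstore.
    Err(BlockstoreError::MissingShred(s, i)) => warn!("slot {s}: missing shred {i}"),
    // Legacy shreds carry no merkle root, so the check cannot be performed.
    Err(BlockstoreError::LegacyShred(s, i)) => warn!("slot {s}: legacy shred {i}"),
    // e.g. BlockstoreError::SlotUnavailable when the slot meta is missing.
    Err(e) => warn!("slot {slot}: unexpected error: {e:?}"),
}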
3 changes: 3 additions & 0 deletions ledger/src/blockstore_processor.rs
@@ -672,6 +672,9 @@ pub enum BlockstoreProcessorError {

#[error("root bank with mismatched capitalization at {0}")]
RootBankWithMismatchedCapitalization(Slot),

#[error("incomplete final fec set")]
IncompleteFinalFecSet,
}

/// Callback for accessing bank state while processing the blockstore
5 changes: 5 additions & 0 deletions sdk/src/feature_set.rs
@@ -789,6 +789,10 @@ pub mod deprecate_unused_legacy_vote_plumbing {
solana_sdk::declare_id!("6Uf8S75PVh91MYgPQSHnjRAPQq6an5BDv9vomrCwDqLe");
}

pub mod vote_only_full_fec_sets {
solana_sdk::declare_id!("ffecLRhhakKSGhMuc6Fz2Lnfq4uT9q3iu9ZsNaPLxPc");
}

lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -981,6 +985,7 @@ lazy_static! {
(enable_chained_merkle_shreds::id(), "Enable chained Merkle shreds #34916"),
(deprecate_unused_legacy_vote_plumbing::id(), "Deprecate unused legacy vote tx plumbing"),
(chained_merkle_conflict_duplicate_proofs::id(), "generate duplicate proofs for chained merkle root conflicts"),
(vote_only_full_fec_sets::id(), "vote only full fec sets"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()