refactor metrics inside blockstore fn, return block_id for future use
AshwinSekar committed Jul 16, 2024
1 parent 91d582e commit 509cdb9
Showing 2 changed files with 103 additions and 76 deletions.
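In short, the replay-stage call site collapses from three steps (check the last FEC set, report metrics, apply the feature flags) into a single blockstore call that also hands back the block id. A condensed sketch of the shape of the change, not the literal diff (the real call site below matches on the error, marks the slot dead, and continues, rather than using `?`):

    // Before: ReplayStage drove the metrics and feature-flag handling itself.
    let results = blockstore.check_last_fec_set(bank.slot())?;
    results.report_metrics(bank_slot, bank.hash());
    results.get_result(&bank.feature_set)?;

    // After: the blockstore owns that logic and returns an Option<Hash> block id.
    let _block_id = blockstore.check_last_fec_set_and_get_block_id(
        bank.slot(),
        bank.hash(),
        &bank.feature_set,
    )?;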
70 changes: 27 additions & 43 deletions core/src/replay_stage.rs
@@ -3076,54 +3076,38 @@ impl ReplayStage {
}
}

if bank.collector_id() != my_pubkey {
let _block_id = if bank.collector_id() != my_pubkey {
// If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK correctly retransmitted
// shreds in the last FEC set, mark it dead. No reason to perform this check on our leader block.
let check_result = match blockstore.check_last_fec_set(bank.slot()) {
Ok(last_fec_set_check_results) => {
// Update metrics regardless of feature flag
last_fec_set_check_results.report_metrics(bank_slot, bank.hash());
// Get a final result based on the feature flags
last_fec_set_check_results.get_result(&bank.feature_set)
}
Err(e) => {
warn!(
"Unable to check the last fec set for slot {} {},
marking as dead: {e:?}",
bank.slot(),
bank.hash()
match blockstore.check_last_fec_set_and_get_block_id(
bank.slot(),
bank.hash(),
&bank.feature_set,
) {
Ok(block_id) => block_id,
Err(result_err) => {
let root = bank_forks.read().unwrap().root();
Self::mark_dead_slot(
blockstore,
bank,
root,
&result_err,
rpc_subscriptions,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
);
if bank
.feature_set
.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id())
{
Err(BlockstoreProcessorError::IncompleteFinalFecSet)
} else {
Ok(())
}
continue;
}
};

if let Err(result_err) = check_result {
let root = bank_forks.read().unwrap().root();
Self::mark_dead_slot(
blockstore,
bank,
root,
&result_err,
rpc_subscriptions,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
);
continue;
}
}
} else {
None
};

let r_replay_stats = replay_stats.read().unwrap();
let replay_progress = bank_progress.replay_progress.clone();
109 changes: 76 additions & 33 deletions ledger/src/blockstore.rs
@@ -174,13 +174,13 @@ impl<T> AsRef<T> for WorkingEntry<T> {

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct LastFECSetCheckResults {
is_full: bool,
block_id: Option<Hash>,
is_retransmitter_signed: bool,
}

impl LastFECSetCheckResults {
pub fn report_metrics(&self, slot: Slot, bank_hash: Hash) {
if !self.is_full {
if self.block_id.is_none() {
datapoint_warn!(
"incomplete_final_fec_set",
("slot", slot, i64),
@@ -196,12 +196,12 @@ impl LastFECSetCheckResults {
}
}

pub fn get_result(
pub fn get_block_id(
&self,
feature_set: &FeatureSet,
) -> std::result::Result<(), BlockstoreProcessorError> {
) -> std::result::Result<Option<Hash>, BlockstoreProcessorError> {
if feature_set.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id())
&& !self.is_full
&& self.block_id.is_none()
{
return Err(BlockstoreProcessorError::IncompleteFinalFecSet);
} else if feature_set
@@ -210,7 +210,7 @@
{
return Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet);
}
Ok(())
Ok(self.block_id)
}
}
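Part of `get_block_id` is collapsed in the hunk above; for reference, here is a self-contained sketch of its full behavior, reconstructed from the visible lines and the feature-flag tests at the bottom of this diff. It is an illustration, not the source; the names and types are those already in scope in ledger/src/blockstore.rs.

    // Sketch: what get_block_id decides, given the two feature gates.
    fn get_block_id_sketch(
        block_id: Option<Hash>,
        is_retransmitter_signed: bool,
        feature_set: &FeatureSet,
    ) -> std::result::Result<Option<Hash>, BlockstoreProcessorError> {
        // Gate 1: a missing or short final FEC set is fatal.
        if feature_set.is_active(&vote_only_full_fec_sets::id()) && block_id.is_none() {
            return Err(BlockstoreProcessorError::IncompleteFinalFecSet);
        }
        // Gate 2: a final FEC set without retransmitter signatures is fatal.
        if feature_set.is_active(&vote_only_retransmitter_signed_fec_sets::id())
            && !is_retransmitter_signed
        {
            return Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet);
        }
        // Otherwise the (possibly absent) block id passes through.
        Ok(block_id)
    }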

@@ -3724,9 +3784,39 @@ impl Blockstore {
self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta)
}

/// Performs checks on the last fec set of a replayed slot, and returns the block_id.
/// Returns:
/// - BlockstoreProcessorError::IncompleteFinalFecSet
/// if the last fec set is not full
/// - BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet
/// if the last fec set is not signed by retransmitters
pub fn check_last_fec_set_and_get_block_id(
&self,
slot: Slot,
bank_hash: Hash,
feature_set: &FeatureSet,
) -> std::result::Result<Option<Hash>, BlockstoreProcessorError> {
let results = self.check_last_fec_set(slot);
let Ok(results) = results else {
warn!(
"Unable to check the last fec set for slot {} {},
marking as dead: {results:?}",
slot, bank_hash,
);
if feature_set.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) {
return Err(BlockstoreProcessorError::IncompleteFinalFecSet);
}
return Ok(None);
};
// Update metrics
results.report_metrics(slot, bank_hash);
// Return block id / error based on feature flags
results.get_block_id(feature_set)
}

/// Performs checks on the last FEC set for this slot.
/// - `is_full` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of
/// `slot` have the same merkle root, indicating they are a part of the same FEC set.
/// - `block_id` will be `Some(mr)` if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of
/// `slot` have the same merkle root of `mr`, indicating they are a part of the same FEC set.
/// This indicates that the last FEC set is sufficiently sized.
/// - `is_retransmitter_signed` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK`
/// data shreds of `slot` are of the retransmitter variant. Since we already discard
@@ -3738,7 +3768,7 @@ impl Blockstore {
/// - LAST_SHRED_IN_SLOT flag has not been received
/// - There are missing shreds in the last fec set
/// - The block contains legacy shreds
pub fn check_last_fec_set(&self, slot: Slot) -> Result<LastFECSetCheckResults> {
fn check_last_fec_set(&self, slot: Slot) -> Result<LastFECSetCheckResults> {
// We need to check if the last FEC set index contains at least `DATA_SHREDS_PER_FEC_BLOCK` data shreds.
// We compare the merkle roots of the last `DATA_SHREDS_PER_FEC_BLOCK` shreds in this block.
// Since the merkle root contains the fec_set_index, if all of them match, we know that the last fec set has
Expand All @@ -3754,7 +3784,7 @@ impl Blockstore {
let Some(start_index) = last_shred_index.checked_sub(MINIMUM_INDEX) else {
warn!("Slot {slot} has only {} shreds, fewer than the {DATA_SHREDS_PER_FEC_BLOCK} required", last_shred_index + 1);
return Ok(LastFECSetCheckResults {
is_full: false,
block_id: None,
is_retransmitter_signed: false,
});
};
@@ -3804,7 +3834,7 @@ impl Blockstore {

// After the dedup there should be exactly one Hash left and one true value
Ok(LastFECSetCheckResults {
is_full: merkle_root.is_some(),
block_id: merkle_root,
is_retransmitter_signed,
})
}
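As a usage note, a caller of the new public entry point sees three outcomes. This is an illustrative sketch only; ReplayStage above is the actual caller, and it marks the slot dead on the error arm.

    // Hypothetical call site; `blockstore`, `slot`, `bank_hash`, and
    // `feature_set` are assumed to already be in scope.
    match blockstore.check_last_fec_set_and_get_block_id(slot, bank_hash, &feature_set) {
        Ok(Some(_block_id)) => {
            // The last FEC set is full: the shared merkle root doubles as the block id.
        }
        Ok(None) => {
            // The check came up empty, but the relevant feature gates are
            // inactive, so the slot is not rejected.
        }
        Err(_err) => {
            // IncompleteFinalFecSet or InvalidRetransmitterSignatureFinalFecSet:
            // the caller is expected to mark the slot dead.
        }
    }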
@@ -11999,9 +12029,10 @@ pub mod tests {
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

// Full slot
let block_id = data_shreds[0].merkle_root().unwrap();
blockstore.insert_shreds(data_shreds, None, false).unwrap();
let results = blockstore.check_last_fec_set(slot).unwrap();
assert!(results.is_full);
assert_eq!(results.block_id, Some(block_id));
assert!(results.is_retransmitter_signed);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

@@ -12041,7 +12072,7 @@ pub mod tests {
slot_meta.last_index = Some(last_index as u64);
blockstore.put_meta(slot, &slot_meta).unwrap();
let results = blockstore.check_last_fec_set(slot).unwrap();
assert!(!results.is_full);
assert!(results.block_id.is_none());
assert!(!results.is_retransmitter_signed);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

@@ -12082,7 +12113,7 @@ pub mod tests {
slot_meta.last_index = Some(last_index as u64);
blockstore.put_meta(slot, &slot_meta).unwrap();
let results = blockstore.check_last_fec_set(slot).unwrap();
assert!(!results.is_full);
assert!(results.block_id.is_none());
assert!(!results.is_retransmitter_signed);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

@@ -12122,7 +12153,7 @@ pub mod tests {
slot_meta.last_index = Some((last_index - 3) as u64);
blockstore.put_meta(slot, &slot_meta).unwrap();
let results = blockstore.check_last_fec_set(slot).unwrap();
assert!(!results.is_full);
assert!(results.block_id.is_none());
assert!(results.is_retransmitter_signed);
blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap();

@@ -12139,11 +12170,12 @@ pub mod tests {
true,
);
assert!(first_data_shreds.len() > DATA_SHREDS_PER_FEC_BLOCK);
let block_id = first_data_shreds[0].merkle_root().unwrap();
blockstore
.insert_shreds(first_data_shreds, None, false)
.unwrap();
let results = blockstore.check_last_fec_set(slot).unwrap();
assert!(results.is_full);
assert_eq!(results.block_id, Some(block_id));
assert!(!results.is_retransmitter_signed);
}

@@ -12157,55 +12189,66 @@ pub mod tests {
retransmitter_only.activate(&vote_only_retransmitter_signed_fec_sets::id(), 0);

let results = LastFECSetCheckResults {
is_full: false,
block_id: None,
is_retransmitter_signed: false,
};
assert_matches!(
results.get_result(&enabled_feature_set),
results.get_block_id(&enabled_feature_set),
Err(BlockstoreProcessorError::IncompleteFinalFecSet)
);
assert_matches!(
results.get_result(&full_only),
results.get_block_id(&full_only),
Err(BlockstoreProcessorError::IncompleteFinalFecSet)
);
assert_matches!(
results.get_result(&retransmitter_only),
results.get_block_id(&retransmitter_only),
Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet)
);
results.get_result(&disabled_feature_set).unwrap();
assert!(results
.get_block_id(&disabled_feature_set)
.unwrap()
.is_none());

let block_id = Hash::new_unique();
let results = LastFECSetCheckResults {
is_full: true,
block_id: Some(block_id),
is_retransmitter_signed: false,
};
assert_matches!(
results.get_result(&enabled_feature_set),
results.get_block_id(&enabled_feature_set),
Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet)
);
results.get_result(&full_only).unwrap();
assert_eq!(results.get_block_id(&full_only).unwrap(), Some(block_id));
assert_matches!(
results.get_result(&retransmitter_only),
results.get_block_id(&retransmitter_only),
Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet)
);
results.get_result(&disabled_feature_set).unwrap();
assert_eq!(
results.get_block_id(&disabled_feature_set).unwrap(),
Some(block_id)
);

let results = LastFECSetCheckResults {
is_full: false,
block_id: None,
is_retransmitter_signed: true,
};
assert_matches!(
results.get_result(&enabled_feature_set),
results.get_block_id(&enabled_feature_set),
Err(BlockstoreProcessorError::IncompleteFinalFecSet)
);
assert_matches!(
results.get_result(&full_only),
results.get_block_id(&full_only),
Err(BlockstoreProcessorError::IncompleteFinalFecSet)
);
results.get_result(&retransmitter_only).unwrap();
results.get_result(&disabled_feature_set).unwrap();
assert!(results.get_block_id(&retransmitter_only).unwrap().is_none());
assert!(results
.get_block_id(&disabled_feature_set)
.unwrap()
.is_none());

let block_id = Hash::new_unique();
let results = LastFECSetCheckResults {
is_full: true,
block_id: Some(block_id),
is_retransmitter_signed: true,
};
for feature_set in [
Expand All @@ -12214,7 +12257,7 @@ pub mod tests {
full_only,
retransmitter_only,
] {
results.get_result(&feature_set).unwrap();
assert_eq!(results.get_block_id(&feature_set).unwrap(), Some(block_id));
}
}
}
