Skip to content

Commit

Permalink
Better dupe detection (#13992)
Browse files Browse the repository at this point in the history
(cherry picked from commit c5fe076)
  • Loading branch information
sakridge authored and mergify-bot committed Dec 10, 2020
1 parent 07191dc commit 30cb12f
Show file tree
Hide file tree
Showing 6 changed files with 377 additions and 111 deletions.
19 changes: 15 additions & 4 deletions core/benches/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,28 @@ fn bench_shredder_coding(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
let data_shreds = make_shreds(symbol_count);
bencher.iter(|| {
Shredder::generate_coding_shreds(0, RECOMMENDED_FEC_RATE, &data_shreds[..symbol_count], 0)
.len();
Shredder::generate_coding_shreds(
0,
RECOMMENDED_FEC_RATE,
&data_shreds[..symbol_count],
0,
symbol_count,
)
.len();
})
}

#[bench]
fn bench_shredder_decoding(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
let data_shreds = make_shreds(symbol_count);
let coding_shreds =
Shredder::generate_coding_shreds(0, RECOMMENDED_FEC_RATE, &data_shreds[..symbol_count], 0);
let coding_shreds = Shredder::generate_coding_shreds(
0,
RECOMMENDED_FEC_RATE,
&data_shreds[..symbol_count],
0,
symbol_count,
);
bencher.iter(|| {
Shredder::try_recovery(
coding_shreds[..].to_vec(),
Expand Down
97 changes: 64 additions & 33 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,29 +973,43 @@ impl ReplayStage {
// errors related to the slot being purged
let slot = bank.slot();
warn!("Fatal replay error in slot: {}, err: {:?}", slot, err);
if let BlockstoreProcessorError::InvalidBlock(BlockError::InvalidTickCount) = err {
datapoint_info!(
"replay-stage-mark_dead_slot",
("error", format!("error: {:?}", err), String),
("slot", slot, i64)
);
} else {
datapoint_error!(
"replay-stage-mark_dead_slot",
("error", format!("error: {:?}", err), String),
("slot", slot, i64)
);
}
bank_progress.is_dead = true;
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
let is_serious = matches!(
err,
BlockstoreProcessorError::InvalidBlock(BlockError::InvalidTickCount)
);
Self::mark_dead_slot(blockstore, bank_progress, slot, &err, is_serious);
err
})?;

Ok(tx_count)
}

fn mark_dead_slot(
blockstore: &Blockstore,
bank_progress: &mut ForkProgress,
slot: Slot,
err: &BlockstoreProcessorError,
is_serious: bool,
) {
if is_serious {
datapoint_error!(
"replay-stage-mark_dead_slot",
("error", format!("error: {:?}", err), String),
("slot", slot, i64)
);
} else {
datapoint_info!(
"replay-stage-mark_dead_slot",
("error", format!("error: {:?}", err), String),
("slot", slot, i64)
);
}
bank_progress.is_dead = true;
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
}

#[allow(clippy::too_many_arguments)]
fn handle_votable_bank(
bank: &Arc<Bank>,
Expand Down Expand Up @@ -1307,23 +1321,40 @@ impl ReplayStage {
}
assert_eq!(*bank_slot, bank.slot());
if bank.is_complete() {
bank_progress.replay_stats.report_stats(
bank.slot(),
bank_progress.replay_progress.num_entries,
bank_progress.replay_progress.num_shreds,
);
did_complete_bank = true;
info!("bank frozen: {}", bank.slot());
bank.freeze();
heaviest_subtree_fork_choice
.add_new_leaf_slot(bank.slot(), Some(bank.parent_slot()));
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Frozen(bank.clone()))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
if !blockstore.has_duplicate_shreds_in_slot(bank.slot()) {
bank_progress.replay_stats.report_stats(
bank.slot(),
bank_progress.replay_progress.num_entries,
bank_progress.replay_progress.num_shreds,
);
did_complete_bank = true;
info!("bank frozen: {}", bank.slot());
bank.freeze();
heaviest_subtree_fork_choice
.add_new_leaf_slot(bank.slot(), Some(bank.parent_slot()));
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Frozen(bank.clone()))
.unwrap_or_else(|err| {
warn!("bank_notification_sender failed: {:?}", err)
});
}

Self::record_rewards(&bank, &rewards_recorder_sender);
Self::record_rewards(&bank, &rewards_recorder_sender);
} else {
Self::mark_dead_slot(
blockstore,
bank_progress,
bank.slot(),
&BlockstoreProcessorError::InvalidBlock(BlockError::DuplicateBlock),
true,
);
warn!(
"{} duplicate shreds detected, not freezing bank {}",
my_pubkey,
bank.slot()
);
}
} else {
trace!(
"bank {} not completed tick_height: {}, max_tick_height: {}",
Expand Down
2 changes: 1 addition & 1 deletion core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ mod tests {
assert!(!packet.meta.discard);

let coding =
solana_ledger::shred::Shredder::generate_coding_shreds(slot, 1.0f32, &[shred], 10);
solana_ledger::shred::Shredder::generate_coding_shreds(slot, 1.0f32, &[shred], 10, 1);
coding[0].copy_to_packet(&mut packet);
ShredFetchStage::process_packet(
&mut packet,
Expand Down
3 changes: 3 additions & 0 deletions ledger/src/block_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ pub enum BlockError {
/// that each block has the same number of hashes
#[error("trailing entry")]
TrailingEntry,

#[error("duplicate block")]
DuplicateBlock,
}
Loading

0 comments on commit 30cb12f

Please sign in to comment.