send duplicate shred proofs for conflicting shred scenarios
The conflicting scenarios are multiple last_shred_in_slot shreds and
coding shreds with conflicting erasure metas.
AshwinSekar committed Aug 23, 2023
1 parent 329c6f1 commit 5c90b2c
Showing 7 changed files with 789 additions and 121 deletions.
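
For context on the diffs below: the blockstore insert path now reports possible duplicates through an enum instead of a bare Shred, so the duplicate checker can tell proof-carrying conflicts apart from plain re-insertions. A minimal sketch of the assumed shape (the actual definition lives in solana_ledger's blockstore module and may differ in detail):

use solana_ledger::shred::Shred;

// Assumed variants, inferred from how run_check_duplicate matches on them below;
// conflicting payloads are taken to be raw shred bytes.
pub enum PossibleDuplicateShred {
    // The shred already exists in the blockstore; the existing payload has to be
    // looked up after the insert batch is written, hence the deferred checks.
    Exists(Shred),
    // Two data shreds disagree on which index ends the slot; the conflicting
    // payload travels with the variant.
    LastIndexConflict(Shred, Vec<u8>),
    // A coding shred's erasure meta conflicts with one already stored.
    ErasureConflict(Shred, Vec<u8>),
}

The Exists case is why run_check_duplicate still consults has_duplicate_shreds_in_slot and is_shred_duplicate, while the two conflict variants already carry both halves of the proof.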
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

60 changes: 40 additions & 20 deletions core/src/window_service.rs
@@ -19,7 +19,7 @@ use {
rayon::{prelude::*, ThreadPool},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::{Blockstore, BlockstoreInsertionMetrics},
blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
leader_schedule_cache::LeaderScheduleCache,
shred::{self, Nonce, ReedSolomonCache, Shred},
},
@@ -138,23 +138,41 @@ impl WindowServiceMetrics {
fn run_check_duplicate(
cluster_info: &ClusterInfo,
blockstore: &Blockstore,
shred_receiver: &Receiver<Shred>,
shred_receiver: &Receiver<PossibleDuplicateShred>,
duplicate_slots_sender: &DuplicateSlotSender,
) -> Result<()> {
let check_duplicate = |shred: Shred| -> Result<()> {
let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
let shred_slot = shred.slot();
if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) {
cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
blockstore.store_duplicate_slot(
shred_slot,
existing_shred_payload,
shred.into_payload(),
)?;

duplicate_slots_sender.send(shred_slot)?;
let (shred1, shred2) = match shred {
PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict),
PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict),
PossibleDuplicateShred::Exists(shred) => {
// Unlike the other cases we have to wait until here to decide to handle the duplicate and store
// in blockstore. This is because the duplicate could have been part of the same insert batch,
// so we wait until the batch has been written.
if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) {
blockstore.store_duplicate_slot(
shred_slot,
existing_shred_payload.clone(),
shred.clone().into_payload(),
)?;
(shred, existing_shred_payload)
} else {
// Shred is not duplicate
return Ok(());
}
} else {
// Shred has already been handled
return Ok(());
}
}
}
};

// Propagate duplicate proof through gossip
cluster_info.push_duplicate_shred(&shred1, &shred2)?;
// Notify duplicate consensus state machine
duplicate_slots_sender.send(shred_slot)?;

Ok(())
};
@@ -226,7 +244,7 @@ fn run_insert<F>(
reed_solomon_cache: &ReedSolomonCache,
) -> Result<()>
where
F: Fn(Shred),
F: Fn(PossibleDuplicateShred),
{
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
@@ -370,7 +388,7 @@ impl WindowService {
cluster_info: Arc<ClusterInfo>,
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
duplicate_receiver: Receiver<Shred>,
duplicate_receiver: Receiver<PossibleDuplicateShred>,
duplicate_slots_sender: DuplicateSlotSender,
) -> JoinHandle<()> {
let handle_error = || {
@@ -400,7 +418,7 @@ impl WindowService {
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
verified_receiver: Receiver<Vec<PacketBatch>>,
check_duplicate_sender: Sender<Shred>,
check_duplicate_sender: Sender<PossibleDuplicateShred>,
completed_data_sets_sender: CompletedDataSetsSender,
retransmit_sender: Sender<Vec<ShredPayload>>,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
@@ -417,8 +435,8 @@ impl WindowService {
Builder::new()
.name("solWinInsert".to_string())
.spawn(move || {
let handle_duplicate = |shred| {
let _ = check_duplicate_sender.send(shred);
let handle_duplicate = |possible_duplicate_shred| {
let _ = check_duplicate_sender.send(possible_duplicate_shred);
};
let mut metrics = BlockstoreInsertionMetrics::default();
let mut ws_metrics = WindowServiceMetrics::default();
@@ -551,7 +569,9 @@ mod test {
};
assert_eq!(duplicate_shred.slot(), shreds[0].slot());
let duplicate_shred_slot = duplicate_shred.slot();
sender.send(duplicate_shred.clone()).unwrap();
sender
.send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
.unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
let keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
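
The updated test wraps the re-sent shred in PossibleDuplicateShred::Exists, which exercises the deferred blockstore lookup. The conflict variants can be driven without that lookup since they already carry the conflicting bytes; a hypothetical continuation of the same test (conflicting_payload, receiver, duplicate_slot_sender, and duplicate_slot_receiver are assumed names, not part of this diff):

// Hypothetical: a last-index conflict carries the conflicting bytes directly, so the
// checker gossips the proof and notifies consensus without a blockstore lookup.
sender
    .send(PossibleDuplicateShred::LastIndexConflict(
        duplicate_shred.clone(),
        conflicting_payload.clone(),
    ))
    .unwrap();
run_check_duplicate(
    &cluster_info,
    &blockstore,
    &receiver,
    &duplicate_slot_sender,
)
.unwrap();
// Conflict variants skip the has_duplicate_shreds_in_slot guard, so the slot is
// reported to the consensus channel immediately.
assert_eq!(
    duplicate_slot_receiver.try_recv().unwrap(),
    duplicate_shred_slot
);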
1 change: 1 addition & 0 deletions gossip/Cargo.toml
@@ -53,6 +53,7 @@ thiserror = { workspace = true }
[dev-dependencies]
num_cpus = { workspace = true }
serial_test = { workspace = true }
test-case = { workspace = true }

[build-dependencies]
rustc_version = { workspace = true }
2 changes: 1 addition & 1 deletion gossip/src/crds_gossip.rs
@@ -149,7 +149,7 @@ impl CrdsGossip {
let now = timestamp();
for entry in entries {
if let Err(err) = crds.insert(entry, now, GossipRoute::LocalMessage) {
error!("push_duplicate_shred faild: {:?}", err);
error!("push_duplicate_shred failed: {:?}", err);
}
}
Ok(())
(Remaining changed files not shown.)
