Skip to content

Commit

Permalink
v2.0: replay: do not early return when marking slots duplicate confir…
Browse files Browse the repository at this point in the history
…med (backport of #2700) (#2706)

* replay: do not early return when marking slots duplicate confirmed (#2700)

* replay: do not early return when marking slots duplicate confirmed

* pr feedback: catch panic explicitely, comments, add root test case

* pr feedback: add custom string to panic message

* pr feedback: add slot to log, use should_panic

* pr feedback: notification for {slot} -> notification for slot {slot}

(cherry picked from commit 1444baa)

* rename mark_slots_duplicate_confirmed -> mark_slots_confirmed

---------

Co-authored-by: Ashwin Sekar <[email protected]>
Co-authored-by: Ashwin Sekar <[email protected]>
  • Loading branch information
3 people authored Aug 23, 2024
1 parent a210e0c commit 901de9a
Showing 1 changed file with 254 additions and 4 deletions.
258 changes: 254 additions & 4 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1860,9 +1860,12 @@ impl ReplayStage {
} else if let Some(prev_hash) =
duplicate_confirmed_slots.insert(confirmed_slot, duplicate_confirmed_hash)
{
assert_eq!(prev_hash, duplicate_confirmed_hash);
assert_eq!(
prev_hash, duplicate_confirmed_hash,
"Additional duplicate confirmed notification for slot {confirmed_slot} with a different hash"
);
// Already processed this signal
return;
continue;
}

let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state(
Expand Down Expand Up @@ -4203,9 +4206,12 @@ impl ReplayStage {
}

if let Some(prev_hash) = duplicate_confirmed_slots.insert(*slot, *frozen_hash) {
assert_eq!(prev_hash, *frozen_hash);
assert_eq!(
prev_hash, *frozen_hash,
"Additional duplicate confirmed notification for slot {slot} with a different hash"
);
// Already processed this signal
return;
continue;
}

let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state(
Expand Down Expand Up @@ -4603,6 +4609,7 @@ pub(crate) mod tests {
sync::{atomic::AtomicU64, Arc, RwLock},
},
tempfile::tempdir,
test_case::test_case,
trees::{tr, Tree},
};

Expand Down Expand Up @@ -9444,4 +9451,247 @@ pub(crate) mod tests {
assert_eq!(working_bank.slot(), good_slot);
assert_eq!(working_bank.parent_slot(), initial_slot);
}

#[test]
#[should_panic(expected = "Additional duplicate confirmed notification for slot 6")]
fn test_mark_slots_confirmed() {
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let (vote_simulator, blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let VoteSimulator {
bank_forks,
mut heaviest_subtree_fork_choice,
mut progress,
..
} = vote_simulator;

let (ancestor_hashes_replay_update_sender, _) = unbounded();
let mut duplicate_confirmed_slots = DuplicateConfirmedSlots::default();
let bank_hash_0 = bank_forks.read().unwrap().bank_hash(0).unwrap();
bank_forks
.write()
.unwrap()
.set_root(1, &AbsRequestSender::default(), None)
.unwrap();

// Mark 0 as duplicate confirmed, should fail as it is 0 < root
let confirmed_slots = [ConfirmedSlot::new_duplicate_confirmed_slot(0, bank_hash_0)];
ReplayStage::mark_slots_confirmed(
&confirmed_slots,
&blockstore,
&bank_forks,
&mut progress,
&mut DuplicateSlotsTracker::default(),
&mut heaviest_subtree_fork_choice,
&mut EpochSlotsFrozenSlots::default(),
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
&mut duplicate_confirmed_slots,
);

assert!(!duplicate_confirmed_slots.contains_key(&0));

// Mark 5 as duplicate confirmed, should suceed
let bank_hash_5 = bank_forks.read().unwrap().bank_hash(5).unwrap();
let confirmed_slots = [ConfirmedSlot::new_duplicate_confirmed_slot(5, bank_hash_5)];

ReplayStage::mark_slots_confirmed(
&confirmed_slots,
&blockstore,
&bank_forks,
&mut progress,
&mut DuplicateSlotsTracker::default(),
&mut heaviest_subtree_fork_choice,
&mut EpochSlotsFrozenSlots::default(),
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
&mut duplicate_confirmed_slots,
);

assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(5, bank_hash_5))
.unwrap_or(false));

// Mark 5 and 6 as duplicate confirmed, should succeed
let bank_hash_6 = bank_forks.read().unwrap().bank_hash(6).unwrap();
let confirmed_slots = [
ConfirmedSlot::new_duplicate_confirmed_slot(5, bank_hash_5),
ConfirmedSlot::new_duplicate_confirmed_slot(6, bank_hash_6),
];

ReplayStage::mark_slots_confirmed(
&confirmed_slots,
&blockstore,
&bank_forks,
&mut progress,
&mut DuplicateSlotsTracker::default(),
&mut heaviest_subtree_fork_choice,
&mut EpochSlotsFrozenSlots::default(),
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
&mut duplicate_confirmed_slots,
);

assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(5, bank_hash_5))
.unwrap_or(false));
assert_eq!(*duplicate_confirmed_slots.get(&6).unwrap(), bank_hash_6);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(6, bank_hash_6))
.unwrap_or(false));

// Mark 6 as duplicate confirmed again with a different hash, should panic
let confirmed_slots = [ConfirmedSlot::new_duplicate_confirmed_slot(
6,
Hash::new_unique(),
)];
ReplayStage::mark_slots_confirmed(
&confirmed_slots,
&blockstore,
&bank_forks,
&mut progress,
&mut DuplicateSlotsTracker::default(),
&mut heaviest_subtree_fork_choice,
&mut EpochSlotsFrozenSlots::default(),
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
&mut duplicate_confirmed_slots,
);
}

#[test_case(true ; "same_batch")]
#[test_case(false ; "seperate_batches")]
#[should_panic(expected = "Additional duplicate confirmed notification for slot 6")]
fn test_process_duplicate_confirmed_slots(same_batch: bool) {
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let (vote_simulator, blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let VoteSimulator {
bank_forks,
mut heaviest_subtree_fork_choice,
progress,
..
} = vote_simulator;

let (ancestor_hashes_replay_update_sender, _) = unbounded();
let (sender, receiver) = unbounded();
let mut duplicate_confirmed_slots = DuplicateConfirmedSlots::default();
let bank_hash_0 = bank_forks.read().unwrap().bank_hash(0).unwrap();
bank_forks
.write()
.unwrap()
.set_root(1, &AbsRequestSender::default(), None)
.unwrap();

// Mark 0 as duplicate confirmed, should fail as it is 0 < root
sender.send(vec![(0, bank_hash_0)]).unwrap();

ReplayStage::process_duplicate_confirmed_slots(
&receiver,
&blockstore,
&mut DuplicateSlotsTracker::default(),
&mut duplicate_confirmed_slots,
&mut EpochSlotsFrozenSlots::default(),
&bank_forks,
&progress,
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
);

assert!(!duplicate_confirmed_slots.contains_key(&0));

// Mark 5 as duplicate confirmed, should succed
let bank_hash_5 = bank_forks.read().unwrap().bank_hash(5).unwrap();
sender.send(vec![(5, bank_hash_5)]).unwrap();

ReplayStage::process_duplicate_confirmed_slots(
&receiver,
&blockstore,
&mut DuplicateSlotsTracker::default(),
&mut duplicate_confirmed_slots,
&mut EpochSlotsFrozenSlots::default(),
&bank_forks,
&progress,
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
);

assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(5, bank_hash_5))
.unwrap_or(false));

// Mark 5 and 6 as duplicate confirmed, should suceed
let bank_hash_6 = bank_forks.read().unwrap().bank_hash(6).unwrap();
if same_batch {
sender
.send(vec![(5, bank_hash_5), (6, bank_hash_6)])
.unwrap();
} else {
sender.send(vec![(5, bank_hash_5)]).unwrap();
sender.send(vec![(6, bank_hash_6)]).unwrap();
}

ReplayStage::process_duplicate_confirmed_slots(
&receiver,
&blockstore,
&mut DuplicateSlotsTracker::default(),
&mut duplicate_confirmed_slots,
&mut EpochSlotsFrozenSlots::default(),
&bank_forks,
&progress,
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
);

assert_eq!(*duplicate_confirmed_slots.get(&5).unwrap(), bank_hash_5);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(5, bank_hash_5))
.unwrap_or(false));
assert_eq!(*duplicate_confirmed_slots.get(&6).unwrap(), bank_hash_6);
assert!(heaviest_subtree_fork_choice
.is_duplicate_confirmed(&(6, bank_hash_6))
.unwrap_or(false));

// Mark 6 as duplicate confirmed again with a different hash, should panic
sender.send(vec![(6, Hash::new_unique())]).unwrap();

ReplayStage::process_duplicate_confirmed_slots(
&receiver,
&blockstore,
&mut DuplicateSlotsTracker::default(),
&mut duplicate_confirmed_slots,
&mut EpochSlotsFrozenSlots::default(),
&bank_forks,
&progress,
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
);
}
}

0 comments on commit 901de9a

Please sign in to comment.