Skip to content

Commit

Permalink
Plumb dumps from replay_stage to repair
Browse files Browse the repository at this point in the history
When dumping a slot from replay_stage as a result of duplicate or
ancestor hashes, properly update repair subtrees to keep weighting and
forks view accurate.
  • Loading branch information
AshwinSekar committed Dec 6, 2022
1 parent 582397a commit 2cca8ae
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 51 deletions.
2 changes: 2 additions & 0 deletions core/src/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,7 @@ mod test {
ref cluster_slots,
..
} = repair_info;
let (dumped_slots_sender, _dumped_slots_receiver) = unbounded();

// Add the responder to the eligible list for requests
let responder_id = responder_info.id;
Expand All @@ -1550,6 +1551,7 @@ mod test {
&requester_blockstore,
None,
&mut PurgeRepairSlotCounter::default(),
&dumped_slots_sender,
);

// Simulate making a request
Expand Down
115 changes: 68 additions & 47 deletions core/src/heaviest_subtree_fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,24 +413,29 @@ impl HeaviestSubtreeForkChoice {
.map(|(slot_hash, fork_info)| (slot_hash, fork_info.stake_voted_subtree))
}

/// Dump the node `slot_hash_key` and propagate the stake subtraction up to the root of the
/// tree. Children of `slot_hash_key` are implicitly dumped as well, as if we were able to
/// tree. Children of `slot_hash_key` are implicitely dumped as well, as if we were able to
/// chain them to a defunct parent that implies that they are defunct as well (consistent with
/// bank forks).
/// Split off the node at `slot_hash_key` and propagate the stake subtraction up to the root of the
/// tree.
///
/// Returns all removed slots
pub fn dump_node(&mut self, slot_hash_key: &SlotHashKey) -> Vec<Slot> {
let parent = {
/// Assumes that `slot_hash_key` is not the `tree_root`
/// Returns the subtree originating from `slot_hash_key`
pub fn split_off(&mut self, slot_hash_key: &SlotHashKey) -> Self {
assert_ne!(self.tree_root, *slot_hash_key);
let mut tree = HeaviestSubtreeForkChoice::new(*slot_hash_key);
let (parent, stake_voted_subtree, stake_voted_at) = {
let mut node_to_dump = self
.fork_infos
.get_mut(slot_hash_key)
.expect("Slot hash key must exist in tree");
// Remove stake to be aggregated up the tree
node_to_dump.stake_voted_subtree = 0;
node_to_dump.stake_voted_at = 0;
let stake_voted_subtree = std::mem::take(&mut node_to_dump.stake_voted_subtree);
let stake_voted_at = std::mem::take(&mut node_to_dump.stake_voted_at);
// Mark this node as invalid so that it cannot be chosen as best child
node_to_dump.latest_invalid_ancestor = Some(slot_hash_key.0);
node_to_dump.parent
(
node_to_dump.parent.expect("Cannot split off from root"),
stake_voted_subtree,
stake_voted_at,
)
};

let mut update_operations: UpdateOperations = BTreeMap::new();
Expand All @@ -440,7 +445,6 @@ impl HeaviestSubtreeForkChoice {

// Remove node + all children
let mut to_visit = vec![*slot_hash_key];
let mut removed = Vec::new();

while !to_visit.is_empty() {
let current_node = to_visit.pop().unwrap();
Expand All @@ -449,22 +453,27 @@ impl HeaviestSubtreeForkChoice {
.remove(&current_node)
.expect("Node must exist in tree");

removed.push(current_node.0);
to_visit.extend(current_fork_info.children.into_iter());
to_visit.extend(current_fork_info.children.iter());
tree.fork_infos.insert(current_node, current_fork_info);
}

if let Some(parent) = parent {
// Remove link from parent
let parent_fork_info = self
.fork_infos
.get_mut(&parent)
.expect("Parent must exist in fork infos");
parent_fork_info.children.remove(slot_hash_key);
} else {
warn!("Dumped root of tree {:?}", slot_hash_key);
}
// Remove link from parent
let parent_fork_info = self
.fork_infos
.get_mut(&parent)
.expect("Parent must exist in fork infos");
parent_fork_info.children.remove(slot_hash_key);

// Reset fork info
let root = tree
.fork_infos
.get_mut(slot_hash_key)
.expect("Slot hash key is the root must exist");
root.stake_voted_subtree = stake_voted_subtree;
root.stake_voted_at = stake_voted_at;
root.latest_invalid_ancestor = None;

removed
tree
}

#[cfg(test)]
Expand Down Expand Up @@ -3474,7 +3483,7 @@ mod test {
}

#[test]
fn test_dump_node_simple() {
fn test_split_off_simple() {
let mut heaviest_subtree_fork_choice = setup_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake);
Expand All @@ -3491,7 +3500,7 @@ mod test {
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
heaviest_subtree_fork_choice.dump_node(&(5, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(5, Hash::default()));

assert_eq!(
3 * stake,
Expand Down Expand Up @@ -3519,10 +3528,18 @@ mod test {
None,
heaviest_subtree_fork_choice.stake_voted_subtree(&(6, Hash::default()))
);
assert_eq!(
stake,
tree.stake_voted_subtree(&(5, Hash::default())).unwrap()
);
assert_eq!(
stake,
tree.stake_voted_subtree(&(6, Hash::default())).unwrap()
);
}

#[test]
fn test_dump_node_unvoted() {
fn test_split_off_unvoted() {
let mut heaviest_subtree_fork_choice = setup_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake);
Expand All @@ -3539,7 +3556,7 @@ mod test {
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
heaviest_subtree_fork_choice.dump_node(&(2, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(2, Hash::default()));

assert_eq!(
4 * stake,
Expand All @@ -3561,10 +3578,12 @@ mod test {
None,
heaviest_subtree_fork_choice.stake_voted_subtree(&(4, Hash::default()))
);
assert_eq!(0, tree.stake_voted_subtree(&(2, Hash::default())).unwrap());
assert_eq!(0, tree.stake_voted_subtree(&(4, Hash::default())).unwrap());
}

#[test]
fn test_dump_node_on_best_path() {
fn test_split_off_on_best_path() {
let mut heaviest_subtree_fork_choice = setup_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake);
Expand All @@ -3584,18 +3603,21 @@ mod test {

assert_eq!(6, heaviest_subtree_fork_choice.best_overall_slot().0);

heaviest_subtree_fork_choice.dump_node(&(6, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(6, Hash::default()));
assert_eq!(5, heaviest_subtree_fork_choice.best_overall_slot().0);
assert_eq!(6, tree.best_overall_slot().0);

heaviest_subtree_fork_choice.dump_node(&(3, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(3, Hash::default()));
assert_eq!(4, heaviest_subtree_fork_choice.best_overall_slot().0);
assert_eq!(5, tree.best_overall_slot().0);

heaviest_subtree_fork_choice.dump_node(&(1, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(1, Hash::default()));
assert_eq!(0, heaviest_subtree_fork_choice.best_overall_slot().0);
assert_eq!(4, tree.best_overall_slot().0);
}

#[test]
fn test_dump_with_dups() {
fn test_split_off_with_dups() {
let (
mut heaviest_subtree_fork_choice,
duplicate_leaves_descended_from_4,
Expand Down Expand Up @@ -3629,16 +3651,17 @@ mod test {
heaviest_subtree_fork_choice.best_overall_slot(),
expected_best_slot_hash
);
heaviest_subtree_fork_choice.dump_node(&expected_best_slot_hash);
let tree = heaviest_subtree_fork_choice.split_off(&expected_best_slot_hash);

assert_eq!(
heaviest_subtree_fork_choice.best_overall_slot(),
duplicate_leaves_descended_from_4[1]
);
assert_eq!(tree.best_overall_slot(), expected_best_slot_hash);
}

#[test]
fn test_dump_subtree_with_dups() {
fn test_split_off_subtree_with_dups() {
let (
mut heaviest_subtree_fork_choice,
duplicate_leaves_descended_from_4,
Expand Down Expand Up @@ -3672,45 +3695,43 @@ mod test {
heaviest_subtree_fork_choice.best_overall_slot(),
expected_best_slot_hash
);
heaviest_subtree_fork_choice.dump_node(&(2, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(2, Hash::default()));

assert_eq!(
heaviest_subtree_fork_choice.best_overall_slot(),
duplicate_leaves_descended_from_5[0]
);
assert_eq!(tree.best_overall_slot(), expected_best_slot_hash);
}

#[test]
fn test_dump_node_complicated() {
fn test_split_off_complicated() {
let mut heaviest_subtree_fork_choice = setup_complicated_forks();

let dump_and_check =
let split_and_check =
|tree: &mut HeaviestSubtreeForkChoice, node: Slot, nodes_to_check: Vec<Slot>| {
for &n in nodes_to_check.iter() {
assert!(tree.contains_block(&(n, Hash::default())));
}
tree.dump_node(&(node, Hash::default()));
let split_tree = tree.split_off(&(node, Hash::default()));
for &n in nodes_to_check.iter() {
assert!(!tree.contains_block(&(n, Hash::default())));
assert!(split_tree.contains_block(&(n, Hash::default())));
}
};

dump_and_check(
split_and_check(
&mut heaviest_subtree_fork_choice,
14,
vec![14, 15, 16, 22, 23, 17, 21, 18, 19, 20, 24, 25],
);
dump_and_check(&mut heaviest_subtree_fork_choice, 12, vec![12, 13]);
dump_and_check(
split_and_check(&mut heaviest_subtree_fork_choice, 12, vec![12, 13]);
split_and_check(
&mut heaviest_subtree_fork_choice,
2,
vec![2, 7, 8, 9, 33, 34, 10, 31, 32],
);
dump_and_check(
&mut heaviest_subtree_fork_choice,
0,
vec![0, 1, 5, 6, 3, 11, 26],
);
split_and_check(&mut heaviest_subtree_fork_choice, 1, vec![1, 5, 6]);
}

fn setup_forks() -> HeaviestSubtreeForkChoice {
Expand Down
28 changes: 28 additions & 0 deletions core/src/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub type DuplicateSlotsResetSender = CrossbeamSender<Vec<(Slot, Hash)>>;
pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Vec<(Slot, Hash)>>;
pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
pub type DumpedSlotsSender = CrossbeamSender<Vec<(Slot, Hash)>>;
pub type DumpedSlotsReceiver = CrossbeamReceiver<Vec<(Slot, Hash)>>;
pub type OutstandingShredRepairs = OutstandingRequests<ShredRepairType>;

#[derive(Default, Debug)]
Expand Down Expand Up @@ -92,6 +94,7 @@ pub struct RepairStats {
#[derive(Default, Debug)]
pub struct RepairTiming {
pub set_root_elapsed: u64,
pub dump_slots_elapsed: u64,
pub get_votes_elapsed: u64,
pub add_votes_elapsed: u64,
pub get_best_orphans_elapsed: u64,
Expand All @@ -107,12 +110,14 @@ impl RepairTiming {
fn update(
&mut self,
set_root_elapsed: u64,
dump_slots_elapsed: u64,
get_votes_elapsed: u64,
add_votes_elapsed: u64,
build_repairs_batch_elapsed: u64,
batch_send_repairs_elapsed: u64,
) {
self.set_root_elapsed += set_root_elapsed;
self.dump_slots_elapsed += dump_slots_elapsed;
self.get_votes_elapsed += get_votes_elapsed;
self.add_votes_elapsed += add_votes_elapsed;
self.build_repairs_batch_elapsed += build_repairs_batch_elapsed;
Expand Down Expand Up @@ -205,6 +210,7 @@ impl RepairService {
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
dumped_slots_receiver: DumpedSlotsReceiver,
) -> Self {
let t_repair = {
let blockstore = blockstore.clone();
Expand All @@ -220,6 +226,7 @@ impl RepairService {
repair_info,
verified_vote_receiver,
&outstanding_requests,
dumped_slots_receiver,
)
})
.unwrap()
Expand All @@ -246,6 +253,7 @@ impl RepairService {
repair_info: RepairInfo,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
dumped_slots_receiver: DumpedSlotsReceiver,
) {
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
let serve_repair = ServeRepair::new(
Expand All @@ -267,6 +275,7 @@ impl RepairService {
}

let mut set_root_elapsed;
let mut dump_slots_elapsed;
let mut get_votes_elapsed;
let mut add_votes_elapsed;

Expand All @@ -279,6 +288,23 @@ impl RepairService {
repair_weight.set_root(new_root);
set_root_elapsed.stop();

// Remove dumped slots from the weighting heuristic
dump_slots_elapsed = Measure::start("dump_slots_elapsed");
dumped_slots_receiver
.try_iter()
.for_each(|slot_hash_keys_to_dump| {
// Currently we don't use the correct_hash in repair. Since this dumped
                        // slot is DuplicateConfirmed, we have a >= 52% chance of receiving the
                        // correct version.
for (slot, _correct_hash) in slot_hash_keys_to_dump {
// `slot` is dumped in blockstore wanting to be repaired, we orphan it along with
                            // descendants while copying the weighting heuristic so that it can be
                            // repaired with correct ancestor information
repair_weight.orphan_slot(slot);
}
});
dump_slots_elapsed.stop();

// Add new votes to the weighting heuristic
get_votes_elapsed = Measure::start("get_votes_elapsed");
let mut slot_to_vote_pubkeys: HashMap<Slot, Vec<Pubkey>> = HashMap::new();
Expand Down Expand Up @@ -362,6 +388,7 @@ impl RepairService {

repair_timing.update(
set_root_elapsed.as_us(),
dump_slots_elapsed.as_us(),
get_votes_elapsed.as_us(),
add_votes_elapsed.as_us(),
build_repairs_batch_elapsed.as_us(),
Expand Down Expand Up @@ -397,6 +424,7 @@ impl RepairService {
datapoint_info!(
"repair_service-repair_timing",
("set-root-elapsed", repair_timing.set_root_elapsed, i64),
("dump-slots-elapsed", repair_timing.dump_slots_elapsed, i64),
("get-votes-elapsed", repair_timing.get_votes_elapsed, i64),
("add-votes-elapsed", repair_timing.add_votes_elapsed, i64),
(
Expand Down
Loading

0 comments on commit 2cca8ae

Please sign in to comment.