Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plumb dumps from replay_stage to repair #29058

Merged
merged 3 commits into from
Dec 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,7 @@ mod test {
ref cluster_slots,
..
} = repair_info;
let (dumped_slots_sender, _dumped_slots_receiver) = unbounded();

// Add the responder to the eligible list for requests
let responder_id = responder_info.id;
Expand All @@ -1559,6 +1560,7 @@ mod test {
&requester_blockstore,
None,
&mut PurgeRepairSlotCounter::default(),
&dumped_slots_sender,
);

// Simulate making a request
Expand Down
126 changes: 76 additions & 50 deletions core/src/heaviest_subtree_fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl UpdateOperation {
}
}

#[derive(Clone)]
struct ForkInfo {
// Amount of stake that has voted for exactly this slot
stake_voted_at: ForkWeight,
Expand Down Expand Up @@ -413,34 +414,35 @@ impl HeaviestSubtreeForkChoice {
.map(|(slot_hash, fork_info)| (slot_hash, fork_info.stake_voted_subtree))
}

/// Dump the node `slot_hash_key` and propagate the stake subtraction up to the root of the
/// tree. Children of `slot_hash_key` are implicitely dumped as well, as if we were able to
/// chain them to a defunct parent that implies that they are defunct as well (consistent with
/// bank forks).
/// Split off the node at `slot_hash_key` and propagate the stake subtraction up to the root of the
/// tree.
///
/// Returns all removed slots
pub fn dump_node(&mut self, slot_hash_key: &SlotHashKey) -> Vec<Slot> {
let parent = {
let mut node_to_dump = self
/// Assumes that `slot_hash_key` is not the `tree_root`
/// Returns the subtree originating from `slot_hash_key`
pub fn split_off(&mut self, slot_hash_key: &SlotHashKey) -> Self {
assert_ne!(self.tree_root, *slot_hash_key);
AshwinSekar marked this conversation as resolved.
Show resolved Hide resolved
let split_tree_root = {
let mut node_to_split_at = self
.fork_infos
.get_mut(slot_hash_key)
.expect("Slot hash key must exist in tree");
let split_tree_fork_info = node_to_split_at.clone();
// Remove stake to be aggregated up the tree
node_to_dump.stake_voted_subtree = 0;
node_to_dump.stake_voted_at = 0;
node_to_split_at.stake_voted_subtree = 0;
node_to_split_at.stake_voted_at = 0;
// Mark this node as invalid so that it cannot be chosen as best child
node_to_dump.latest_invalid_ancestor = Some(slot_hash_key.0);
node_to_dump.parent
node_to_split_at.latest_invalid_ancestor = Some(slot_hash_key.0);
split_tree_fork_info
};

let mut update_operations: UpdateOperations = BTreeMap::new();
// Aggregate up to the root
self.insert_aggregate_operations(&mut update_operations, *slot_hash_key);
self.process_update_operations(update_operations);

// Remove node + all children
// Remove node + all children and add to new tree
let mut split_tree_fork_infos = HashMap::new();
let mut to_visit = vec![*slot_hash_key];
let mut removed = Vec::new();

while !to_visit.is_empty() {
let current_node = to_visit.pop().unwrap();
Expand All @@ -449,22 +451,34 @@ impl HeaviestSubtreeForkChoice {
.remove(&current_node)
.expect("Node must exist in tree");

removed.push(current_node.0);
to_visit.extend(current_fork_info.children.into_iter());
to_visit.extend(current_fork_info.children.iter());
split_tree_fork_infos.insert(current_node, current_fork_info);
}

if let Some(parent) = parent {
// Remove link from parent
let parent_fork_info = self
.fork_infos
.get_mut(&parent)
.expect("Parent must exist in fork infos");
parent_fork_info.children.remove(slot_hash_key);
} else {
warn!("Dumped root of tree {:?}", slot_hash_key);
// Remove link from parent
let parent_fork_info = self
.fork_infos
.get_mut(&split_tree_root.parent.expect("Cannot split off from root"))
.expect("Parent must exist in fork infos");
parent_fork_info.children.remove(slot_hash_key);

// Update the root of the new tree with the proper info, now that we have finished
// aggregating
split_tree_fork_infos.insert(*slot_hash_key, split_tree_root);

// Split off the relevant votes to the new tree
let mut split_tree_latest_votes = self.latest_votes.clone();
split_tree_latest_votes.retain(|_, node| split_tree_fork_infos.contains_key(node));
self.latest_votes
.retain(|_, node| self.fork_infos.contains_key(node));

// Create a new tree from the split
HeaviestSubtreeForkChoice {
tree_root: *slot_hash_key,
fork_infos: split_tree_fork_infos,
latest_votes: split_tree_latest_votes,
last_root_time: Instant::now(),
}

removed
}

#[cfg(test)]
Expand Down Expand Up @@ -3474,7 +3488,7 @@ mod test {
}

#[test]
fn test_dump_node_simple() {
fn test_split_off_simple() {
let mut heaviest_subtree_fork_choice = setup_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake);
Expand All @@ -3491,7 +3505,7 @@ mod test {
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
heaviest_subtree_fork_choice.dump_node(&(5, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(5, Hash::default()));

assert_eq!(
3 * stake,
Expand Down Expand Up @@ -3519,10 +3533,18 @@ mod test {
None,
heaviest_subtree_fork_choice.stake_voted_subtree(&(6, Hash::default()))
);
assert_eq!(
stake,
tree.stake_voted_subtree(&(5, Hash::default())).unwrap()
);
assert_eq!(
stake,
tree.stake_voted_subtree(&(6, Hash::default())).unwrap()
);
}

#[test]
fn test_dump_node_unvoted() {
fn test_split_off_unvoted() {
let mut heaviest_subtree_fork_choice = setup_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake);
Expand All @@ -3539,7 +3561,7 @@ mod test {
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
heaviest_subtree_fork_choice.dump_node(&(2, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(2, Hash::default()));

assert_eq!(
4 * stake,
Expand All @@ -3561,10 +3583,12 @@ mod test {
None,
heaviest_subtree_fork_choice.stake_voted_subtree(&(4, Hash::default()))
);
assert_eq!(0, tree.stake_voted_subtree(&(2, Hash::default())).unwrap());
assert_eq!(0, tree.stake_voted_subtree(&(4, Hash::default())).unwrap());
}

#[test]
fn test_dump_node_on_best_path() {
fn test_split_off_on_best_path() {
let mut heaviest_subtree_fork_choice = setup_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake);
Expand All @@ -3584,18 +3608,21 @@ mod test {

assert_eq!(6, heaviest_subtree_fork_choice.best_overall_slot().0);

heaviest_subtree_fork_choice.dump_node(&(6, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(6, Hash::default()));
assert_eq!(5, heaviest_subtree_fork_choice.best_overall_slot().0);
assert_eq!(6, tree.best_overall_slot().0);

heaviest_subtree_fork_choice.dump_node(&(3, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(3, Hash::default()));
assert_eq!(4, heaviest_subtree_fork_choice.best_overall_slot().0);
assert_eq!(5, tree.best_overall_slot().0);

heaviest_subtree_fork_choice.dump_node(&(1, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(1, Hash::default()));
assert_eq!(0, heaviest_subtree_fork_choice.best_overall_slot().0);
assert_eq!(4, tree.best_overall_slot().0);
}

#[test]
fn test_dump_with_dups() {
fn test_split_off_with_dups() {
let (
mut heaviest_subtree_fork_choice,
duplicate_leaves_descended_from_4,
Expand Down Expand Up @@ -3629,16 +3656,17 @@ mod test {
heaviest_subtree_fork_choice.best_overall_slot(),
expected_best_slot_hash
);
heaviest_subtree_fork_choice.dump_node(&expected_best_slot_hash);
let tree = heaviest_subtree_fork_choice.split_off(&expected_best_slot_hash);

assert_eq!(
heaviest_subtree_fork_choice.best_overall_slot(),
duplicate_leaves_descended_from_4[1]
);
assert_eq!(tree.best_overall_slot(), expected_best_slot_hash);
}

#[test]
fn test_dump_subtree_with_dups() {
fn test_split_off_subtree_with_dups() {
let (
mut heaviest_subtree_fork_choice,
duplicate_leaves_descended_from_4,
Expand Down Expand Up @@ -3672,45 +3700,43 @@ mod test {
heaviest_subtree_fork_choice.best_overall_slot(),
expected_best_slot_hash
);
heaviest_subtree_fork_choice.dump_node(&(2, Hash::default()));
let tree = heaviest_subtree_fork_choice.split_off(&(2, Hash::default()));

assert_eq!(
heaviest_subtree_fork_choice.best_overall_slot(),
duplicate_leaves_descended_from_5[0]
);
assert_eq!(tree.best_overall_slot(), expected_best_slot_hash);
}

#[test]
fn test_dump_node_complicated() {
fn test_split_off_complicated() {
let mut heaviest_subtree_fork_choice = setup_complicated_forks();

let dump_and_check =
let split_and_check =
|tree: &mut HeaviestSubtreeForkChoice, node: Slot, nodes_to_check: Vec<Slot>| {
for &n in nodes_to_check.iter() {
assert!(tree.contains_block(&(n, Hash::default())));
}
tree.dump_node(&(node, Hash::default()));
let split_tree = tree.split_off(&(node, Hash::default()));
for &n in nodes_to_check.iter() {
assert!(!tree.contains_block(&(n, Hash::default())));
assert!(split_tree.contains_block(&(n, Hash::default())));
}
};

dump_and_check(
split_and_check(
&mut heaviest_subtree_fork_choice,
14,
vec![14, 15, 16, 22, 23, 17, 21, 18, 19, 20, 24, 25],
);
dump_and_check(&mut heaviest_subtree_fork_choice, 12, vec![12, 13]);
dump_and_check(
split_and_check(&mut heaviest_subtree_fork_choice, 12, vec![12, 13]);
split_and_check(
&mut heaviest_subtree_fork_choice,
2,
vec![2, 7, 8, 9, 33, 34, 10, 31, 32],
);
dump_and_check(
&mut heaviest_subtree_fork_choice,
0,
vec![0, 1, 5, 6, 3, 11, 26],
);
split_and_check(&mut heaviest_subtree_fork_choice, 1, vec![1, 5, 6]);
}

fn setup_forks() -> HeaviestSubtreeForkChoice {
Expand Down
28 changes: 28 additions & 0 deletions core/src/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub type DuplicateSlotsResetSender = CrossbeamSender<Vec<(Slot, Hash)>>;
pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Vec<(Slot, Hash)>>;
pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
pub type DumpedSlotsSender = CrossbeamSender<Vec<(Slot, Hash)>>;
pub type DumpedSlotsReceiver = CrossbeamReceiver<Vec<(Slot, Hash)>>;
pub type OutstandingShredRepairs = OutstandingRequests<ShredRepairType>;

#[derive(Default, Debug)]
Expand Down Expand Up @@ -92,6 +94,7 @@ pub struct RepairStats {
#[derive(Default, Debug)]
pub struct RepairTiming {
pub set_root_elapsed: u64,
pub dump_slots_elapsed: u64,
pub get_votes_elapsed: u64,
pub add_votes_elapsed: u64,
pub get_best_orphans_elapsed: u64,
Expand All @@ -107,12 +110,14 @@ impl RepairTiming {
fn update(
&mut self,
set_root_elapsed: u64,
dump_slots_elapsed: u64,
get_votes_elapsed: u64,
add_votes_elapsed: u64,
build_repairs_batch_elapsed: u64,
batch_send_repairs_elapsed: u64,
) {
self.set_root_elapsed += set_root_elapsed;
self.dump_slots_elapsed += dump_slots_elapsed;
self.get_votes_elapsed += get_votes_elapsed;
self.add_votes_elapsed += add_votes_elapsed;
self.build_repairs_batch_elapsed += build_repairs_batch_elapsed;
Expand Down Expand Up @@ -208,6 +213,7 @@ impl RepairService {
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
dumped_slots_receiver: DumpedSlotsReceiver,
) -> Self {
let t_repair = {
let blockstore = blockstore.clone();
Expand All @@ -223,6 +229,7 @@ impl RepairService {
repair_info,
verified_vote_receiver,
&outstanding_requests,
dumped_slots_receiver,
)
})
.unwrap()
Expand All @@ -249,6 +256,7 @@ impl RepairService {
repair_info: RepairInfo,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
dumped_slots_receiver: DumpedSlotsReceiver,
) {
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
let serve_repair = ServeRepair::new(
Expand All @@ -271,6 +279,7 @@ impl RepairService {
}

let mut set_root_elapsed;
let mut dump_slots_elapsed;
let mut get_votes_elapsed;
let mut add_votes_elapsed;

Expand All @@ -283,6 +292,23 @@ impl RepairService {
repair_weight.set_root(new_root);
set_root_elapsed.stop();

// Remove dumped slots from the weighting heuristic
dump_slots_elapsed = Measure::start("dump_slots_elapsed");
dumped_slots_receiver
.try_iter()
.for_each(|slot_hash_keys_to_dump| {
// Currently we don't use the correct_hash in repair. Since this dumped
// slot is DuplicateConfirmed, we have a >= 52% chance on receiving the
// correct version.
for (slot, _correct_hash) in slot_hash_keys_to_dump {
// `slot` is dumped in blockstore wanting to be repaired, we orphan it along with
// descendants while copying the weighting heurstic so that it can be
// repaired with correct ancestor information
repair_weight.split_off(slot);
}
});
dump_slots_elapsed.stop();

// Add new votes to the weighting heuristic
get_votes_elapsed = Measure::start("get_votes_elapsed");
let mut slot_to_vote_pubkeys: HashMap<Slot, Vec<Pubkey>> = HashMap::new();
Expand Down Expand Up @@ -366,6 +392,7 @@ impl RepairService {

repair_timing.update(
set_root_elapsed.as_us(),
dump_slots_elapsed.as_us(),
get_votes_elapsed.as_us(),
add_votes_elapsed.as_us(),
build_repairs_batch_elapsed.as_us(),
Expand Down Expand Up @@ -401,6 +428,7 @@ impl RepairService {
datapoint_info!(
"repair_service-repair_timing",
("set-root-elapsed", repair_timing.set_root_elapsed, i64),
("dump-slots-elapsed", repair_timing.dump_slots_elapsed, i64),
("get-votes-elapsed", repair_timing.get_votes_elapsed, i64),
("add-votes-elapsed", repair_timing.add_votes_elapsed, i64),
(
Expand Down
Loading