From 873b49de20e17aa74d6229ee800734ecd2f941b2 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Sun, 25 Dec 2022 08:58:30 -0800 Subject: [PATCH] Plumb dumps from replay_stage to repair (#29058) * Plumb dumps from replay_stage to repair When dumping a slot from replay_stage as a result of duplicate or ancestor hashes, properly update repair subtrees to keep weighting and forks view accurate. * add test * pr comments --- core/src/ancestor_hashes_service.rs | 2 + core/src/heaviest_subtree_fork_choice.rs | 126 ++++++++++++++--------- core/src/repair_service.rs | 28 +++++ core/src/repair_weight.rs | 97 +++++++++++++++++ core/src/replay_stage.rs | 25 ++++- core/src/tvu.rs | 3 + core/src/window_service.rs | 4 +- 7 files changed, 231 insertions(+), 54 deletions(-) diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index 7a771bb9d7b841..de8fef31584831 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -1534,6 +1534,7 @@ mod test { ref cluster_slots, .. } = repair_info; + let (dumped_slots_sender, _dumped_slots_receiver) = unbounded(); // Add the responder to the eligible list for requests let responder_id = responder_info.id; @@ -1559,6 +1560,7 @@ mod test { &requester_blockstore, None, &mut PurgeRepairSlotCounter::default(), + &dumped_slots_sender, ); // Simulate making a request diff --git a/core/src/heaviest_subtree_fork_choice.rs b/core/src/heaviest_subtree_fork_choice.rs index 2b6d61308e76e5..d8b6bab5f82181 100644 --- a/core/src/heaviest_subtree_fork_choice.rs +++ b/core/src/heaviest_subtree_fork_choice.rs @@ -87,6 +87,7 @@ impl UpdateOperation { } } +#[derive(Clone)] struct ForkInfo { // Amount of stake that has voted for exactly this slot stake_voted_at: ForkWeight, @@ -413,24 +414,25 @@ impl HeaviestSubtreeForkChoice { .map(|(slot_hash, fork_info)| (slot_hash, fork_info.stake_voted_subtree)) } - /// Dump the node `slot_hash_key` and propagate the stake subtraction up to the root of the - /// tree. Children of `slot_hash_key` are implicitely dumped as well, as if we were able to - /// chain them to a defunct parent that implies that they are defunct as well (consistent with - /// bank forks). + /// Split off the node at `slot_hash_key` and propagate the stake subtraction up to the root of the + /// tree. /// - /// Returns all removed slots - pub fn dump_node(&mut self, slot_hash_key: &SlotHashKey) -> Vec { - let parent = { - let mut node_to_dump = self + /// Assumes that `slot_hash_key` is not the `tree_root` + /// Returns the subtree originating from `slot_hash_key` + pub fn split_off(&mut self, slot_hash_key: &SlotHashKey) -> Self { + assert_ne!(self.tree_root, *slot_hash_key); + let split_tree_root = { + let mut node_to_split_at = self .fork_infos .get_mut(slot_hash_key) .expect("Slot hash key must exist in tree"); + let split_tree_fork_info = node_to_split_at.clone(); // Remove stake to be aggregated up the tree - node_to_dump.stake_voted_subtree = 0; - node_to_dump.stake_voted_at = 0; + node_to_split_at.stake_voted_subtree = 0; + node_to_split_at.stake_voted_at = 0; // Mark this node as invalid so that it cannot be chosen as best child - node_to_dump.latest_invalid_ancestor = Some(slot_hash_key.0); - node_to_dump.parent + node_to_split_at.latest_invalid_ancestor = Some(slot_hash_key.0); + split_tree_fork_info }; let mut update_operations: UpdateOperations = BTreeMap::new(); @@ -438,9 +440,9 @@ impl HeaviestSubtreeForkChoice { self.insert_aggregate_operations(&mut update_operations, *slot_hash_key); self.process_update_operations(update_operations); - // Remove node + all children + // Remove node + all children and add to new tree + let mut split_tree_fork_infos = HashMap::new(); let mut to_visit = vec![*slot_hash_key]; - let mut removed = Vec::new(); while !to_visit.is_empty() { let current_node = to_visit.pop().unwrap(); @@ -449,22 +451,34 @@ impl HeaviestSubtreeForkChoice { .remove(¤t_node) .expect("Node must exist in tree"); - removed.push(current_node.0); - to_visit.extend(current_fork_info.children.into_iter()); + to_visit.extend(current_fork_info.children.iter()); + split_tree_fork_infos.insert(current_node, current_fork_info); } - if let Some(parent) = parent { - // Remove link from parent - let parent_fork_info = self - .fork_infos - .get_mut(&parent) - .expect("Parent must exist in fork infos"); - parent_fork_info.children.remove(slot_hash_key); - } else { - warn!("Dumped root of tree {:?}", slot_hash_key); + // Remove link from parent + let parent_fork_info = self + .fork_infos + .get_mut(&split_tree_root.parent.expect("Cannot split off from root")) + .expect("Parent must exist in fork infos"); + parent_fork_info.children.remove(slot_hash_key); + + // Update the root of the new tree with the proper info, now that we have finished + // aggregating + split_tree_fork_infos.insert(*slot_hash_key, split_tree_root); + + // Split off the relevant votes to the new tree + let mut split_tree_latest_votes = self.latest_votes.clone(); + split_tree_latest_votes.retain(|_, node| split_tree_fork_infos.contains_key(node)); + self.latest_votes + .retain(|_, node| self.fork_infos.contains_key(node)); + + // Create a new tree from the split + HeaviestSubtreeForkChoice { + tree_root: *slot_hash_key, + fork_infos: split_tree_fork_infos, + latest_votes: split_tree_latest_votes, + last_root_time: Instant::now(), } - - removed } #[cfg(test)] @@ -3474,7 +3488,7 @@ mod test { } #[test] - fn test_dump_node_simple() { + fn test_split_off_simple() { let mut heaviest_subtree_fork_choice = setup_forks(); let stake = 100; let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake); @@ -3491,7 +3505,7 @@ mod test { bank.epoch_stakes_map(), bank.epoch_schedule(), ); - heaviest_subtree_fork_choice.dump_node(&(5, Hash::default())); + let tree = heaviest_subtree_fork_choice.split_off(&(5, Hash::default())); assert_eq!( 3 * stake, @@ -3519,10 +3533,18 @@ mod test { None, heaviest_subtree_fork_choice.stake_voted_subtree(&(6, Hash::default())) ); + assert_eq!( + stake, + tree.stake_voted_subtree(&(5, Hash::default())).unwrap() + ); + assert_eq!( + stake, + tree.stake_voted_subtree(&(6, Hash::default())).unwrap() + ); } #[test] - fn test_dump_node_unvoted() { + fn test_split_off_unvoted() { let mut heaviest_subtree_fork_choice = setup_forks(); let stake = 100; let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake); @@ -3539,7 +3561,7 @@ mod test { bank.epoch_stakes_map(), bank.epoch_schedule(), ); - heaviest_subtree_fork_choice.dump_node(&(2, Hash::default())); + let tree = heaviest_subtree_fork_choice.split_off(&(2, Hash::default())); assert_eq!( 4 * stake, @@ -3561,10 +3583,12 @@ mod test { None, heaviest_subtree_fork_choice.stake_voted_subtree(&(4, Hash::default())) ); + assert_eq!(0, tree.stake_voted_subtree(&(2, Hash::default())).unwrap()); + assert_eq!(0, tree.stake_voted_subtree(&(4, Hash::default())).unwrap()); } #[test] - fn test_dump_node_on_best_path() { + fn test_split_off_on_best_path() { let mut heaviest_subtree_fork_choice = setup_forks(); let stake = 100; let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake); @@ -3584,18 +3608,21 @@ mod test { assert_eq!(6, heaviest_subtree_fork_choice.best_overall_slot().0); - heaviest_subtree_fork_choice.dump_node(&(6, Hash::default())); + let tree = heaviest_subtree_fork_choice.split_off(&(6, Hash::default())); assert_eq!(5, heaviest_subtree_fork_choice.best_overall_slot().0); + assert_eq!(6, tree.best_overall_slot().0); - heaviest_subtree_fork_choice.dump_node(&(3, Hash::default())); + let tree = heaviest_subtree_fork_choice.split_off(&(3, Hash::default())); assert_eq!(4, heaviest_subtree_fork_choice.best_overall_slot().0); + assert_eq!(5, tree.best_overall_slot().0); - heaviest_subtree_fork_choice.dump_node(&(1, Hash::default())); + let tree = heaviest_subtree_fork_choice.split_off(&(1, Hash::default())); assert_eq!(0, heaviest_subtree_fork_choice.best_overall_slot().0); + assert_eq!(4, tree.best_overall_slot().0); } #[test] - fn test_dump_with_dups() { + fn test_split_off_with_dups() { let ( mut heaviest_subtree_fork_choice, duplicate_leaves_descended_from_4, @@ -3629,16 +3656,17 @@ mod test { heaviest_subtree_fork_choice.best_overall_slot(), expected_best_slot_hash ); - heaviest_subtree_fork_choice.dump_node(&expected_best_slot_hash); + let tree = heaviest_subtree_fork_choice.split_off(&expected_best_slot_hash); assert_eq!( heaviest_subtree_fork_choice.best_overall_slot(), duplicate_leaves_descended_from_4[1] ); + assert_eq!(tree.best_overall_slot(), expected_best_slot_hash); } #[test] - fn test_dump_subtree_with_dups() { + fn test_split_off_subtree_with_dups() { let ( mut heaviest_subtree_fork_choice, duplicate_leaves_descended_from_4, @@ -3672,45 +3700,43 @@ mod test { heaviest_subtree_fork_choice.best_overall_slot(), expected_best_slot_hash ); - heaviest_subtree_fork_choice.dump_node(&(2, Hash::default())); + let tree = heaviest_subtree_fork_choice.split_off(&(2, Hash::default())); assert_eq!( heaviest_subtree_fork_choice.best_overall_slot(), duplicate_leaves_descended_from_5[0] ); + assert_eq!(tree.best_overall_slot(), expected_best_slot_hash); } #[test] - fn test_dump_node_complicated() { + fn test_split_off_complicated() { let mut heaviest_subtree_fork_choice = setup_complicated_forks(); - let dump_and_check = + let split_and_check = |tree: &mut HeaviestSubtreeForkChoice, node: Slot, nodes_to_check: Vec| { for &n in nodes_to_check.iter() { assert!(tree.contains_block(&(n, Hash::default()))); } - tree.dump_node(&(node, Hash::default())); + let split_tree = tree.split_off(&(node, Hash::default())); for &n in nodes_to_check.iter() { assert!(!tree.contains_block(&(n, Hash::default()))); + assert!(split_tree.contains_block(&(n, Hash::default()))); } }; - dump_and_check( + split_and_check( &mut heaviest_subtree_fork_choice, 14, vec![14, 15, 16, 22, 23, 17, 21, 18, 19, 20, 24, 25], ); - dump_and_check(&mut heaviest_subtree_fork_choice, 12, vec![12, 13]); - dump_and_check( + split_and_check(&mut heaviest_subtree_fork_choice, 12, vec![12, 13]); + split_and_check( &mut heaviest_subtree_fork_choice, 2, vec![2, 7, 8, 9, 33, 34, 10, 31, 32], ); - dump_and_check( - &mut heaviest_subtree_fork_choice, - 0, - vec![0, 1, 5, 6, 3, 11, 26], - ); + split_and_check(&mut heaviest_subtree_fork_choice, 1, vec![1, 5, 6]); } fn setup_forks() -> HeaviestSubtreeForkChoice { diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index df19f841709e9f..f0929501d8a412 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -40,6 +40,8 @@ pub type DuplicateSlotsResetSender = CrossbeamSender>; pub type DuplicateSlotsResetReceiver = CrossbeamReceiver>; pub type ConfirmedSlotsSender = CrossbeamSender>; pub type ConfirmedSlotsReceiver = CrossbeamReceiver>; +pub type DumpedSlotsSender = CrossbeamSender>; +pub type DumpedSlotsReceiver = CrossbeamReceiver>; pub type OutstandingShredRepairs = OutstandingRequests; #[derive(Default, Debug)] @@ -92,6 +94,7 @@ pub struct RepairStats { #[derive(Default, Debug)] pub struct RepairTiming { pub set_root_elapsed: u64, + pub dump_slots_elapsed: u64, pub get_votes_elapsed: u64, pub add_votes_elapsed: u64, pub get_best_orphans_elapsed: u64, @@ -107,12 +110,14 @@ impl RepairTiming { fn update( &mut self, set_root_elapsed: u64, + dump_slots_elapsed: u64, get_votes_elapsed: u64, add_votes_elapsed: u64, build_repairs_batch_elapsed: u64, batch_send_repairs_elapsed: u64, ) { self.set_root_elapsed += set_root_elapsed; + self.dump_slots_elapsed += dump_slots_elapsed; self.get_votes_elapsed += get_votes_elapsed; self.add_votes_elapsed += add_votes_elapsed; self.build_repairs_batch_elapsed += build_repairs_batch_elapsed; @@ -208,6 +213,7 @@ impl RepairService { verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: Arc>, ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, + dumped_slots_receiver: DumpedSlotsReceiver, ) -> Self { let t_repair = { let blockstore = blockstore.clone(); @@ -223,6 +229,7 @@ impl RepairService { repair_info, verified_vote_receiver, &outstanding_requests, + dumped_slots_receiver, ) }) .unwrap() @@ -249,6 +256,7 @@ impl RepairService { repair_info: RepairInfo, verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: &RwLock, + dumped_slots_receiver: DumpedSlotsReceiver, ) { let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root()); let serve_repair = ServeRepair::new( @@ -271,6 +279,7 @@ impl RepairService { } let mut set_root_elapsed; + let mut dump_slots_elapsed; let mut get_votes_elapsed; let mut add_votes_elapsed; @@ -283,6 +292,23 @@ impl RepairService { repair_weight.set_root(new_root); set_root_elapsed.stop(); + // Remove dumped slots from the weighting heuristic + dump_slots_elapsed = Measure::start("dump_slots_elapsed"); + dumped_slots_receiver + .try_iter() + .for_each(|slot_hash_keys_to_dump| { + // Currently we don't use the correct_hash in repair. Since this dumped + // slot is DuplicateConfirmed, we have a >= 52% chance on receiving the + // correct version. + for (slot, _correct_hash) in slot_hash_keys_to_dump { + // `slot` is dumped in blockstore wanting to be repaired, we orphan it along with + // descendants while copying the weighting heurstic so that it can be + // repaired with correct ancestor information + repair_weight.split_off(slot); + } + }); + dump_slots_elapsed.stop(); + // Add new votes to the weighting heuristic get_votes_elapsed = Measure::start("get_votes_elapsed"); let mut slot_to_vote_pubkeys: HashMap> = HashMap::new(); @@ -366,6 +392,7 @@ impl RepairService { repair_timing.update( set_root_elapsed.as_us(), + dump_slots_elapsed.as_us(), get_votes_elapsed.as_us(), add_votes_elapsed.as_us(), build_repairs_batch_elapsed.as_us(), @@ -401,6 +428,7 @@ impl RepairService { datapoint_info!( "repair_service-repair_timing", ("set-root-elapsed", repair_timing.set_root_elapsed, i64), + ("dump-slots-elapsed", repair_timing.dump_slots_elapsed, i64), ("get-votes-elapsed", repair_timing.get_votes_elapsed, i64), ("add-votes-elapsed", repair_timing.add_votes_elapsed, i64), ( diff --git a/core/src/repair_weight.rs b/core/src/repair_weight.rs index 212758f29114eb..ed23cb9d12b79d 100644 --- a/core/src/repair_weight.rs +++ b/core/src/repair_weight.rs @@ -248,6 +248,32 @@ impl RepairWeight { repairs } + /// Split `slot` and descendants into an orphan tree in repair weighting + /// These orphaned slots should be removed from `unrooted_slots` as on proper repair these slots might + /// now be part of the rooted path + pub fn split_off(&mut self, slot: Slot) { + if slot == self.root { + error!("Trying to orphan root of repair tree {}", slot); + return; + } + if let Some(subtree_root) = self.slot_to_tree.get(&slot) { + if *subtree_root == slot { + info!("{} is already orphan, skipping", slot); + return; + } + let subtree = self + .trees + .get_mut(subtree_root) + .expect("subtree must exist"); + let orphaned_tree = subtree.split_off(&(slot, Hash::default())); + for ((orphaned_slot, _), _) in orphaned_tree.all_slots_stake_voted_subtree() { + self.unrooted_slots.remove(orphaned_slot); + self.slot_to_tree.insert(*orphaned_slot, slot); + } + self.trees.insert(slot, orphaned_tree); + } + } + pub fn set_root(&mut self, new_root: Slot) { // Roots should be monotonically increasing assert!(self.root <= new_root); @@ -664,6 +690,7 @@ impl RepairWeight { mod test { use { super::*, + itertools::Itertools, solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}, solana_runtime::{bank::Bank, bank_utils}, solana_sdk::hash::Hash, @@ -1399,6 +1426,76 @@ mod test { ); } + #[test] + fn test_orphan_slot_copy_weight() { + let (blockstore, _, mut repair_weight) = setup_orphan_repair_weight(); + let stake = 100; + let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(1, stake); + repair_weight.add_votes( + &blockstore, + vec![(6, vote_pubkeys)].into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + + // Simulate dump from replay + blockstore.clear_unconfirmed_slot(3); + repair_weight.split_off(3); + blockstore.clear_unconfirmed_slot(10); + repair_weight.split_off(10); + + // Verify orphans + let mut orphans = repair_weight.trees.keys().copied().collect_vec(); + orphans.sort(); + assert_eq!(vec![0, 3, 8, 10, 20], orphans); + + // Verify weighting + assert_eq!( + 0, + repair_weight + .trees + .get(&8) + .unwrap() + .stake_voted_subtree(&(8, Hash::default())) + .unwrap() + ); + assert_eq!( + stake, + repair_weight + .trees + .get(&3) + .unwrap() + .stake_voted_subtree(&(3, Hash::default())) + .unwrap() + ); + assert_eq!( + 2 * stake, + repair_weight + .trees + .get(&10) + .unwrap() + .stake_voted_subtree(&(10, Hash::default())) + .unwrap() + ); + + // Get best orphans works as usual + let mut repairs = vec![]; + let mut processed_slots = vec![repair_weight.root].into_iter().collect(); + repair_weight.get_best_orphans( + &blockstore, + &mut processed_slots, + &mut repairs, + bank.epoch_stakes_map(), + bank.epoch_schedule(), + 4, + ); + assert_eq!(repairs.len(), 4); + assert_eq!(repairs[0].slot(), 10); + assert_eq!(repairs[1].slot(), 20); + assert_eq!(repairs[2].slot(), 3); + assert_eq!(repairs[3].slot(), 8); + } + fn setup_orphan_repair_weight() -> (Blockstore, Bank, RepairWeight) { let blockstore = setup_orphans(); let stake = 100; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 241603146d3c8f..9e6b11ab7c51e4 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -20,7 +20,7 @@ use { heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, progress_map::{ForkProgress, ProgressMap, PropagatedStats, ReplaySlotStats}, - repair_service::DuplicateSlotsResetReceiver, + repair_service::{DumpedSlotsSender, DuplicateSlotsResetReceiver}, rewards_recorder_service::RewardsRecorderSender, tower_storage::{SavedTower, SavedTowerVersions, TowerStorage}, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, @@ -401,6 +401,7 @@ impl ReplayStage { block_metadata_notifier: Option, log_messages_bytes_limit: Option, prioritization_fee_cache: Arc, + dumped_slots_sender: DumpedSlotsSender, ) -> Result { let mut tower = if let Some(process_blockstore) = maybe_process_blockstore { let tower = process_blockstore.process_to_create_tower()?; @@ -915,6 +916,7 @@ impl ReplayStage { &blockstore, poh_bank.map(|bank| bank.slot()), &mut purge_repair_slot_counter, + &dumped_slots_sender, ); dump_then_repair_correct_slots_time.stop(); @@ -1166,12 +1168,14 @@ impl ReplayStage { blockstore: &Blockstore, poh_bank_slot: Option, purge_repair_slot_counter: &mut PurgeRepairSlotCounter, + dumped_slots_sender: &DumpedSlotsSender, ) { if duplicate_slots_to_repair.is_empty() { return; } let root_bank = bank_forks.read().unwrap().root_bank(); + let mut dumped = vec![]; // TODO: handle if alternate version of descendant also got confirmed after ancestor was // confirmed, what happens then? Should probably keep track of purged list and skip things // in `duplicate_slots_to_repair` that have already been purged. Add test. @@ -1234,6 +1238,9 @@ impl ReplayStage { bank_forks, blockstore, ); + + dumped.push((*duplicate_slot, *correct_hash)); + let attempt_no = purge_repair_slot_counter .entry(*duplicate_slot) .and_modify(|x| *x += 1) @@ -1246,8 +1253,6 @@ impl ReplayStage { *duplicate_slot, *attempt_no, ); true - // TODO: Send signal to repair to repair the correct version of - // `duplicate_slot` with hash == `correct_hash` } else { warn!( "PoH bank for slot {} is building on duplicate slot {}", @@ -1261,6 +1266,10 @@ impl ReplayStage { // If we purged/repaired, then no need to keep the slot in the set of pending work !did_purge_repair }); + + // Notify repair of the dumped slots along with the correct hash + trace!("Dumped {} slots", dumped.len()); + dumped_slots_sender.send(dumped).unwrap(); } #[allow(clippy::too_many_arguments)] @@ -3594,6 +3603,7 @@ pub(crate) mod tests { vote_simulator::{self, VoteSimulator}, }, crossbeam_channel::unbounded, + itertools::Itertools, solana_entry::entry::{self, Entry}, solana_gossip::{cluster_info::Node, crds::Cursor}, solana_ledger::{ @@ -6038,6 +6048,11 @@ pub(crate) mod tests { duplicate_slots_to_repair.insert(1, Hash::new_unique()); duplicate_slots_to_repair.insert(2, Hash::new_unique()); let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default(); + let (dumped_slots_sender, dumped_slots_receiver) = unbounded(); + let should_be_dumped = duplicate_slots_to_repair + .iter() + .map(|(&s, &h)| (s, h)) + .collect_vec(); ReplayStage::dump_then_repair_correct_slots( &mut duplicate_slots_to_repair, @@ -6048,7 +6063,9 @@ pub(crate) mod tests { blockstore, None, &mut purge_repair_slot_counter, + &dumped_slots_sender, ); + assert_eq!(should_be_dumped, dumped_slots_receiver.recv().ok().unwrap()); let r_bank_forks = bank_forks.read().unwrap(); for slot in 0..=2 { @@ -6154,6 +6171,7 @@ pub(crate) mod tests { let mut ancestors = bank_forks.read().unwrap().ancestors(); let mut descendants = bank_forks.read().unwrap().descendants(); let old_descendants_of_2 = descendants.get(&2).unwrap().clone(); + let (dumped_slots_sender, _dumped_slots_receiver) = unbounded(); ReplayStage::dump_then_repair_correct_slots( &mut duplicate_slots_to_repair, @@ -6164,6 +6182,7 @@ pub(crate) mod tests { blockstore, None, &mut PurgeRepairSlotCounter::default(), + &dumped_slots_sender, ); // Check everything was purged properly diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 0e1d5b3021b7d9..21f584726caa89 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -185,6 +185,7 @@ impl Tvu { let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded(); let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) = unbounded(); + let (dumped_slots_sender, dumped_slots_receiver) = unbounded(); let window_service = { let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule(); let repair_info = RepairInfo { @@ -209,6 +210,7 @@ impl Tvu { completed_data_sets_sender, duplicate_slots_sender, ancestor_hashes_replay_update_receiver, + dumped_slots_receiver, ) }; @@ -292,6 +294,7 @@ impl Tvu { block_metadata_notifier, log_messages_bytes_limit, prioritization_fee_cache.clone(), + dumped_slots_sender, )?; let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 32855299753447..076b0c1b98f3f5 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -7,7 +7,7 @@ use { cluster_info_vote_listener::VerifiedVoteReceiver, completed_data_sets_service::CompletedDataSetsSender, repair_response, - repair_service::{OutstandingShredRepairs, RepairInfo, RepairService}, + repair_service::{DumpedSlotsReceiver, OutstandingShredRepairs, RepairInfo, RepairService}, result::{Error, Result}, }, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, @@ -317,6 +317,7 @@ impl WindowService { completed_data_sets_sender: CompletedDataSetsSender, duplicate_slots_sender: DuplicateSlotSender, ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, + dumped_slots_receiver: DumpedSlotsReceiver, ) -> WindowService { let outstanding_requests = Arc::>::default(); @@ -331,6 +332,7 @@ impl WindowService { verified_vote_receiver, outstanding_requests.clone(), ancestor_hashes_replay_update_receiver, + dumped_slots_receiver, ); let (duplicate_sender, duplicate_receiver) = unbounded();