From cabd0a09c3f5406319da0e9cb249ea4716de3b31 Mon Sep 17 00:00:00 2001
From: sakridge
Date: Mon, 22 Jun 2020 20:27:45 -0700
Subject: [PATCH] Weight repair slots based on vote stake (#10741)

* Weight repair slots based on vote stake

* Add test
---
 core/src/cluster_info_vote_listener.rs |   4 +-
 core/src/repair_service.rs             | 163 +++++++++++++++++++++----
 core/src/retransmit_stage.rs           |   3 +
 core/src/tvu.rs                        |   1 +
 core/src/window_service.rs             |   3 +
 5 files changed, 147 insertions(+), 27 deletions(-)

diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs
index 225e231144e4dd..84acd6898aba74 100644
--- a/core/src/cluster_info_vote_listener.rs
+++ b/core/src/cluster_info_vote_listener.rs
@@ -49,7 +49,7 @@ pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
 pub struct SlotVoteTracker {
     voted: HashSet<Arc<Pubkey>>,
     updates: Option<Vec<Arc<Pubkey>>>,
-    total_stake: u64,
+    pub total_stake: u64,
 }
 
 impl SlotVoteTracker {
@@ -62,7 +62,7 @@ impl SlotVoteTracker {
 #[derive(Default)]
 pub struct VoteTracker {
     // Map from a slot to a set of validators who have voted for that slot
-    slot_vote_trackers: RwLock<HashMap<Slot, Arc<RwLock<SlotVoteTracker>>>>,
+    pub slot_vote_trackers: RwLock<HashMap<Slot, Arc<RwLock<SlotVoteTracker>>>>,
     // Don't track votes from people who are not staked, acts as a spam filter
     epoch_authorized_voters: RwLock<HashMap<Epoch, Arc<EpochAuthorizedVoters>>>,
     leader_schedule_epoch: RwLock<Epoch>,
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 0fc032dd857f42..33bdf725ab9471 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -2,12 +2,16 @@
 //! regularly finds missing shreds in the ledger and sends repair requests for those shreds
 use crate::{
     cluster_info::ClusterInfo,
+    cluster_info_vote_listener::VoteTracker,
     cluster_slots::ClusterSlots,
     commitment::VOTE_THRESHOLD_SIZE,
     result::Result,
     serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
 };
 use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
+use rand::distributions::{Distribution, WeightedIndex};
+use rand::{thread_rng, Rng, SeedableRng};
+use rand_chacha::ChaChaRng;
 use solana_ledger::{
     blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
     shred::Nonce,
@@ -113,6 +117,7 @@ impl RepairService {
         cluster_info: Arc<ClusterInfo>,
         repair_info: RepairInfo,
         cluster_slots: Arc<ClusterSlots>,
+        vote_tracker: Arc<VoteTracker>,
     ) -> Self {
         let t_repair = Builder::new()
             .name("solana-repair-service".to_string())
@@ -124,6 +129,7 @@ impl RepairService {
                     cluster_info,
                     repair_info,
                     &cluster_slots,
+                    vote_tracker,
                 )
             })
             .unwrap();
@@ -138,6 +144,7 @@ impl RepairService {
         cluster_info: Arc<ClusterInfo>,
         repair_info: RepairInfo,
         cluster_slots: &ClusterSlots,
+        vote_tracker: Arc<VoteTracker>,
     ) {
         let serve_repair = ServeRepair::new(cluster_info.clone());
         let id = cluster_info.id();
@@ -190,6 +197,7 @@ impl RepairService {
                     root_bank.slot(),
                     MAX_REPAIR_LENGTH,
                     &duplicate_slot_repair_statuses,
+                    &vote_tracker,
                 )
             };
 
@@ -271,6 +279,7 @@ impl RepairService {
         root: Slot,
         max_repairs: usize,
         duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
+        vote_tracker: &Arc<VoteTracker>,
     ) -> Result<Vec<RepairType>> {
         // Slot height and shred indexes for shreds we want to repair
         let mut repairs: Vec<RepairType> = vec![];
@@ -280,10 +289,9 @@ impl RepairService {
             max_repairs,
             root,
             duplicate_slot_repair_statuses,
+            vote_tracker,
         );
 
-        // TODO: Incorporate gossip to determine priorities for repair?
-
         // Try to resolve orphans in blockstore
         let orphans = blockstore.orphans_iterator(root + 1).unwrap();
         Self::generate_repairs_for_orphans(orphans, &mut repairs);
@@ -526,27 +534,64 @@ impl RepairService {
         max_repairs: usize,
         slot: Slot,
         duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
+        vote_tracker: &Arc<VoteTracker>,
     ) {
+        let mut seed = [0u8; 32];
+        thread_rng().fill(&mut seed);
+        let rng = &mut ChaChaRng::from_seed(seed);
         let mut pending_slots = vec![slot];
         while repairs.len() < max_repairs && !pending_slots.is_empty() {
-            let slot = pending_slots.pop().unwrap();
-            if duplicate_slot_repair_statuses.contains_key(&slot) {
-                // These are repaired through a different path
-                continue;
+            pending_slots.retain(|slot| !duplicate_slot_repair_statuses.contains_key(slot));
+            let mut next_pending_slots = vec![];
+            let mut level_repairs = HashMap::new();
+            for slot in &pending_slots {
+                if let Some(slot_meta) = blockstore.meta(*slot).unwrap() {
+                    let new_repairs = Self::generate_repairs_for_slot(
+                        blockstore,
+                        *slot,
+                        &slot_meta,
+                        std::usize::MAX,
+                    );
+                    if !new_repairs.is_empty() {
+                        level_repairs.insert(*slot, new_repairs);
+                    }
+                    next_pending_slots.extend(slot_meta.next_slots);
+                }
             }
-            if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
-                let new_repairs = Self::generate_repairs_for_slot(
-                    blockstore,
-                    slot,
-                    &slot_meta,
-                    max_repairs - repairs.len(),
-                );
-                repairs.extend(new_repairs);
-                let next_slots = slot_meta.next_slots;
-                pending_slots.extend(next_slots);
-            } else {
-                break;
+
+            if !level_repairs.is_empty() {
+                let mut slots_to_repair: Vec<_> = level_repairs.keys().cloned().collect();
+                let mut weights: Vec<_> = {
+                    let r_vote_tracker = vote_tracker.slot_vote_trackers.read().unwrap();
+                    slots_to_repair
+                        .iter()
+                        .map(|slot| {
+                            if let Some(slot_vote_tracker) = r_vote_tracker.get(slot) {
+                                std::cmp::max(slot_vote_tracker.read().unwrap().total_stake, 1)
+                            } else {
+                                // should it be something else?
+                                1
+                            }
+                        })
+                        .collect()
+                };
+
+                let mut weighted_index = WeightedIndex::new(weights.clone()).unwrap();
+                while repairs.len() < max_repairs && !level_repairs.is_empty() {
+                    let index = weighted_index.sample(rng);
+                    let slot_repairs = level_repairs.get_mut(&slots_to_repair[index]).unwrap();
+                    repairs.push(slot_repairs.remove(0));
+                    if slot_repairs.is_empty() {
+                        level_repairs.remove(&slots_to_repair[index]);
+                        slots_to_repair.remove(index);
+                        weights.remove(index);
+                        if !weights.is_empty() {
+                            weighted_index = WeightedIndex::new(weights.clone()).unwrap();
+                        }
+                    }
+                }
             }
+            pending_slots = next_pending_slots;
         }
     }
 
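The hunk above is the core of the change: at each level of the fork traversal, repair requests are drawn slot by slot in proportion to the stake observed voting on each slot, with a floor of 1 so slots with no recorded votes are still reachable. The following standalone sketch is not part of the patch; the slot numbers, stakes, and queue contents are made-up illustrative values. It isolates the same `WeightedIndex` sample-and-rebuild pattern in a self-contained program:

```rust
use std::collections::HashMap;

use rand::distributions::{Distribution, WeightedIndex};
use rand::{thread_rng, Rng, SeedableRng};
use rand_chacha::ChaChaRng;

fn main() {
    // Hypothetical per-slot repair queues and voted stake (illustrative values only).
    let mut queues: HashMap<u64, Vec<u64>> = HashMap::new();
    queues.insert(1, vec![10, 11, 12]); // slot 1 is missing three shreds
    queues.insert(2, vec![20]); // slot 2 is missing one shred
    let stake: HashMap<u64, u64> = vec![(1, 900), (2, 0)].into_iter().collect();

    // Seed a ChaCha RNG from thread_rng, as the patch does.
    let mut seed = [0u8; 32];
    thread_rng().fill(&mut seed);
    let rng = &mut ChaChaRng::from_seed(seed);

    let mut slots: Vec<u64> = queues.keys().cloned().collect();
    // Floor each weight at 1 so WeightedIndex::new never sees an all-zero list.
    let mut weights: Vec<u64> = slots
        .iter()
        .map(|s| std::cmp::max(*stake.get(s).unwrap_or(&0), 1))
        .collect();

    let mut weighted_index = WeightedIndex::new(weights.clone()).unwrap();
    let mut picks = vec![];
    while !queues.is_empty() {
        // Pick a slot with probability proportional to its weight, take one request.
        let i = weighted_index.sample(rng);
        let queue = queues.get_mut(&slots[i]).unwrap();
        picks.push((slots[i], queue.remove(0)));
        if queue.is_empty() {
            // Drop the exhausted slot and rebuild the distribution over the rest.
            queues.remove(&slots[i]);
            slots.remove(i);
            weights.remove(i);
            if !weights.is_empty() {
                weighted_index = WeightedIndex::new(weights.clone()).unwrap();
            }
        }
    }
    println!("{:?}", picks);
}
```

Rebuilding the `WeightedIndex` after each removal keeps the remaining weights consistent; the patch takes the same approach, reconstructing the distribution whenever a slot's queue empties rather than using a more elaborate without-replacement sampler.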
@@ -633,8 +678,10 @@ mod test {
             let (shreds2, _) = make_slot_entries(5, 2, 1);
             shreds.extend(shreds2);
             blockstore.insert_shreds(shreds, None, false).unwrap();
+            let vote_tracker = Arc::new(VoteTracker::default());
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(),
+                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new(), &vote_tracker)
+                    .unwrap(),
                 vec![RepairType::HighestShred(0, 0), RepairType::Orphan(2)]
             );
         }
@@ -654,9 +701,11 @@ mod test {
             // any shreds for
             blockstore.insert_shreds(shreds, None, false).unwrap();
 
+            let vote_tracker = Arc::new(VoteTracker::default());
             // Check that repair tries to patch the empty slot
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(),
+                RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new(), &vote_tracker)
+                    .unwrap(),
                 vec![RepairType::HighestShred(0, 0)]
             );
         }
@@ -701,9 +750,16 @@ mod test {
                 })
                 .collect();
 
+            let vote_tracker = Arc::new(VoteTracker::default());
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new())
-                    .unwrap(),
+                RepairService::generate_repairs(
+                    &blockstore,
+                    0,
+                    std::usize::MAX,
+                    &HashMap::new(),
+                    &vote_tracker
+                )
+                .unwrap(),
                 expected
             );
 
@@ -712,7 +768,8 @@ mod test {
                     &blockstore,
                     0,
                     expected.len() - 2,
-                    &HashMap::new()
+                    &HashMap::new(),
+                    &vote_tracker,
                 )
                 .unwrap()[..],
                 expected[0..expected.len() - 2]
@@ -721,6 +778,55 @@ mod test {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    pub fn test_repairs_distributed_across_slots() {
+        solana_logger::setup();
+        let blockstore_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Blockstore::open(&blockstore_path).unwrap();
+
+            let num_entries_per_slot = 100;
+
+            // Create some shreds
+            for i in 1..10 {
+                let (shreds, _) = make_slot_entries(i, 0, num_entries_per_slot as u64);
+
+                // Only insert the first shred
+                blockstore
+                    .insert_shreds(shreds[..1].to_vec(), None, false)
+                    .unwrap();
+            }
+
+            let vote_tracker = Arc::new(VoteTracker::default());
+            let repairs = RepairService::generate_repairs(
+                &blockstore,
+                0,
+                num_entries_per_slot,
+                &HashMap::new(),
+                &vote_tracker,
+            )
+            .unwrap();
+            let mut repairs_slots = HashMap::new();
+            for repair in repairs {
+                match repair {
+                    RepairType::Shred(slot, _shred_index) => {
+                        *repairs_slots.entry(slot).or_insert(0) += 1;
+                    }
+                    RepairType::HighestShred(slot, _shred_index) => {
+                        *repairs_slots.entry(slot).or_insert(0) += 1;
+                    }
+                    RepairType::Orphan(slot) => {
+                        *repairs_slots.entry(slot).or_insert(0) += 1;
+                    }
+                }
+            }
+            for i in 1..10 {
+                assert!(repairs_slots.contains_key(&i));
+            }
+        }
+        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     pub fn test_generate_highest_repair() {
         let blockstore_path = get_tmp_ledger_path!();
@@ -742,9 +848,16 @@ mod test {
             let expected: Vec<RepairType> =
                 vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];
 
+            let vote_tracker = Arc::new(VoteTracker::default());
             assert_eq!(
-                RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new())
-                    .unwrap(),
+                RepairService::generate_repairs(
+                    &blockstore,
+                    0,
+                    std::usize::MAX,
+                    &HashMap::new(),
+                    &vote_tracker
+                )
+                .unwrap(),
                 expected
             );
         }
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index d157f08b0bd2c0..0f1f7f7e4813e3 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -1,5 +1,6 @@
 //! The `retransmit_stage` retransmits shreds between validators
 
+use crate::cluster_info_vote_listener::VoteTracker;
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
     cluster_slots::ClusterSlots,
@@ -413,6 +414,7 @@ impl RetransmitStage {
         shred_version: u16,
         cluster_slots: Arc<ClusterSlots>,
         duplicate_slots_reset_sender: DuplicateSlotsResetSender,
+        vote_tracker: Arc<VoteTracker>,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
 
@@ -457,6 +459,7 @@ impl RetransmitStage {
                 rv && is_connected
             },
             cluster_slots,
+            vote_tracker,
         );
 
         let thread_hdls = t_retransmit;
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 40debe1ffe4f21..cb3fe94ff07685 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -145,6 +145,7 @@ impl Tvu {
             tvu_config.shred_version,
             cluster_slots.clone(),
             duplicate_slots_reset_sender,
+            vote_tracker.clone(),
         );
 
         let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index d539826eae20a6..42c359773776d8 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -3,6 +3,7 @@
 //!
 use crate::{
     cluster_info::ClusterInfo,
+    cluster_info_vote_listener::VoteTracker,
     cluster_slots::ClusterSlots,
     repair_response,
     repair_service::{RepairInfo, RepairService},
@@ -300,6 +301,7 @@ impl WindowService {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         shred_filter: F,
         cluster_slots: Arc<ClusterSlots>,
+        vote_tracker: Arc<VoteTracker>,
     ) -> WindowService
     where
         F: 'static
@@ -316,6 +318,7 @@ impl WindowService {
             cluster_info.clone(),
             repair_info,
             cluster_slots,
+            vote_tracker,
         );
 
         let (insert_sender, insert_receiver) = unbounded();
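For context on why `slot_vote_trackers` and `total_stake` become `pub` in cluster_info_vote_listener.rs: the repair path now reads per-slot voted stake when it builds its sampling weights. Below is a minimal sketch of that lookup, using simplified stand-in types rather than the real `VoteTracker`/`SlotVoteTracker` (only the two fields the repair weighting touches are modeled), with the same weight-1 fallback for slots that have no recorded votes:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Simplified stand-ins for the real structs; field names mirror the patch,
// everything else is omitted for illustration.
#[derive(Default)]
struct SlotVoteTracker {
    pub total_stake: u64,
}

#[derive(Default)]
struct VoteTracker {
    pub slot_vote_trackers: RwLock<HashMap<u64, Arc<RwLock<SlotVoteTracker>>>>,
}

// Weight used for repair sampling: stake seen voting on the slot, floored at 1
// so slots nobody has voted on yet still have a nonzero chance of being repaired.
fn repair_weight(vote_tracker: &VoteTracker, slot: u64) -> u64 {
    let trackers = vote_tracker.slot_vote_trackers.read().unwrap();
    trackers
        .get(&slot)
        .map(|t| std::cmp::max(t.read().unwrap().total_stake, 1))
        .unwrap_or(1)
}

fn main() {
    let vote_tracker = VoteTracker::default();
    vote_tracker.slot_vote_trackers.write().unwrap().insert(
        7,
        Arc::new(RwLock::new(SlotVoteTracker { total_stake: 42 })),
    );
    assert_eq!(repair_weight(&vote_tracker, 7), 42);
    assert_eq!(repair_weight(&vote_tracker, 8), 1); // unseen slot falls back to 1
}
```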