From f5a3f2476a61111a44365b85a59f318debef7780 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 14 Mar 2024 19:01:17 -0700 Subject: [PATCH] wen_restart: replace get_aggregate_result() with more methods (#254) * Replace AggregateResult with more methods. * Rename slots_to_repair() to slots_to_repair_iter(). --- .../src/last_voted_fork_slots_aggregate.rs | 72 ++++++++++--------- wen-restart/src/wen_restart.rs | 21 +++--- 2 files changed, 50 insertions(+), 43 deletions(-) diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index 8a26c4d315f419..96127c1a9fc2a3 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -22,11 +22,6 @@ pub struct LastVotedForkSlotsAggregate { slots_to_repair: HashSet, } -pub struct LastVotedForkSlotsAggregateResult { - pub slots_to_repair: Vec, - pub active_percent: f64, /* 0 ~ 100.0 */ -} - impl LastVotedForkSlotsAggregate { pub(crate) fn new( root_slot: Slot, @@ -131,16 +126,16 @@ impl LastVotedForkSlotsAggregate { Some(record) } - pub(crate) fn get_aggregate_result(&self) -> LastVotedForkSlotsAggregateResult { + pub(crate) fn active_percent(&self) -> f64 { let total_stake = self.epoch_stakes.total_stake(); let total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| { sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey)) }); - let active_percent = total_active_stake as f64 / total_stake as f64 * 100.0; - LastVotedForkSlotsAggregateResult { - slots_to_repair: self.slots_to_repair.iter().cloned().collect(), - active_percent, - } + total_active_stake as f64 / total_stake as f64 * 100.0 + } + + pub(crate) fn slots_to_repair_iter(&self) -> impl Iterator { + self.slots_to_repair.iter() } } @@ -237,11 +232,15 @@ mod tests { }), ); } - let result = test_state.slots_aggregate.get_aggregate_result(); - let mut expected_active_percent = - (initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0; - assert_eq!(result.active_percent, expected_active_percent); - assert!(result.slots_to_repair.is_empty()); + assert_eq!( + test_state.slots_aggregate.active_percent(), + (initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0 + ); + assert!(test_state + .slots_aggregate + .slots_to_repair_iter() + .next() + .is_none()); let new_active_validator = test_state.validator_voting_keypairs [initial_num_active_validators + 1] @@ -267,11 +266,14 @@ mod tests { wallclock: now, }), ); - let result = test_state.slots_aggregate.get_aggregate_result(); - expected_active_percent = + let expected_active_percent = (initial_num_active_validators + 2) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0; - assert_eq!(result.active_percent, expected_active_percent); - let mut actual_slots = Vec::from_iter(result.slots_to_repair); + assert_eq!( + test_state.slots_aggregate.active_percent(), + expected_active_percent + ); + let mut actual_slots = + Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned()); actual_slots.sort(); assert_eq!(actual_slots, test_state.last_voted_fork_slots); @@ -299,9 +301,12 @@ mod tests { wallclock: now, }), ); - let result = test_state.slots_aggregate.get_aggregate_result(); - assert_eq!(result.active_percent, expected_active_percent); - let mut actual_slots = Vec::from_iter(result.slots_to_repair); + assert_eq!( + test_state.slots_aggregate.active_percent(), + expected_active_percent + ); + let mut actual_slots = + Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned()); actual_slots.sort(); assert_eq!(actual_slots, vec![root_slot + 1]); @@ -320,9 +325,12 @@ mod tests { ), None, ); - let result = test_state.slots_aggregate.get_aggregate_result(); - assert_eq!(result.active_percent, expected_active_percent); - let mut actual_slots = Vec::from_iter(result.slots_to_repair); + assert_eq!( + test_state.slots_aggregate.active_percent(), + expected_active_percent + ); + let mut actual_slots = + Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned()); actual_slots.sort(); assert_eq!(actual_slots, vec![root_slot + 1]); } @@ -339,8 +347,7 @@ mod tests { last_vote_bankhash: last_vote_bankhash.to_string(), shred_version: SHRED_VERSION as u32, }; - let result = test_state.slots_aggregate.get_aggregate_result(); - assert_eq!(result.active_percent, 10.0); + assert_eq!(test_state.slots_aggregate.active_percent(), 10.0); assert_eq!( test_state .slots_aggregate @@ -354,8 +361,7 @@ mod tests { .unwrap(), Some(record.clone()), ); - let result = test_state.slots_aggregate.get_aggregate_result(); - assert_eq!(result.active_percent, 20.0); + assert_eq!(test_state.slots_aggregate.active_percent(), 20.0); // Now if you get the same result from Gossip again, it should be ignored. assert_eq!( test_state.slots_aggregate.aggregate( @@ -399,8 +405,7 @@ mod tests { }), ); // percentage doesn't change since it's a replace. - let result = test_state.slots_aggregate.get_aggregate_result(); - assert_eq!(result.active_percent, 20.0); + assert_eq!(test_state.slots_aggregate.active_percent(), 20.0); // Record from validator with zero stake should be ignored. assert_eq!( @@ -419,8 +424,7 @@ mod tests { None, ); // percentage doesn't change since the previous aggregate is ignored. - let result = test_state.slots_aggregate.get_aggregate_result(); - assert_eq!(result.active_percent, 20.0); + assert_eq!(test_state.slots_aggregate.active_percent(), 20.0); } #[test] diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index b14b7e4e840c61..5f7d66a788b3d9 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -154,34 +154,37 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots( .insert(from, record); } } - let result = last_voted_fork_slots_aggregate.get_aggregate_result(); + // Because all operations on the aggregate are called from this single thread, we can + // fetch all results separately without worrying about them being out of sync. We can + // also use returned iterator without the vector changing underneath us. + let active_percent = last_voted_fork_slots_aggregate.active_percent(); let mut filtered_slots: Vec; { let my_bank_forks = bank_forks.read().unwrap(); - filtered_slots = result - .slots_to_repair - .into_iter() + filtered_slots = last_voted_fork_slots_aggregate + .slots_to_repair_iter() .filter(|slot| { - if slot <= &root_slot || is_full_slots.contains(slot) { + if *slot <= &root_slot || is_full_slots.contains(*slot) { return false; } let is_full = my_bank_forks - .get(*slot) + .get(**slot) .map_or(false, |bank| bank.is_frozen()); if is_full { - is_full_slots.insert(*slot); + is_full_slots.insert(**slot); } !is_full }) + .cloned() .collect(); } filtered_slots.sort(); info!( "Active peers: {} Slots to repair: {:?}", - result.active_percent, &filtered_slots + active_percent, &filtered_slots ); if filtered_slots.is_empty() - && result.active_percent > wait_for_supermajority_threshold_percent as f64 + && active_percent > wait_for_supermajority_threshold_percent as f64 { *wen_restart_repair_slots.write().unwrap() = vec![]; break;