Skip to content

Commit

Permalink
wen_restart: replace get_aggregate_result() with more methods (solana…
Browse files Browse the repository at this point in the history
…-labs#254)

* Replace AggregateResult with more methods.

* Rename slots_to_repair() to slots_to_repair_iter().
  • Loading branch information
wen-coding committed Mar 15, 2024
1 parent 83decd6 commit d24f70d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 45 deletions.
80 changes: 44 additions & 36 deletions wen-restart/src/last_voted_fork_slots_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ pub struct LastVotedForkSlotsAggregate {
slots_to_repair: HashSet<Slot>,
}

pub struct LastVotedForkSlotsAggregateResult {
pub slots_to_repair: Vec<Slot>,
pub active_percent: f64, /* 0 ~ 100.0 */
}

#[derive(Clone, Debug, PartialEq)]
pub struct LastVotedForkSlotsFinalResult {
Expand Down Expand Up @@ -144,22 +140,25 @@ impl LastVotedForkSlotsAggregate {
})
}

pub(crate) fn get_aggregate_result(&self) -> LastVotedForkSlotsAggregateResult {
LastVotedForkSlotsAggregateResult {
slots_to_repair: self.slots_to_repair.iter().cloned().collect(),
active_percent: self.total_active_stake() as f64
/ self.epoch_stakes.total_stake() as f64
* 100.0,
}
}

pub(crate) fn get_final_result(self) -> LastVotedForkSlotsFinalResult {
let total_active_stake = self.total_active_stake();
LastVotedForkSlotsFinalResult {
slots_stake_map: self.slots_stake_map,
total_active_stake,
}
}

pub(crate) fn active_percent(&self) -> f64 {
let total_stake = self.epoch_stakes.total_stake();
let total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| {
sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey))
});
total_active_stake as f64 / total_stake as f64 * 100.0
}

pub(crate) fn slots_to_repair_iter(&self) -> impl Iterator<Item = &Slot> {
self.slots_to_repair.iter()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -255,11 +254,15 @@ mod tests {
}),
);
}
let result = test_state.slots_aggregate.get_aggregate_result();
let mut expected_active_percent =
(initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0;
assert_eq!(result.active_percent, expected_active_percent);
assert!(result.slots_to_repair.is_empty());
assert_eq!(
test_state.slots_aggregate.active_percent(),
(initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0
);
assert!(test_state
.slots_aggregate
.slots_to_repair_iter()
.next()
.is_none());

let new_active_validator = test_state.validator_voting_keypairs
[initial_num_active_validators + 1]
Expand All @@ -285,11 +288,14 @@ mod tests {
wallclock: now,
}),
);
let result = test_state.slots_aggregate.get_aggregate_result();
expected_active_percent =
let expected_active_percent =
(initial_num_active_validators + 2) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0;
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, test_state.last_voted_fork_slots);

Expand Down Expand Up @@ -317,9 +323,12 @@ mod tests {
wallclock: now,
}),
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, vec![root_slot + 1]);

Expand All @@ -338,9 +347,12 @@ mod tests {
),
None,
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, vec![root_slot + 1]);
}
Expand All @@ -357,8 +369,7 @@ mod tests {
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: SHRED_VERSION as u32,
};
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 10.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 10.0);
assert_eq!(
test_state
.slots_aggregate
Expand All @@ -372,8 +383,7 @@ mod tests {
.unwrap(),
Some(record.clone()),
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);
// Now if you get the same result from Gossip again, it should be ignored.
assert_eq!(
test_state.slots_aggregate.aggregate(
Expand Down Expand Up @@ -417,8 +427,7 @@ mod tests {
}),
);
// percentage doesn't change since it's a replace.
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);

// Record from validator with zero stake should be ignored.
assert_eq!(
Expand All @@ -437,8 +446,7 @@ mod tests {
None,
);
// percentage doesn't change since the previous aggregate is ignored.
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);
}

#[test]
Expand Down
21 changes: 12 additions & 9 deletions wen-restart/src/wen_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,34 +191,37 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
.insert(from, record);
}
}
let result = last_voted_fork_slots_aggregate.get_aggregate_result();
// Because all operations on the aggregate are called from this single thread, we can
// fetch all results separately without worrying about them being out of sync. We can
// also use returned iterator without the vector changing underneath us.
let active_percent = last_voted_fork_slots_aggregate.active_percent();
let mut filtered_slots: Vec<Slot>;
{
let my_bank_forks = bank_forks.read().unwrap();
filtered_slots = result
.slots_to_repair
.into_iter()
filtered_slots = last_voted_fork_slots_aggregate
.slots_to_repair_iter()
.filter(|slot| {
if slot <= &root_slot || is_full_slots.contains(slot) {
if *slot <= &root_slot || is_full_slots.contains(*slot) {
return false;
}
let is_full = my_bank_forks
.get(*slot)
.get(**slot)
.map_or(false, |bank| bank.is_frozen());
if is_full {
is_full_slots.insert(*slot);
is_full_slots.insert(**slot);
}
!is_full
})
.cloned()
.collect();
}
filtered_slots.sort();
info!(
"Active peers: {} Slots to repair: {:?}",
result.active_percent, &filtered_slots
active_percent, &filtered_slots
);
if filtered_slots.is_empty()
&& result.active_percent > wait_for_supermajority_threshold_percent as f64
&& active_percent > wait_for_supermajority_threshold_percent as f64
{
*wen_restart_repair_slots.write().unwrap() = vec![];
break;
Expand Down

0 comments on commit d24f70d

Please sign in to comment.