Skip to content

Commit

Permalink
Wen restart aggregate last voted fork slots (solana-labs#33892)
Browse files Browse the repository at this point in the history
* Push and aggregate RestartLastVotedForkSlots.

* Fix API and lint errors.

* Reduce clutter.

* Put my own LastVotedForkSlots into the aggregate.

* Write LastVotedForkSlots aggregate progress into local file.

* Fix typo and name constants.

* Fix flaky test.

* Clarify the comments.

* - Use constant for wait_for_supermajority
- Avoid waiting after first shred when repair is in wen_restart

* Fix delay_after_first_shred and remove loop in wen_restart.

* Read wen_restart slots inside the loop instead.

* Discard turbine shreds while in wen_restart in windows insert rather than
shred_fetch_stage.

* Use the new Gossip API.

* Rename slots_to_repair_for_wen_restart and a few others.

* Rename a few more and list all states.

* Pipe exit down to aggregate loop so we can exit early.

* Fix import of RestartLastVotedForkSlots.

* Use the new method to generate test bank.

* Make linter happy.

* Use new bank constructor for tests.

* Fix a bad merge.

* - add new const for wen_restart
- fix the test to cover more cases
- add generate_repairs_for_slot_not_throttled_by_tick and
  generate_repairs_for_slot_throttled_by_tick to make it readable

* Add initialize and put the main logic into a loop.

* Change aggregate interface and other fixes.

* Add failure tests and tests for state transition.

* Add more tests and add ability to recover from written records in
last_voted_fork_slots_aggregate.

* Various name changes.

* We don't really care what type of error is returned.

* Wait on expected progress message in proto file instead of sleep.

* Code reorganization and cleanup.

* Make linter happy.

* Add WenRestartError.

* Split WenRestartErrors into separate errors per state.

* Revert "Split WenRestartErrors into separate erros per state."

This reverts commit 4c920cb.

* Use individual functions when testing for failures.

* Move initialization errors into initialize().

* Use anyhow instead of thiserror to generate backtrace for error.

* Add missing Cargo.lock.

* Add error log when last_vote is missing in the tower storage.

* Change error log info.

* Change test to match exact error.
  • Loading branch information
wen-coding authored Mar 2, 2024
1 parent 608329b commit bfe44d9
Show file tree
Hide file tree
Showing 14 changed files with 1,703 additions and 107 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,7 @@ mod test {
ancestor_duplicate_slots_sender,
repair_validators: None,
repair_whitelist,
wen_restart_repair_slots: None,
};

let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
Expand Down
2 changes: 1 addition & 1 deletion core/src/repair/repair_generic_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ pub fn get_closest_completion(
continue;
}
let slot_meta = slot_meta_cache.get(&path_slot).unwrap().as_ref().unwrap();
let new_repairs = RepairService::generate_repairs_for_slot(
let new_repairs = RepairService::generate_repairs_for_slot_throttled_by_tick(
blockstore,
path_slot,
slot_meta,
Expand Down
183 changes: 152 additions & 31 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ pub struct RepairInfo {
pub repair_validators: Option<HashSet<Pubkey>>,
// Validators which should be given priority when serving
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
// A given list of slots to repair when in wen_restart
pub wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
}

pub struct RepairSlotRange {
Expand Down Expand Up @@ -397,17 +399,24 @@ impl RepairService {
);
add_votes_elapsed.stop();

let repairs = repair_weight.get_best_weighted_repairs(
blockstore,
root_bank.epoch_stakes_map(),
root_bank.epoch_schedule(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut repair_timing,
&mut best_repairs_stats,
);
let repairs = match repair_info.wen_restart_repair_slots.clone() {
Some(slots_to_repair) => Self::generate_repairs_for_wen_restart(
blockstore,
MAX_REPAIR_LENGTH,
&slots_to_repair.read().unwrap(),
),
None => repair_weight.get_best_weighted_repairs(
blockstore,
root_bank.epoch_stakes_map(),
root_bank.epoch_schedule(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut repair_timing,
&mut best_repairs_stats,
),
};

let mut popular_pruned_forks = repair_weight.get_popular_pruned_forks(
root_bank.epoch_stakes_map(),
Expand Down Expand Up @@ -618,32 +627,58 @@ impl RepairService {
}
}

/// Generates up to `max_repairs` repair requests for `slot` with tick-based
/// throttling enabled.
///
/// Delegates to `generate_repairs_for_slot` with
/// `throttle_requests_by_shred_tick` set to `true`, so the
/// `DEFER_REPAIR_THRESHOLD_TICKS` deferral is applied.
pub fn generate_repairs_for_slot_throttled_by_tick(
    blockstore: &Blockstore,
    slot: Slot,
    slot_meta: &SlotMeta,
    max_repairs: usize,
) -> Vec<ShredRepairType> {
    let throttle_requests_by_shred_tick = true;
    Self::generate_repairs_for_slot(
        blockstore,
        slot,
        slot_meta,
        max_repairs,
        throttle_requests_by_shred_tick,
    )
}

/// Generates up to `max_repairs` repair requests for `slot` with tick-based
/// throttling disabled.
///
/// Delegates to `generate_repairs_for_slot` with
/// `throttle_requests_by_shred_tick` set to `false`, so a deferral threshold
/// of zero ticks is used instead of `DEFER_REPAIR_THRESHOLD_TICKS`.
pub fn generate_repairs_for_slot_not_throttled_by_tick(
    blockstore: &Blockstore,
    slot: Slot,
    slot_meta: &SlotMeta,
    max_repairs: usize,
) -> Vec<ShredRepairType> {
    let throttle_requests_by_shred_tick = false;
    Self::generate_repairs_for_slot(
        blockstore,
        slot,
        slot_meta,
        max_repairs,
        throttle_requests_by_shred_tick,
    )
}

/// If this slot is missing shreds generate repairs
pub fn generate_repairs_for_slot(
fn generate_repairs_for_slot(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
throttle_requests_by_shred_tick: bool,
) -> Vec<ShredRepairType> {
let defer_repair_threshold_ticks = if throttle_requests_by_shred_tick {
DEFER_REPAIR_THRESHOLD_TICKS
} else {
0
};
if max_repairs == 0 || slot_meta.is_full() {
vec![]
} else if slot_meta.consumed == slot_meta.received {
// check delay time of last shred
if let Some(reference_tick) = slot_meta
.received
.checked_sub(1)
.and_then(|index| blockstore.get_data_shred(slot, index).ok()?)
.and_then(|shred| shred::layout::get_reference_tick(&shred).ok())
.map(u64::from)
{
// System time is not monotonic
let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND
* timestamp().saturating_sub(slot_meta.first_shred_timestamp)
/ 1_000;
if ticks_since_first_insert
< reference_tick.saturating_add(DEFER_REPAIR_THRESHOLD_TICKS)
if throttle_requests_by_shred_tick {
// check delay time of last shred
if let Some(reference_tick) = slot_meta
.received
.checked_sub(1)
.and_then(|index| blockstore.get_data_shred(slot, index).ok()?)
.and_then(|shred| shred::layout::get_reference_tick(&shred).ok())
.map(u64::from)
{
return vec![];
// System time is not monotonic
let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND
* timestamp().saturating_sub(slot_meta.first_shred_timestamp)
/ 1_000;
if ticks_since_first_insert
< reference_tick.saturating_add(defer_repair_threshold_ticks)
{
return vec![];
}
}
}
vec![ShredRepairType::HighestShred(slot, slot_meta.received)]
Expand All @@ -652,7 +687,7 @@ impl RepairService {
.find_missing_data_indexes(
slot,
slot_meta.first_shred_timestamp,
DEFER_REPAIR_THRESHOLD_TICKS,
defer_repair_threshold_ticks,
slot_meta.consumed,
slot_meta.received,
max_repairs,
Expand All @@ -674,7 +709,7 @@ impl RepairService {
while repairs.len() < max_repairs && !pending_slots.is_empty() {
let slot = pending_slots.pop().unwrap();
if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
let new_repairs = Self::generate_repairs_for_slot(
let new_repairs = Self::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
&slot_meta,
Expand All @@ -689,6 +724,33 @@ impl RepairService {
}
}

/// Generates repair requests for an explicit list of slots while in
/// wen_restart.
///
/// Slots are visited in the order given. For a slot that has a `SlotMeta` in
/// the blockstore, the missing shreds are requested without tick-based
/// throttling; for a slot with no meta at all, a single
/// `ShredRepairType::HighestShred(slot, 0)` request is emitted.
///
/// At most `max_repairs` requests are returned; in particular the result is
/// empty when `max_repairs` is 0 or `slots` is empty.
///
/// # Panics
///
/// Panics if reading a slot's meta from the blockstore returns an error.
pub(crate) fn generate_repairs_for_wen_restart(
    blockstore: &Blockstore,
    max_repairs: usize,
    slots: &[Slot],
) -> Vec<ShredRepairType> {
    let mut repairs: Vec<ShredRepairType> = Vec::new();
    for &slot in slots {
        // Check the budget before doing any work so the cap also holds when
        // `max_repairs` is 0 and the slot is unknown to the blockstore.
        if repairs.len() >= max_repairs {
            break;
        }
        match blockstore.meta(slot).unwrap() {
            Some(slot_meta) => {
                // When in wen_restart, turbine is not running, so there is
                // no need to wait after first shred.
                let new_repairs = Self::generate_repairs_for_slot_not_throttled_by_tick(
                    blockstore,
                    slot,
                    &slot_meta,
                    // Cannot underflow: the check above guarantees
                    // `repairs.len() < max_repairs` here.
                    max_repairs - repairs.len(),
                );
                repairs.extend(new_repairs);
            }
            // Nothing is known about this slot yet; ask for its highest shred.
            None => repairs.push(ShredRepairType::HighestShred(slot, 0)),
        }
    }
    repairs
}

fn get_repair_peers(
cluster_info: Arc<ClusterInfo>,
cluster_slots: Arc<ClusterSlots>,
Expand Down Expand Up @@ -845,7 +907,7 @@ impl RepairService {
..SlotMeta::default()
});

let new_repairs = Self::generate_repairs_for_slot(
let new_repairs = Self::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
&meta,
Expand All @@ -867,7 +929,7 @@ impl RepairService {
// If the slot is full, no further need to repair this slot
None
} else {
Some(Self::generate_repairs_for_slot(
Some(Self::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
&slot_meta,
Expand Down Expand Up @@ -1548,4 +1610,63 @@ mod test {
);
assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
}

#[test]
fn test_generate_repairs_for_wen_restart() {
    solana_logger::setup();
    let max_repairs = 3;
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();

    // Populate slots 2, 3, 5 and 7. The slot at position `i` in this list has
    // its shred at position `i` removed before insertion, so every populated
    // slot is missing exactly one shred.
    let populated_slots: Vec<u64> = vec![2, 3, 5, 7];
    let entries_per_slot = max_ticks_per_n_shreds(3, None) + 1;
    make_chaining_slot_entries(&populated_slots, entries_per_slot)
        .into_iter()
        .enumerate()
        .for_each(|(missing_index, (mut slot_shreds, _))| {
            slot_shreds.remove(missing_index);
            blockstore.insert_shreds(slot_shreds, None, false).unwrap();
        });

    // An empty list of slots to repair yields no repair requests.
    let slots_to_repair: Vec<Slot> = Vec::new();
    let result = RepairService::generate_repairs_for_wen_restart(
        &blockstore,
        max_repairs,
        &slots_to_repair,
    );
    assert!(result.is_empty());

    // A slot with a missing shred yields a request for that shred, and a slot
    // unknown to the blockstore yields a HighestShred request at index 0.
    let slots_to_repair: Vec<Slot> = vec![3, 81];
    let expected = vec![
        ShredRepairType::Shred(3, 1),
        ShredRepairType::HighestShred(81, 0),
    ];
    let result = RepairService::generate_repairs_for_wen_restart(
        &blockstore,
        max_repairs,
        &slots_to_repair,
    );
    assert_eq!(result, expected);

    // No more than max_repairs requests are generated, even when more slots
    // are listed than the budget allows.
    let slots_to_repair: Vec<Slot> = vec![2, 82, 7, 83, 84];
    let expected = vec![
        ShredRepairType::Shred(2, 0),
        ShredRepairType::HighestShred(82, 0),
        ShredRepairType::HighestShred(7, 3),
    ];
    let result = RepairService::generate_repairs_for_wen_restart(
        &blockstore,
        max_repairs,
        &slots_to_repair,
    );
    assert_eq!(result.len(), max_repairs);
    assert_eq!(result, expected);
}
}
2 changes: 1 addition & 1 deletion core/src/repair/repair_weighted_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub fn get_best_repair_shreds(
if let Some(slot_meta) = slot_meta {
match next {
Visit::Unvisited(slot) => {
let new_repairs = RepairService::generate_repairs_for_slot(
let new_repairs = RepairService::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
slot_meta,
Expand Down
3 changes: 3 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl Tvu {
repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
cluster_slots: Arc<ClusterSlots>,
wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
Expand Down Expand Up @@ -214,6 +215,7 @@ impl Tvu {
repair_whitelist: tvu_config.repair_whitelist,
cluster_info: cluster_info.clone(),
cluster_slots: cluster_slots.clone(),
wen_restart_repair_slots,
};
WindowService::new(
blockstore.clone(),
Expand Down Expand Up @@ -499,6 +501,7 @@ pub mod tests {
repair_quic_endpoint_sender,
outstanding_repair_requests,
cluster_slots,
None,
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);
Expand Down
15 changes: 15 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ use {

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
// Right now, since we reuse the wait for supermajority code, the
// following threshold should always be greater than or equal to
// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
Expand Down Expand Up @@ -1236,6 +1241,11 @@ impl Validator {
};

let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
let wen_restart_repair_slots = if in_wen_restart {
Some(Arc::new(RwLock::new(Vec::new())))
} else {
None
};
let tower = match process_blockstore.process_to_create_tower() {
Ok(tower) => {
info!("Tower state: {:?}", tower);
Expand Down Expand Up @@ -1310,6 +1320,7 @@ impl Validator {
repair_quic_endpoint_sender,
outstanding_repair_requests.clone(),
cluster_slots.clone(),
wen_restart_repair_slots.clone(),
)?;

if in_wen_restart {
Expand All @@ -1319,6 +1330,10 @@ impl Validator {
last_vote,
blockstore.clone(),
cluster_info.clone(),
bank_forks.clone(),
wen_restart_repair_slots.clone(),
WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
exit.clone(),
) {
Ok(()) => {
return Err("wen_restart phase one completedy".to_string());
Expand Down
Loading

0 comments on commit bfe44d9

Please sign in to comment.