Skip to content

Commit

Permalink
replay: reload tower if set-identity during startup (#35173)
Browse files Browse the repository at this point in the history
* replay: reload tower if set-identity during startup

* pr feedback: add unit tests

* pr feedback: use tower.node_pubkey, more descriptive names
  • Loading branch information
AshwinSekar authored Feb 20, 2024
1 parent d48f277 commit befe8b9
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 25 deletions.
22 changes: 22 additions & 0 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,28 @@ impl Tower {
}
}

#[cfg(test)]
pub fn new_random(node_pubkey: Pubkey) -> Self {
use rand::Rng;

let mut rng = rand::thread_rng();
let root_slot = rng.gen();
let vote_state = VoteState::new_rand_for_tests(node_pubkey, root_slot);
let last_vote = VoteStateUpdate::from(
vote_state
.votes
.iter()
.map(|lv| (lv.slot(), lv.confirmation_count()))
.collect::<Vec<_>>(),
);
Self {
node_pubkey,
vote_state,
last_vote: VoteTransaction::CompactVoteStateUpdate(last_vote),
..Tower::default()
}
}

pub fn new_from_bankforks(
bank_forks: &BankForks,
node_pubkey: &Pubkey,
Expand Down
126 changes: 101 additions & 25 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,21 @@ impl ReplayStage {
let _exit = Finalizer::new(exit.clone());
let mut identity_keypair = cluster_info.keypair().clone();
let mut my_pubkey = identity_keypair.pubkey();
if my_pubkey != tower.node_pubkey {
// set-identity was called during the startup procedure, ensure the tower is consistent
// before starting the loop. further calls to set-identity will reload the tower in the loop
let my_old_pubkey = tower.node_pubkey;
tower = Self::load_tower(
tower_storage.as_ref(),
&my_pubkey,
&vote_account,
&bank_forks,
);
warn!(
"Identity changed during startup from {} to {}",
my_old_pubkey, my_pubkey
);
}
let (mut progress, mut heaviest_subtree_fork_choice) =
Self::initialize_progress_and_fork_choice_with_locked_bank_forks(
&bank_forks,
Expand Down Expand Up @@ -983,28 +998,12 @@ impl ReplayStage {
my_pubkey = identity_keypair.pubkey();

// Load the new identity's tower
tower = Tower::restore(tower_storage.as_ref(), &my_pubkey)
.and_then(|restored_tower| {
let root_bank = bank_forks.read().unwrap().root_bank();
let slot_history = root_bank.get_slot_history();
restored_tower.adjust_lockouts_after_replay(
root_bank.slot(),
&slot_history,
)
})
.unwrap_or_else(|err| {
if err.is_file_missing() {
Tower::new_from_bankforks(
&bank_forks.read().unwrap(),
&my_pubkey,
&vote_account,
)
} else {
error!("Failed to load tower for {}: {}", my_pubkey, err);
std::process::exit(1);
}
});

tower = Self::load_tower(
tower_storage.as_ref(),
&my_pubkey,
&vote_account,
&bank_forks,
);
// Ensure the validator can land votes with the new identity before
// becoming leader
has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
Expand Down Expand Up @@ -1154,6 +1153,32 @@ impl ReplayStage {
})
}

fn load_tower(
tower_storage: &dyn TowerStorage,
node_pubkey: &Pubkey,
vote_account: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Tower {
Tower::restore(tower_storage, node_pubkey)
.and_then(|restored_tower| {
let root_bank = bank_forks.read().unwrap().root_bank();
let slot_history = root_bank.get_slot_history();
restored_tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history)
})
.unwrap_or_else(|err| {
if err.is_file_missing() {
Tower::new_from_bankforks(
&bank_forks.read().unwrap(),
node_pubkey,
vote_account,
)
} else {
error!("Failed to load tower for {}: {}", node_pubkey, err);
std::process::exit(1);
}
})
}

fn check_for_vote_only_mode(
heaviest_bank_slot: Slot,
forks_root: Slot,
Expand Down Expand Up @@ -4230,9 +4255,9 @@ pub(crate) mod tests {
crate::{
consensus::{
progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS},
tower_storage::NullTowerStorage,
tower_storage::{FileTowerStorage, NullTowerStorage},
tree_diff::TreeDiff,
Tower,
Tower, VOTE_THRESHOLD_DEPTH,
},
replay_stage::ReplayStage,
vote_simulator::{self, VoteSimulator},
Expand All @@ -4254,7 +4279,7 @@ pub(crate) mod tests {
},
solana_runtime::{
accounts_background_service::AbsRequestSender,
commitment::BlockCommitment,
commitment::{BlockCommitment, VOTE_THRESHOLD_SIZE},
genesis_utils::{GenesisConfigInfo, ValidatorVoteKeypairs},
},
solana_sdk::{
Expand All @@ -4278,6 +4303,7 @@ pub(crate) mod tests {
iter,
sync::{atomic::AtomicU64, Arc, RwLock},
},
tempfile::tempdir,
trees::{tr, Tree},
};

Expand Down Expand Up @@ -8598,4 +8624,54 @@ pub(crate) mod tests {
assert_eq!(reset_fork, Some(4));
assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4),]);
}

#[test]
fn test_tower_load_missing() {
let tower_file = tempdir().unwrap().into_path();
let tower_storage = FileTowerStorage::new(tower_file);
let node_pubkey = Pubkey::new_unique();
let vote_account = Pubkey::new_unique();
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let (vote_simulator, _blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let bank_forks = vote_simulator.bank_forks;

let tower =
ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks);
let expected_tower = Tower::new_for_tests(VOTE_THRESHOLD_DEPTH, VOTE_THRESHOLD_SIZE);
assert_eq!(tower.vote_state, expected_tower.vote_state);
assert_eq!(tower.node_pubkey, node_pubkey);
}

#[test]
fn test_tower_load() {
let tower_file = tempdir().unwrap().into_path();
let tower_storage = FileTowerStorage::new(tower_file);
let node_keypair = Keypair::new();
let node_pubkey = node_keypair.pubkey();
let vote_account = Pubkey::new_unique();
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let (vote_simulator, _blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let bank_forks = vote_simulator.bank_forks;
let expected_tower = Tower::new_random(node_pubkey);
expected_tower.save(&tower_storage, &node_keypair).unwrap();

let tower =
ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks);
assert_eq!(tower.vote_state, expected_tower.vote_state);
assert_eq!(tower.node_pubkey, expected_tower.node_pubkey);
}
}
18 changes: 18 additions & 0 deletions sdk/program/src/vote/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,24 @@ impl VoteState {
}
}

pub fn new_rand_for_tests(node_pubkey: Pubkey, root_slot: Slot) -> Self {
let votes = (1..32)
.map(|x| LandedVote {
latency: 0,
lockout: Lockout::new_with_confirmation_count(
u64::from(x).saturating_add(root_slot),
32_u32.saturating_sub(x),
),
})
.collect();
Self {
node_pubkey,
root_slot: Some(root_slot),
votes,
..VoteState::default()
}
}

pub fn get_authorized_voter(&self, epoch: Epoch) -> Option<Pubkey> {
self.authorized_voters.get_authorized_voter(epoch)
}
Expand Down

0 comments on commit befe8b9

Please sign in to comment.