Discard pre hard fork persisted tower if hard-forking #13536

Merged
3 changes: 3 additions & 0 deletions core/src/consensus.rs
@@ -1182,6 +1182,9 @@ pub enum TowerError {

#[error("The tower is fatally inconsistent with blockstore: {0}")]
FatallyInconsistent(&'static str),

#[error("The tower is useless because of new hard fork: {0}")]
HardFork(Slot),
}

impl TowerError {
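The new `HardFork(Slot)` variant lets callers tell a deliberate hard-fork discard apart from genuine tower corruption. A minimal sketch of that distinction, using simplified stand-in types rather than the crate's real definitions:

```rust
// Simplified stand-ins for the real types; only the matching pattern is the point here.
type Slot = u64;

#[derive(Debug)]
enum TowerError {
    FatallyInconsistent(&'static str),
    HardFork(Slot),
}

fn describe(err: &TowerError) -> String {
    match err {
        // The tower predates the hard fork, so it is discarded rather than treated as corrupt.
        TowerError::HardFork(slot) => format!("tower discarded: hard fork at slot {}", slot),
        TowerError::FatallyInconsistent(msg) => format!("tower fatally inconsistent: {}", msg),
    }
}

fn main() {
    println!("{}", describe(&TowerError::HardFork(42)));
    println!("{}", describe(&TowerError::FatallyInconsistent("diverged ancestors")));
}
```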
31 changes: 29 additions & 2 deletions core/src/validator.rs
@@ -720,11 +720,38 @@ fn post_process_restored_tower(
ledger_path: &Path,
bank_forks: &BankForks,
) -> Tower {
let mut should_require_tower = config.require_tower;

restored_tower
.and_then(|tower| {
let root_bank = bank_forks.root_bank();
let slot_history = root_bank.get_slot_history();
tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history)
let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);

if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
if root_bank.slot() == wait_slot_for_supermajority {
// Intentionally fail to restore the tower: we're presumably at a new hard fork, and the past,
// now out-of-chain vote state doesn't make sense at all.
// (What happens if --wait-for-supermajority is passed again when the validator restarts?)
let message = format!("Hardfork is detected; discarding tower restoration result: {:?}", tower);
datapoint_error!(
"tower_error",
(
"error",
message,
String
),
);
error!("{}", message);

// Unconditionally relax the tower requirement so that the tower can always be restored
// from the root bank.
should_require_tower = false;
return Err(crate::consensus::TowerError::HardFork(wait_slot_for_supermajority));
}
}

tower
})
.unwrap_or_else(|err| {
let voting_has_been_active =
@@ -739,7 +766,7 @@
),
);
}
if config.require_tower && voting_has_been_active {
if should_require_tower && voting_has_been_active {
error!("Requested mandatory tower restore failed: {}", err);
error!(
"And there is an existing vote_account containing actual votes. \
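The essence of the change above is a single decision rule: the persisted tower is discarded exactly when the restored root sits at the --wait-for-supermajority slot, i.e. the validator is restarting across a hard fork. A hedged, self-contained sketch of that rule (the name `should_discard_tower` is illustrative, not from the PR):

```rust
// Illustrative names only; Slot mirrors the sdk's u64 alias.
type Slot = u64;

fn should_discard_tower(restored_root: Slot, wait_for_supermajority: Option<Slot>) -> bool {
    // Pre-fork vote state is meaningless on the new fork, so the tower is dropped and later
    // rebuilt from the root bank instead of tripping the mandatory-tower check.
    wait_for_supermajority == Some(restored_root)
}

fn main() {
    assert!(should_discard_tower(10, Some(10))); // restarting exactly at the hard fork slot
    assert!(!should_discard_tower(11, Some(10))); // normal restart after the fork
    assert!(!should_discard_tower(10, None)); // no --wait-for-supermajority at all
    println!("decision rule holds");
}
```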
10 changes: 10 additions & 0 deletions local-cluster/src/cluster.rs
@@ -40,5 +40,15 @@ pub trait Cluster {
fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo>;
fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo;
fn restart_node(&mut self, pubkey: &Pubkey, cluster_validator_info: ClusterValidatorInfo);
fn create_restart_context(
&mut self,
pubkey: &Pubkey,
cluster_validator_info: &mut ClusterValidatorInfo,
) -> (solana_core::cluster_info::Node, Option<ContactInfo>);
fn restart_node_with_context(
cluster_validator_info: ClusterValidatorInfo,
restart_context: (solana_core::cluster_info::Node, Option<ContactInfo>),
) -> ClusterValidatorInfo;
fn add_node(&mut self, pubkey: &Pubkey, cluster_validator_info: ClusterValidatorInfo);
fn exit_restart_node(&mut self, pubkey: &Pubkey, config: ValidatorConfig);
}
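The trait now splits `restart_node` into a locked step (`create_restart_context`, which needs `&mut self`) and a lock-free step (`restart_node_with_context`, an associated function). A toy sketch of why that split lets a blocking restart run on another thread, as the new test below does; the types are stand-ins, not the real LocalCluster API:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

struct Cluster; // stand-in for LocalCluster
struct RestartContext(u64); // stand-in for (Node, Option<ContactInfo>)
struct ValidatorInfo(u64); // stand-in for ClusterValidatorInfo

impl Cluster {
    // Needs `&mut self`, so it runs under the cluster lock, but only briefly.
    fn create_restart_context(&mut self, id: u64) -> RestartContext {
        RestartContext(id)
    }

    // No `self`: mirrors restart_node_with_context, so the potentially blocking restart can run
    // without holding the cluster lock.
    fn restart_node_with_context(info: ValidatorInfo, ctx: RestartContext) -> ValidatorInfo {
        // Imagine Validator::new() blocking on --wait-for-supermajority here.
        assert_eq!(info.0, ctx.0);
        info
    }

    fn add_node(&mut self, _id: u64, _info: ValidatorInfo) {}
}

fn main() {
    let cluster = Arc::new(Mutex::new(Cluster));
    let cluster_for_a = Arc::clone(&cluster);
    let handle = thread::spawn(move || {
        // Lock, build the context, and release the lock before the blocking part.
        let ctx = cluster_for_a.lock().unwrap().create_restart_context(1);
        let info = Cluster::restart_node_with_context(ValidatorInfo(1), ctx);
        cluster_for_a.lock().unwrap().add_node(1, info);
    });
    // The main thread is free to keep using the cluster here (e.g. restart another validator).
    handle.join().unwrap();
}
```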
31 changes: 26 additions & 5 deletions local-cluster/src/local_cluster.rs
@@ -615,7 +615,11 @@ impl Cluster for LocalCluster {
node
}

fn restart_node(&mut self, pubkey: &Pubkey, mut cluster_validator_info: ClusterValidatorInfo) {
fn create_restart_context(
&mut self,
pubkey: &Pubkey,
cluster_validator_info: &mut ClusterValidatorInfo,
) -> (solana_core::cluster_info::Node, Option<ContactInfo>) {
// Update the stored ContactInfo for this node
let node = Node::new_localhost_with_pubkey(&pubkey);
cluster_validator_info.info.contact_info = node.info.clone();
@@ -627,10 +631,28 @@
self.entry_point_info = node.info.clone();
None
} else {
Some(&self.entry_point_info)
Some(self.entry_point_info.clone())
}
};

(node, entry_point_info)
}

fn restart_node(&mut self, pubkey: &Pubkey, mut cluster_validator_info: ClusterValidatorInfo) {
let restart_context = self.create_restart_context(pubkey, &mut cluster_validator_info);
let cluster_validator_info =
Self::restart_node_with_context(cluster_validator_info, restart_context);
self.add_node(pubkey, cluster_validator_info);
}

fn add_node(&mut self, pubkey: &Pubkey, cluster_validator_info: ClusterValidatorInfo) {
self.validators.insert(*pubkey, cluster_validator_info);
}

fn restart_node_with_context(
mut cluster_validator_info: ClusterValidatorInfo,
(node, entry_point_info): (Node, Option<ContactInfo>),
) -> ClusterValidatorInfo {
// Restart the node
let validator_info = &cluster_validator_info.info;
cluster_validator_info.config.account_paths =
@@ -641,12 +663,11 @@
&validator_info.ledger_path,
&validator_info.voting_keypair.pubkey(),
vec![validator_info.voting_keypair.clone()],
entry_point_info,
entry_point_info.as_ref(),
&cluster_validator_info.config,
);

cluster_validator_info.validator = Some(restarted_node);
self.validators.insert(*pubkey, cluster_validator_info);
cluster_validator_info
}

fn exit_restart_node(&mut self, pubkey: &Pubkey, validator_config: ValidatorConfig) {
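A small sketch (toy types, simplified from the real code) of why `Some(&self.entry_point_info)` became `Some(self.entry_point_info.clone())`: the restart context is returned out of the `&mut self` method and may be moved to another thread, so it cannot keep borrowing the cluster:

```rust
#[derive(Clone, Debug)]
struct ContactInfo {
    id: u64,
}

struct Cluster {
    entry_point_info: ContactInfo,
}

impl Cluster {
    // Returning Option<&ContactInfo> would tie the context's lifetime to the cluster borrow;
    // cloning produces an owned value that can outlive the lock and cross threads.
    fn entry_point_for_restart(&mut self) -> Option<ContactInfo> {
        Some(self.entry_point_info.clone())
    }
}

fn main() {
    let mut cluster = Cluster {
        entry_point_info: ContactInfo { id: 1 },
    };
    let entry_point = cluster.entry_point_for_restart();
    // The cluster is free to be mutated, locked, or dropped while the context is still in use.
    drop(cluster);
    println!("{:?}", entry_point);
}
```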
116 changes: 116 additions & 0 deletions local-cluster/tests/local_cluster.rs
@@ -1984,6 +1984,122 @@ fn test_future_tower_master_slave() {
do_test_future_tower(ClusterMode::MasterSlave);
}

#[test]
fn test_hard_fork_invalidates_tower() {
solana_logger::setup();

// First set up the cluster with 2 nodes
let slots_per_epoch = 2048;
let node_stakes = vec![60, 40];

let validator_keys = vec![
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
"2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
]
.iter()
.map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
.take(node_stakes.len())
.collect::<Vec<_>>();
let validators = validator_keys
.iter()
.map(|(kp, _)| kp.pubkey())
.collect::<Vec<_>>();

let validator_a_pubkey = validators[0];
let validator_b_pubkey = validators[1];

let config = ClusterConfig {
cluster_lamports: 100_000,
node_stakes: node_stakes.clone(),
validator_configs: vec![ValidatorConfig::default(); node_stakes.len()],
validator_keys: Some(validator_keys),
slots_per_epoch,
stakers_slot_offset: slots_per_epoch,
skip_warmup_slots: true,
..ClusterConfig::default()
};
let cluster = std::sync::Arc::new(std::sync::Mutex::new(LocalCluster::new(&config)));

let val_a_ledger_path = cluster.lock().unwrap().ledger_path(&validator_a_pubkey);

let min_root = 15;
loop {
sleep(Duration::from_millis(100));

if let Some(root) = root_in_tower(&val_a_ledger_path, &validator_a_pubkey) {
if root >= min_root {
break;
}
}
}

let mut validator_a_info = cluster.lock().unwrap().exit_node(&validator_a_pubkey);
let mut validator_b_info = cluster.lock().unwrap().exit_node(&validator_b_pubkey);

// set up the hard fork at a slot lower than a previously rooted slot!
let hard_fork_slot = min_root - 5;
let hard_fork_slots = Some(vec![hard_fork_slot]);
let mut hard_forks = solana_sdk::hard_forks::HardForks::default();
hard_forks.register(hard_fork_slot);

let expected_shred_version = solana_sdk::shred_version::compute_shred_version(
&cluster.lock().unwrap().genesis_config.hash(),
Some(&hard_forks),
);

validator_a_info.config.new_hard_forks = hard_fork_slots.clone();
validator_a_info.config.wait_for_supermajority = Some(hard_fork_slot);
validator_a_info.config.expected_shred_version = Some(expected_shred_version);

validator_b_info.config.new_hard_forks = hard_fork_slots;
validator_b_info.config.wait_for_supermajority = Some(hard_fork_slot);
validator_b_info.config.expected_shred_version = Some(expected_shred_version);

// restart validator A first
let cluster_for_a = cluster.clone();
// Spawn a thread because wait_for_supermajority blocks in Validator::new()!
let thread = std::thread::spawn(move || {
let restart_context = cluster_for_a
.lock()
.unwrap()
.create_restart_context(&validator_a_pubkey, &mut validator_a_info);
let restarted_validator_info =
LocalCluster::restart_node_with_context(validator_a_info, restart_context);
cluster_for_a
.lock()
.unwrap()
.add_node(&validator_a_pubkey, restarted_validator_info);
});

// check that validator A is actually waiting for supermajority: its last vote must not advance
let mut last_vote = None;
for _ in 0..10 {
sleep(Duration::from_millis(1000));

let new_last_vote = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap();
if let Some(last_vote) = last_vote {
assert_eq!(last_vote, new_last_vote);
} else {
last_vote = Some(new_last_vote);
}
}

// restart validator B normally
cluster
.lock()
.unwrap()
.restart_node(&validator_b_pubkey, validator_b_info);

// validator A should now start, so join its thread here
thread.join().unwrap();

// new slots should be rooted after hard-fork cluster relaunch
cluster
.lock()
.unwrap()
.check_for_new_roots(16, &"hard fork");
}
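For context on why the test sets `expected_shred_version`, here is a hedged sketch that reuses only the solana_sdk calls already visible in the test (it assumes solana_sdk as a dependency; the genesis hash is a placeholder): registering a hard fork changes the computed shred version, and nodes with different shred versions ignore each other's shreds and gossip, which is what partitions pre-fork nodes away after the relaunch.

```rust
// The three calls below appear verbatim in the test above.
use solana_sdk::{hard_forks::HardForks, hash::Hash, shred_version::compute_shred_version};

fn main() {
    let genesis_hash = Hash::default(); // placeholder; the test uses the cluster's real genesis hash
    let before = compute_shred_version(&genesis_hash, None);

    let mut hard_forks = HardForks::default();
    hard_forks.register(10); // the hard fork slot
    let after = compute_shred_version(&genesis_hash, Some(&hard_forks));

    // Nodes that did not register the fork keep `before`; restarted nodes advertise `after`.
    // Setting expected_shred_version in the config catches a mismatch early at startup.
    println!("shred version before the fork: {}, after: {}", before, after);
}
```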

#[test]
#[serial]
fn test_no_optimistic_confirmation_violation_with_tower() {