diff --git a/core/benches/consensus.rs b/core/benches/consensus.rs
new file mode 100644
index 00000000000000..a6954ce8c23cad
--- /dev/null
+++ b/core/benches/consensus.rs
@@ -0,0 +1,34 @@
+#![feature(test)]
+
+extern crate solana_core;
+extern crate test;
+
+use solana_core::consensus::Tower;
+use solana_ledger::bank_forks::BankForks;
+use solana_runtime::bank::Bank;
+use solana_sdk::{
+    pubkey::Pubkey,
+    signature::{Keypair, Signer},
+};
+use std::sync::Arc;
+use tempfile::TempDir;
+use test::Bencher;
+
+#[bench]
+fn bench_save_tower(bench: &mut Bencher) {
+    let dir = TempDir::new().unwrap();
+    let path = dir.path();
+
+    let vote_account_pubkey = &Pubkey::default();
+    let node_keypair = Arc::new(Keypair::new());
+    let tower = Tower::new(
+        &node_keypair.pubkey(),
+        &vote_account_pubkey,
+        &BankForks::new(0, Bank::default()),
+        &path,
+    );
+
+    bench.iter(move || {
+        tower.save(&node_keypair).unwrap();
+    });
+}
diff --git a/core/src/consensus.rs b/core/src/consensus.rs
index 9c19aded03d54b..d61fda45275865 100644
--- a/core/src/consensus.rs
+++ b/core/src/consensus.rs
@@ -3,27 +3,34 @@ use crate::{
     pubkey_references::PubkeyReferences,
 };
 use chrono::prelude::*;
-use solana_ledger::bank_forks::BankForks;
+use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore, blockstore_db};
 use solana_runtime::bank::Bank;
 use solana_sdk::{
     account::Account,
     clock::{Slot, UnixTimestamp},
     hash::Hash,
     pubkey::Pubkey,
+    signature::{Keypair, Signature, Signer},
 };
 use solana_vote_program::vote_state::{
     BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY, TIMESTAMP_SLOT_INTERVAL,
 };
 use std::{
     collections::{BTreeMap, HashMap, HashSet},
+    fs::{self, File},
+    io::BufReader,
     ops::Bound::{Included, Unbounded},
+    path::{Path, PathBuf},
     sync::Arc,
 };
+use thiserror::Error;
 
 pub const VOTE_THRESHOLD_DEPTH: usize = 8;
 pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;
 pub const SWITCH_FORK_THRESHOLD: f64 = 0.38;
 
+pub type Result<T> = std::result::Result<T, TowerError>;
+
 #[derive(Default, Debug, Clone)]
 pub struct StakeLockout {
     lockout: u64,
@@ -42,6 +49,7 @@ impl StakeLockout {
     }
 }
 
+#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
 pub struct Tower {
     node_pubkey: Pubkey,
     threshold_depth: usize,
@@ -49,6 +57,8 @@ pub struct Tower {
     lockouts: VoteState,
     last_vote: Vote,
     last_timestamp: BlockTimestamp,
+    #[serde(skip)]
+    save_path: PathBuf,
 }
 
 impl Default for Tower {
@@ -60,19 +70,29 @@ impl Default for Tower {
             lockouts: VoteState::default(),
             last_vote: Vote::default(),
             last_timestamp: BlockTimestamp::default(),
+            save_path: PathBuf::default(),
         }
     }
 }
 
 impl Tower {
-    pub fn new(node_pubkey: &Pubkey, vote_account_pubkey: &Pubkey, bank_forks: &BankForks) -> Self {
-        let mut tower = Self::new_with_key(node_pubkey);
-
+    pub fn new(
+        node_pubkey: &Pubkey,
+        vote_account_pubkey: &Pubkey,
+        bank_forks: &BankForks,
+        save_path: &Path,
+    ) -> Self {
+        let mut tower = Self {
+            node_pubkey: *node_pubkey,
+            save_path: PathBuf::from(save_path),
+            ..Tower::default()
+        };
         tower.initialize_lockouts_from_bank_forks(&bank_forks, vote_account_pubkey);
 
         tower
     }
 
+    #[cfg(test)]
     pub fn new_with_key(node_pubkey: &Pubkey) -> Self {
         Self {
             node_pubkey: *node_pubkey,
@@ -290,6 +310,14 @@ impl Tower {
         self.lockouts.root_slot
     }
 
+    pub fn last_lockout_vote_slot(&self) -> Option<Slot> {
+        self.lockouts
+            .votes
+            .iter()
+            .max_by(|x, y| x.slot.cmp(&y.slot))
+            .map(|v| v.slot)
+    }
+
     // a slot is not recent if it's older than the newest vote we have
     pub fn is_recent(&self, slot: u64) -> bool {
         if let Some(last_vote) = self.lockouts.votes.back() {
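A note on the `#[serde(skip)]` attribute introduced on `save_path` in the hunks above: skipped fields are left out of the serialized bytes entirely, which matters here because those bytes are what later gets signed, and a locally-configured path should influence neither the signature nor the portability of the file. A minimal standalone sketch of the behavior (illustrative only, not part of the patch; assumes the `serde` derive feature and `bincode` as dependencies):

```rust
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

#[derive(Serialize, Deserialize, Default)]
struct Example {
    slot: u64,
    #[serde(skip)] // omitted from the payload; restored as Default on deserialize
    save_path: PathBuf,
}

fn main() {
    let original = Example {
        slot: 42,
        save_path: PathBuf::from("/ledger"),
    };
    let bytes = bincode::serialize(&original).unwrap();
    let restored: Example = bincode::deserialize(&bytes).unwrap();
    assert_eq!(restored.slot, 42);
    // The skipped field comes back empty; the caller must re-populate it,
    // just as Tower::restore re-assigns `tower.save_path` in the patch below.
    assert_eq!(restored.save_path, PathBuf::new());
}
```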
@@ -350,12 +378,15 @@ impl Tower {
             .slots
             .last()
             .map(|last_vote| {
-                let last_vote_ancestors = ancestors.get(&last_vote).unwrap();
+                let last_vote_ancestors = match ancestors.get(&last_vote) {
+                    None => return true, // last_vote may not be in ancestors after restarting from a snapshot
+                    Some(last_vote_ancestors) => last_vote_ancestors,
+                };
                 let switch_slot_ancestors = ancestors.get(&switch_slot).unwrap();
 
                 if switch_slot == *last_vote || switch_slot_ancestors.contains(last_vote) {
                     // If the `switch_slot` is a descendant of the last vote,
-                    // no switching proof is neceessary
+                    // no switching proof is necessary
                     return true;
                 }
@@ -533,6 +564,14 @@ impl Tower {
         bank_weights.pop().map(|b| b.2)
     }
 
+    pub fn adjust_lockouts_if_newer_root(&mut self, root_slot: Slot) {
+        let my_root_slot = self.lockouts.root_slot.unwrap_or(0);
+        if root_slot > my_root_slot {
+            self.lockouts.root_slot = Some(root_slot);
+            self.lockouts.votes.retain(|v| v.slot > root_slot);
+        }
+    }
+
     fn initialize_lockouts_from_bank_forks(
         &mut self,
         bank_forks: &BankForks,
@@ -556,6 +595,12 @@ impl Tower {
                     "vote account's node_pubkey doesn't match",
                 );
                 self.lockouts = vote_state;
+            } else {
+                info!(
+                    "vote account({}) not found in heaviest bank (slot={})",
+                    vote_account_pubkey,
+                    bank.slot()
+                );
             }
         }
     }
@@ -574,6 +619,113 @@ impl Tower {
             None
         }
     }
+
+    pub fn get_filename(path: &Path, node_pubkey: &Pubkey) -> PathBuf {
+        PathBuf::from(path)
+            .join(format!("tower-{}", node_pubkey))
+            .with_extension("bin")
+    }
+
+    pub fn save(&self, node_keypair: &Arc<Keypair>) -> Result<()> {
+        if self.node_pubkey != node_keypair.pubkey() {
+            return Err(TowerError::WrongTower(format!(
+                "node_pubkey is {:?} but found tower for {:?}",
+                node_keypair.pubkey(),
+                self.node_pubkey
+            )));
+        }
+
+        fs::create_dir_all(&self.save_path)?;
+        let filename = Self::get_filename(&self.save_path, &self.node_pubkey);
+        let new_filename = filename.with_extension("new");
+        {
+            let mut file = File::create(&new_filename)?;
+            let saveable_tower = SavedTower::new(self, node_keypair)?;
+            bincode::serialize_into(&mut file, &saveable_tower)?;
+        }
+        fs::rename(&new_filename, &filename)?;
+        Ok(())
+    }
+
+    pub fn restore(save_path: &Path, node_pubkey: &Pubkey) -> Result<Self> {
+        let filename = Self::get_filename(save_path, node_pubkey);
+
+        let file = File::open(&filename)?;
+        let mut stream = BufReader::new(file);
+
+        let saved_tower: SavedTower = bincode::deserialize_from(&mut stream)?;
+        if !saved_tower.verify(node_pubkey) {
+            return Err(TowerError::InvalidSignature);
+        }
+        let mut tower = saved_tower.deserialize()?;
+        tower.save_path = save_path.to_path_buf();
+
+        // check that the tower actually belongs to this node
+        if &tower.node_pubkey != node_pubkey {
+            return Err(TowerError::WrongTower(format!(
+                "node_pubkey is {:?} but found tower for {:?}",
+                node_pubkey, tower.node_pubkey
+            )));
+        }
+        Ok(tower)
+    }
+}
+
+#[derive(Error, Debug)]
+pub enum TowerError {
+    #[error("IO Error: {0}")]
+    IOError(#[from] std::io::Error),
+
+    #[error("Serialization Error: {0}")]
+    SerializeError(#[from] Box<bincode::ErrorKind>),
+
+    #[error("The signature on the saved tower is invalid")]
+    InvalidSignature,
+
+    #[error("The tower does not match this validator: {0}")]
+    WrongTower(String),
+}
+
+#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq)]
+pub struct SavedTower {
+    signature: Signature,
+    data: Vec<u8>,
+}
+
+impl SavedTower {
+    pub fn new<T: Signer>(tower: &Tower, keypair: &Arc<T>) -> Result<Self> {
+        let data = bincode::serialize(tower)?;
+        let signature = keypair.sign_message(&data);
+        Ok(Self { data, signature })
+    }
+
+    pub fn verify(&self, pubkey: &Pubkey) -> bool {
+        self.signature.verify(pubkey.as_ref(), &self.data)
+    }
+
+    pub fn deserialize(&self) -> Result<Tower> {
+        bincode::deserialize(&self.data).map_err(|e| e.into())
+    }
+}
+
+// Given an untimely crash, tower may have roots that are not reflected in blockstore because
+// `ReplayState::handle_votable_bank()` saves tower before setting blockstore roots
+pub fn reconcile_blockstore_roots_with_tower(
+    tower: &Tower,
+    blockstore: &Blockstore,
+) -> blockstore_db::Result<()> {
+    if let Some(tower_root) = tower.root() {
+        let last_blockstore_root = blockstore.last_root();
+        if last_blockstore_root < tower_root {
+            let new_roots: Vec<_> = blockstore
+                .slot_meta_iterator(last_blockstore_root + 1)?
+                .map(|(slot, _)| slot)
+                .take_while(|slot| *slot <= tower_root)
+                .collect();
+            blockstore.set_roots(&new_roots)?
+        }
+    }
+    Ok(())
 }
 
 #[cfg(test)]
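`Tower::save` above uses the classic write-to-temp-then-rename pattern: the payload is fully written to a `.new` sibling file, and only then renamed over the real filename, so a crash mid-write can never leave a truncated tower behind. Sketched in isolation (hypothetical `write_file_atomically` helper, assuming the temp file and destination live on the same filesystem so the rename is atomic):

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

// Stand-alone version of the pattern used by Tower::save: the destination
// path is only ever replaced by a completely written file.
fn write_file_atomically(path: &Path, bytes: &[u8]) -> io::Result<()> {
    let tmp = path.with_extension("new");
    {
        let mut file = File::create(&tmp)?;
        file.write_all(bytes)?;
        // A hardened variant might also call file.sync_all() here to force
        // the bytes to disk before the rename makes them visible.
    } // file is closed before the rename below
    // On POSIX, rename within one filesystem atomically replaces `path`:
    // readers see either the old complete file or the new complete file.
    fs::rename(&tmp, path)
}
```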
@@ -585,7 +737,9 @@ pub mod test {
         progress_map::ForkProgress,
         replay_stage::{HeaviestForkFailures, ReplayStage},
     };
-    use solana_ledger::bank_forks::BankForks;
+    use solana_ledger::{
+        bank_forks::BankForks, blockstore::make_slot_entries, get_tmp_ledger_path,
+    };
     use solana_runtime::{
         bank::Bank,
         genesis_utils::{
@@ -604,10 +758,13 @@ pub mod test {
     };
     use std::{
         collections::HashMap,
+        fs::{remove_file, OpenOptions},
+        io::{Read, Seek, SeekFrom, Write},
         rc::Rc,
         sync::RwLock,
         {thread::sleep, time::Duration},
     };
+    use tempfile::TempDir;
     use trees::{tr, Tree, TreeWalk};
 
     pub(crate) struct VoteSimulator {
@@ -1718,4 +1875,113 @@ pub mod test {
             .is_some());
         assert!(tower.last_timestamp.timestamp > timestamp);
     }
+
+    fn run_test_load_tower_snapshot<F, G>(
+        modify_original: F,
+        modify_serialized: G,
+    ) -> (Tower, Result<Tower>)
+    where
+        F: Fn(&mut Tower, &Pubkey) -> (),
+        G: Fn(&PathBuf) -> (),
+    {
+        let dir = TempDir::new().unwrap();
+
+        // Use values that will not match the default derived from BankForks
+        let mut tower = Tower::new_for_tests(10, 0.9);
+        tower.save_path = dir.path().to_path_buf();
+
+        let identity_keypair = Arc::new(Keypair::new());
+        modify_original(&mut tower, &identity_keypair.pubkey());
+
+        tower.save(&identity_keypair).unwrap();
+        modify_serialized(&Tower::get_filename(
+            &tower.save_path,
+            &identity_keypair.pubkey(),
+        ));
+        let loaded = Tower::restore(&dir.path(), &identity_keypair.pubkey());
+
+        (tower, loaded)
+    }
+
+    #[test]
+    fn test_load_tower_ok() {
+        let (tower, loaded) =
+            run_test_load_tower_snapshot(|tower, pubkey| tower.node_pubkey = *pubkey, |_| ());
+        assert_eq!(loaded.unwrap(), tower)
+    }
+
+    #[test]
+    fn test_load_tower_wrong_identity() {
+        let identity_keypair = Arc::new(Keypair::new());
+        let tower = Tower::new_with_key(&Pubkey::default());
+        assert_matches!(
+            tower.save(&identity_keypair),
+            Err(TowerError::WrongTower(_))
+        )
+    }
+
+    #[test]
+    fn test_load_tower_invalid_signature() {
+        let (_, loaded) = run_test_load_tower_snapshot(
+            |tower, pubkey| tower.node_pubkey = *pubkey,
+            |path| {
+                let mut file = OpenOptions::new()
+                    .read(true)
+                    .write(true)
+                    .open(path)
+                    .unwrap();
+                let mut buf = [0u8];
+                assert_eq!(file.read(&mut buf).unwrap(), 1);
+                buf[0] = !buf[0]; // flip the first signature byte to corrupt it
+                assert_eq!(file.seek(SeekFrom::Start(0)).unwrap(), 0);
+                assert_eq!(file.write(&buf).unwrap(), 1);
+            },
+        );
+        assert_matches!(loaded, Err(TowerError::InvalidSignature))
+    }
+
+    #[test]
+    fn test_load_tower_deser_failure() {
+        let (_, loaded) = run_test_load_tower_snapshot(
+            |tower, pubkey| tower.node_pubkey = *pubkey,
+            |path| {
+                OpenOptions::new()
+                    .write(true)
+                    .truncate(true)
+                    .open(&path)
+                    .expect(&format!("Failed to truncate file: {:?}", path));
+            },
+        );
+        assert_matches!(loaded, Err(TowerError::SerializeError(_)))
+    }
+
+    #[test]
+    fn test_load_tower_missing() {
+        let (_, loaded) = run_test_load_tower_snapshot(
+            |tower, pubkey| tower.node_pubkey = *pubkey,
+            |path| {
+                remove_file(path).unwrap();
+            },
+        );
+        assert_matches!(loaded, Err(TowerError::IOError(_)))
+    }
+
+    #[test]
+    fn test_reconcile_blockstore_roots_with_tower() {
+        let blockstore_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Blockstore::open(&blockstore_path).unwrap();
+            assert_eq!(blockstore.last_root(), 0);
+
+            let (shreds, _) = make_slot_entries(1, 0, 42);
+            blockstore.insert_shreds(shreds, None, false).unwrap();
+            assert_eq!(blockstore.last_root(), 0);
+
+            let mut tower = Tower::new_with_key(&Pubkey::default());
+            tower.lockouts.root_slot = Some(1);
+            reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap();
+            assert_eq!(blockstore.last_root(), 1);
+        }
+        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+    }
 }
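`test_reconcile_blockstore_roots_with_tower` above exercises the one-slot case; the general window logic of `reconcile_blockstore_roots_with_tower` is easier to see with concrete numbers (illustrative sketch only, with a plain range standing in for `slot_meta_iterator`):

```rust
fn main() {
    // The saved tower rooted slot 5 before a crash, but the blockstore only
    // recorded roots up to slot 3.
    let last_blockstore_root: u64 = 3;
    let tower_root: u64 = 5;

    // Stand-in for blockstore.slot_meta_iterator(last_blockstore_root + 1),
    // which walks the slots present in the blockstore from slot 4 upward.
    let slots_in_blockstore = last_blockstore_root + 1..=7;

    let new_roots: Vec<u64> = slots_in_blockstore
        .take_while(|slot| *slot <= tower_root)
        .collect();

    // Slots 4 and 5 are promoted to roots; 6 and 7 stay unrooted because the
    // tower never rooted past 5.
    assert_eq!(new_roots, vec![4, 5]);
}
```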
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 97a7614f5c2586..dde82431bf1397 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -153,6 +153,7 @@ impl ReplayStage {
         cluster_info: Arc<ClusterInfo>,
         ledger_signal_receiver: Receiver<bool>,
         poh_recorder: Arc<Mutex<PohRecorder>>,
+        mut tower: Tower,
         vote_tracker: Arc<VoteTracker>,
         cluster_slots: Arc<ClusterSlots>,
         retransmit_slots_sender: RetransmitSlotsSender,
@@ -173,7 +174,6 @@ impl ReplayStage {
         } = config;
 
         trace!("replay stage");
-        let mut tower = Tower::new(&my_pubkey, &vote_account, &bank_forks.read().unwrap());
 
         // Start the replay stage loop
         let (lockouts_sender, commitment_service) = AggregateCommitmentService::new(
@@ -861,7 +861,15 @@ impl ReplayStage {
         }
         trace!("handle votable bank {}", bank.slot());
         let (vote, tower_index) = tower.new_vote_from_bank(bank, vote_account_pubkey);
-        if let Some(new_root) = tower.record_bank_vote(vote) {
+        let new_root = tower.record_bank_vote(vote);
+        let last_vote = tower.last_vote_and_timestamp();
+
+        if let Err(err) = tower.save(&cluster_info.keypair) {
+            error!("Unable to save tower: {:?}", err);
+            std::process::exit(1);
+        }
+
+        if let Some(new_root) = new_root {
             // get the root bank before squash
             let root_bank = bank_forks
                 .read()
@@ -915,7 +923,7 @@ impl ReplayStage {
             bank,
             vote_account_pubkey,
             authorized_voter_keypairs,
-            tower.last_vote_and_timestamp(),
+            last_vote,
             tower_index,
         );
         Ok(())
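The `handle_votable_bank` reordering above is the crash-safety heart of the patch: a freshly recorded vote must hit disk before the node acts on it (rooting banks, broadcasting the vote), otherwise a crash between those steps could let the restarted validator vote in a way that violates its earlier lockouts. A condensed sketch of the enforced ordering, with stub types and a hypothetical `vote_on_slot` in place of the real ones:

```rust
use std::io;

struct Tower {
    last_vote: u64,
}

impl Tower {
    fn record_bank_vote(&mut self, slot: u64) -> Option<u64> {
        self.last_vote = slot;
        None // voting may or may not produce a new root
    }
    fn save(&self) -> io::Result<()> {
        Ok(()) // stand-in for the signed bincode write in Tower::save
    }
}

fn vote_on_slot(tower: &mut Tower, slot: u64) {
    let new_root = tower.record_bank_vote(slot); // 1. update in-memory state
    if let Err(err) = tower.save() {
        // 2. persist, or die loudly: continuing with an unsaved vote would
        // permit equivocation after a restart.
        eprintln!("Unable to save tower: {:?}", err);
        std::process::exit(1);
    }
    if let Some(root) = new_root {
        // 3. only now act on the consequences of the vote
        println!("new root: {}", root);
    }
}

fn main() {
    let mut tower = Tower { last_vote: 0 };
    vote_on_slot(&mut tower, 1);
    println!("last vote: {}", tower.last_vote);
}
```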
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index cc03e609353443..49455ff6fab8d5 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -9,6 +9,7 @@ use crate::{
     cluster_info_vote_listener::VoteTracker,
     cluster_slots::ClusterSlots,
     commitment::BlockCommitmentCache,
+    consensus::Tower,
     ledger_cleanup_service::LedgerCleanupService,
     poh_recorder::PohRecorder,
     replay_stage::{ReplayStage, ReplayStageConfig},
@@ -20,19 +21,19 @@ use crate::{
     sigverify_stage::SigVerifyStage,
 };
 use crossbeam_channel::unbounded;
-use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
 use solana_ledger::{
     bank_forks::BankForks,
     blockstore::{Blockstore, CompletedSlotsReceiver},
     blockstore_processor::TransactionStatusSender,
+    leader_schedule_cache::LeaderScheduleCache,
     snapshot_package::AccountsPackageSender,
 };
 use solana_sdk::{
     pubkey::Pubkey,
     signature::{Keypair, Signer},
 };
-use std::collections::HashSet;
 use std::{
+    collections::HashSet,
     net::UdpSocket,
     sync::{
         atomic::AtomicBool,
@@ -86,6 +87,7 @@ impl Tvu {
         ledger_signal_receiver: Receiver<bool>,
         subscriptions: &Arc<RpcSubscriptions>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
+        tower: Tower,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         exit: &Arc<AtomicBool>,
         completed_slots_receiver: CompletedSlotsReceiver,
@@ -191,6 +193,7 @@ impl Tvu {
             cluster_info.clone(),
             ledger_signal_receiver,
             poh_recorder.clone(),
+            tower,
             vote_tracker,
             cluster_slots,
             retransmit_slots_sender,
@@ -236,11 +239,15 @@ impl Tvu {
 #[cfg(test)]
 pub mod tests {
     use super::*;
-    use crate::banking_stage::create_test_recorder;
-    use crate::cluster_info::{ClusterInfo, Node};
+    use crate::{
+        banking_stage::create_test_recorder,
+        cluster_info::{ClusterInfo, Node},
+    };
     use serial_test_derive::serial;
-    use solana_ledger::create_new_tmp_ledger;
-    use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
+    use solana_ledger::{
+        create_new_tmp_ledger,
+        genesis_utils::{create_genesis_config, GenesisConfigInfo},
+    };
     use solana_runtime::bank::Bank;
     use std::sync::atomic::Ordering;
@@ -278,6 +285,7 @@ pub mod tests {
         ));
         let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
         let bank_forks = Arc::new(RwLock::new(bank_forks));
+        let tower = Tower::new_with_key(&target1_keypair.pubkey());
         let tvu = Tvu::new(
             &vote_keypair.pubkey(),
             vec![Arc::new(vote_keypair)],
@@ -299,6 +307,7 @@ pub mod tests {
                 block_commitment_cache.clone(),
             )),
             &poh_recorder,
+            tower,
             &leader_schedule_cache,
             &exit,
             completed_slots_receiver,
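The `Tvu` changes are pure plumbing: instead of `ReplayStage` constructing its own tower, the restored (or rebuilt) `Tower` is created once during startup and handed down by value, `Validator` → `Tvu::new` → `ReplayStage`. A minimal sketch of this move-through-the-layers pattern (stub types, hypothetical names):

```rust
#![allow(dead_code)]

struct Tower;
struct ReplayStage {
    tower: Tower, // sole owner; the replay loop mutates it freely
}
struct Tvu {
    replay_stage: ReplayStage,
}

impl Tvu {
    // Taking Tower by value (not by reference) encodes the design choice that
    // exactly one consensus loop owns the tower, so no locking is needed.
    fn new(tower: Tower) -> Self {
        Tvu {
            replay_stage: ReplayStage { tower },
        }
    }
}

fn main() {
    let tower = Tower; // built once, e.g. during ledger loading at startup
    let _tvu = Tvu::new(tower); // moved down the stack; `tower` is unusable here afterwards
}
```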
diff --git a/core/src/validator.rs b/core/src/validator.rs
index d22830d15b1944..3af2e2393ea9ee 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -5,6 +5,7 @@ use crate::{
     cluster_info::{ClusterInfo, Node},
     cluster_info_vote_listener::VoteTracker,
     commitment::BlockCommitmentCache,
+    consensus::{reconcile_blockstore_roots_with_tower, Tower},
     contact_info::ContactInfo,
     gossip_service::{discover_cluster, GossipService},
     poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
@@ -44,6 +45,7 @@ use solana_sdk::{
     signature::{Keypair, Signer},
     timing::timestamp,
 };
+use solana_vote_program::vote_state::VoteState;
 use std::{
     collections::HashSet,
     net::{IpAddr, Ipv4Addr, SocketAddr},
@@ -79,6 +81,7 @@ pub struct ValidatorConfig {
     pub no_rocksdb_compaction: bool,
     pub accounts_hash_interval_slots: u64,
     pub max_genesis_archive_unpacked_size: u64,
+    pub require_tower: bool,
 }
 
 impl Default for ValidatorConfig {
@@ -105,6 +108,7 @@ impl Default for ValidatorConfig {
             no_rocksdb_compaction: false,
             accounts_hash_interval_slots: std::u64::MAX,
             max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
+            require_tower: false,
         }
     }
 }
@@ -176,7 +180,6 @@ impl Validator {
         sigverify::init();
         info!("Done.");
 
-        info!("creating bank...");
         let (
             genesis_config,
             bank_forks,
@@ -185,14 +188,14 @@ impl Validator {
             completed_slots_receiver,
             leader_schedule_cache,
             snapshot_hash,
-        ) = new_banks_from_blockstore(config, ledger_path, poh_verify);
+            tower,
+        ) = new_banks_from_ledger(&id, vote_account, config, ledger_path, poh_verify);
 
         let leader_schedule_cache = Arc::new(leader_schedule_cache);
-        let exit = Arc::new(AtomicBool::new(false));
         let bank = bank_forks.working_bank();
         let bank_forks = Arc::new(RwLock::new(bank_forks));
 
-        info!("Starting validator from slot {}", bank.slot());
+        info!("Starting validator with working bank slot {}", bank.slot());
         {
             let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect();
             if !hard_forks.is_empty() {
@@ -200,16 +203,12 @@ impl Validator {
             }
         }
 
-        let mut validator_exit = ValidatorExit::default();
-        let exit_ = exit.clone();
-        validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
-        let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));
-
         node.info.wallclock = timestamp();
         node.info.shred_version = compute_shred_version(
             &genesis_config.hash(),
             Some(&bank.hard_forks().read().unwrap()),
         );
+
         Self::print_node_info(&node);
 
         if let Some(expected_shred_version) = config.expected_shred_version {
@@ -222,6 +221,12 @@ impl Validator {
             }
         }
 
+        let mut validator_exit = ValidatorExit::default();
+        let exit = Arc::new(AtomicBool::new(false));
+        let exit_ = exit.clone();
+        validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
+        let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));
+
         let cluster_info = Arc::new(ClusterInfo::new(node.info.clone(), keypair.clone()));
         let blockstore = Arc::new(blockstore);
         let block_commitment_cache = Arc::new(RwLock::new(
@@ -420,6 +425,7 @@ impl Validator {
             ledger_signal_receiver,
             &subscriptions,
             &poh_recorder,
+            tower,
             &leader_schedule_cache,
             &exit,
             completed_slots_receiver,
@@ -542,10 +548,21 @@ impl Validator {
     }
 }
 
+fn empty_vote_account(bank: &Arc<Bank>, vote_account: &Pubkey) -> Option<bool> {
+    if let Some(account) = &bank.get_account(vote_account) {
+        if let Some(vote_state) = VoteState::from(&account) {
+            return Some(vote_state.votes.is_empty());
+        }
+    }
+    None
+}
+
 #[allow(clippy::type_complexity)]
-fn new_banks_from_blockstore(
+fn new_banks_from_ledger(
+    validator_identity: &Pubkey,
+    vote_account: &Pubkey,
     config: &ValidatorConfig,
-    blockstore_path: &Path,
+    ledger_path: &Path,
     poh_verify: bool,
 ) -> (
     GenesisConfig,
@@ -555,9 +572,10 @@
     CompletedSlotsReceiver,
     LeaderScheduleCache,
     Option<(Slot, Hash)>,
+    Tower,
 ) {
-    let genesis_config =
-        open_genesis_config(blockstore_path, config.max_genesis_archive_unpacked_size);
+    info!("loading ledger from {:?}...", ledger_path);
+    let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size);
 
     // This needs to be limited otherwise the state in the VoteAccount data
     // grows too large
@@ -572,18 +590,23 @@
     if let Some(expected_genesis_hash) = config.expected_genesis_hash {
         if genesis_hash != expected_genesis_hash {
             error!("genesis hash mismatch: expected {}", expected_genesis_hash);
-            error!(
-                "Delete the ledger directory to continue: {:?}",
-                blockstore_path
-            );
+            error!("Delete the ledger directory to continue: {:?}", ledger_path);
             process::exit(1);
         }
     }
 
     let (mut blockstore, ledger_signal_receiver, completed_slots_receiver) =
-        Blockstore::open_with_signal(blockstore_path).expect("Failed to open ledger database");
+        Blockstore::open_with_signal(ledger_path).expect("Failed to open ledger database");
     blockstore.set_no_compaction(config.no_rocksdb_compaction);
 
+    let restored_tower = Tower::restore(ledger_path, &validator_identity);
+    if let Ok(tower) = &restored_tower {
+        reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap_or_else(|err| {
+            error!("Failed to reconcile blockstore with tower: {:?}", err);
+            std::process::exit(1);
+        });
+    }
+
     let process_options = blockstore_processor::ProcessOptions {
         poh_verify,
         dev_halt_at_slot: config.dev_halt_at_slot,
@@ -601,9 +624,39 @@
     )
     .unwrap_or_else(|err| {
         error!("Failed to load ledger: {:?}", err);
-        std::process::exit(1);
+        process::exit(1);
     });
 
+    let tower = match restored_tower {
+        Ok(mut tower) => {
+            // The tower root can be older if the validator booted from a newer snapshot, so
+            // tower lockouts may need adjustment
+            tower.adjust_lockouts_if_newer_root(bank_forks.root());
+            tower
+        }
+        Err(err) => {
+            if config.require_tower
+                && empty_vote_account(&bank_forks.working_bank(), &vote_account) != Some(true)
+            {
+                error!("Tower restore failed: {:?}", err);
+                process::exit(1);
+            }
+            info!("Rebuilding tower from the latest vote account");
+            Tower::new(
+                &validator_identity,
+                &vote_account,
+                &bank_forks,
+                &ledger_path,
+            )
+        }
+    };
+
+    info!(
+        "Tower state: root slot={:?}, last vote slot={:?}",
+        tower.root(),
+        tower.last_lockout_vote_slot()
+    );
+
     leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
 
     bank_forks.set_snapshot_config(config.snapshot_config.clone());
@@ -617,6 +670,7 @@
         completed_slots_receiver,
         leader_schedule_cache,
         snapshot_hash,
+        tower,
     )
 }
 
@@ -776,8 +830,7 @@ fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: boo
     let my_id = cluster_info.id();
 
     for (activated_stake, vote_account) in bank.vote_accounts().values() {
-        let vote_state =
-            solana_vote_program::vote_state::VoteState::from(&vote_account).unwrap_or_default();
+        let vote_state = VoteState::from(&vote_account).unwrap_or_default();
         total_activated_stake += activated_stake;
 
         if *activated_stake == 0 {
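The `Ok` arm above calls `adjust_lockouts_if_newer_root` because a validator may boot from a snapshot whose root is already ahead of the saved tower; lockout entries at or below the new root are stale and must be dropped. With concrete numbers (an illustrative sketch of the same retain logic, not the real `Tower` type):

```rust
fn main() {
    // Saved tower: root 10, lockout votes on slots 11, 15 and 20.
    // The snapshot the node boots from is already rooted at slot 15.
    let snapshot_root: u64 = 15;
    let mut tower_root: Option<u64> = Some(10);
    let mut lockout_votes: Vec<u64> = vec![11, 15, 20];

    // Same shape as Tower::adjust_lockouts_if_newer_root:
    if snapshot_root > tower_root.unwrap_or(0) {
        tower_root = Some(snapshot_root);
        lockout_votes.retain(|slot| *slot > snapshot_root);
    }

    assert_eq!(tower_root, Some(15));
    assert_eq!(lockout_votes, vec![20]); // 11 and 15 are at/below the new root
}
```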
diff --git a/core/tests/fork-selection.rs b/core/tests/fork-selection.rs
index 66fb20b7615ea9..696fbaf0b6f6eb 100644
--- a/core/tests/fork-selection.rs
+++ b/core/tests/fork-selection.rs
@@ -74,8 +74,7 @@
 extern crate rand;
 
 use rand::{thread_rng, Rng};
-use std::collections::HashMap;
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 
 #[derive(Clone, Default, Debug, Hash, Eq, PartialEq)]
 pub struct Fork {
diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs
index 960db5dbfb49a7..f904163a42a582 100644
--- a/local-cluster/src/local_cluster.rs
+++ b/local-cluster/src/local_cluster.rs
@@ -181,7 +181,6 @@ impl LocalCluster {
         );
 
         let mut validators = HashMap::new();
-        error!("leader_pubkey: {}", leader_pubkey);
         let leader_info = ValidatorInfo {
             keypair: leader_keypair.clone(),
             voting_keypair: leader_voting_keypair,
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 9ef8e3f3bdb46b..5397bb390a0fe9 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -4,8 +4,10 @@ use serial_test_derive::serial;
 use solana_client::rpc_client::RpcClient;
 use solana_client::thin_client::create_client;
 use solana_core::{
-    broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
-    gossip_service::discover_cluster, validator::ValidatorConfig,
+    broadcast_stage::BroadcastStageType,
+    consensus::{Tower, VOTE_THRESHOLD_DEPTH},
+    gossip_service::discover_cluster,
+    validator::ValidatorConfig,
 };
 use solana_download_utils::download_snapshot;
 use solana_ledger::bank_forks::CompressionType;
@@ -838,6 +840,7 @@ fn test_snapshot_download() {
 #[test]
 #[serial]
 fn test_snapshot_restart_tower() {
+    solana_logger::setup();
     // First set up the cluster with 2 nodes
     let snapshot_interval_slots = 10;
     let num_account_paths = 2;
@@ -1161,6 +1164,127 @@ fn test_no_voting() {
     }
 }
 
+#[test]
+#[serial]
+fn test_validator_saves_tower() {
+    solana_logger::setup();
+
+    let validator_config = ValidatorConfig {
+        require_tower: true,
+        ..ValidatorConfig::default()
+    };
+    let validator_identity_keypair = Arc::new(Keypair::new());
+    let validator_id = validator_identity_keypair.pubkey();
+    let config = ClusterConfig {
+        cluster_lamports: 10_000,
+        node_stakes: vec![100],
+        validator_configs: vec![validator_config],
+        validator_keys: Some(vec![validator_identity_keypair.clone()]),
+        ..ClusterConfig::default()
+    };
+    let mut cluster = LocalCluster::new(&config);
+
+    let validator_client = cluster.get_validator_client(&validator_id).unwrap();
+
+    let ledger_path = cluster
+        .validators
+        .get(&validator_id)
+        .unwrap()
+        .info
+        .ledger_path
+        .clone();
+
+    // Wait for some votes to be generated
+    loop {
+        if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::recent()) {
+            trace!("current slot: {}", slot);
+            if slot > 2 {
+                break;
+            }
+        }
+        sleep(Duration::from_millis(10));
+    }
+
+    // Stop validator and check saved tower
+    let validator_info = cluster.exit_node(&validator_id);
+    let tower1 = Tower::restore(&ledger_path, &validator_id).unwrap();
+    trace!("tower1: {:?}", tower1);
+    assert_eq!(tower1.root(), Some(0));
+
+    // Restart the validator and wait for a new root
+    cluster.restart_node(&validator_id, validator_info);
+    let validator_client = cluster.get_validator_client(&validator_id).unwrap();
+
+    // Wait for the first root
+    loop {
+        if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) {
+            trace!("current root: {}", root);
+            if root > 0 {
+                break;
+            }
+        }
+        sleep(Duration::from_millis(50));
+    }
+
+    // Stop validator and check saved tower
+    let validator_info = cluster.exit_node(&validator_id);
+    let tower2 = Tower::restore(&ledger_path, &validator_id).unwrap();
+    trace!("tower2: {:?}", tower2);
+    assert_eq!(tower2.root(), Some(1));
+
+    // Rollback saved tower to `tower1` to simulate a validator starting from a newer snapshot
+    // without having to wait for that snapshot to be generated in this test
+    tower1.save(&validator_identity_keypair).unwrap();
+
+    cluster.restart_node(&validator_id, validator_info);
+    let validator_client = cluster.get_validator_client(&validator_id).unwrap();
+
+    // Wait for a new root, demonstrating the validator was able to make progress from the older `tower1`
+    loop {
+        if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) {
+            trace!("current root: {}", root);
+            if root > 1 {
+                break;
+            }
+        }
+        sleep(Duration::from_millis(50));
+    }
+
+    // Check the new root is reflected in the saved tower state
+    let mut validator_info = cluster.exit_node(&validator_id);
+    let tower3 = Tower::restore(&ledger_path, &validator_id).unwrap();
+    trace!("tower3: {:?}", tower3);
+    assert!(tower3.root().unwrap() > 1);
+
+    // Remove the tower file entirely and allow the validator to start without a tower. It will
+    // rebuild tower from its vote account contents
+    fs::remove_file(Tower::get_filename(&ledger_path, &validator_id)).unwrap();
+    validator_info.config.require_tower = false;
+
+    cluster.restart_node(&validator_id, validator_info);
+    let validator_client = cluster.get_validator_client(&validator_id).unwrap();
+
+    // Wait for a couple more slots to pass so another vote occurs
+    let current_slot = validator_client
+        .get_slot_with_commitment(CommitmentConfig::recent())
+        .unwrap();
+    loop {
+        if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::recent()) {
+            trace!("current_slot: {}, slot: {}", current_slot, slot);
+            if slot > current_slot + 1 {
+                break;
+            }
+        }
+        sleep(Duration::from_millis(50));
+    }
+
+    cluster.close_preserve_ledgers();
+
+    let tower4 = Tower::restore(&ledger_path, &validator_id).unwrap();
+    trace!("tower4: {:?}", tower4);
+    assert_eq!(tower4.root(), tower3.root());
+}
+
 fn wait_for_next_snapshot(
     cluster: &LocalCluster,
     snapshot_package_output_path: &Path,
diff --git a/multinode-demo/bootstrap-validator.sh b/multinode-demo/bootstrap-validator.sh
index 80731b2923515c..f0ddfb1dcf081b 100755
--- a/multinode-demo/bootstrap-validator.sh
+++ b/multinode-demo/bootstrap-validator.sh
@@ -81,6 +81,7 @@ ledger_dir="$SOLANA_CONFIG_DIR"/bootstrap-validator
 args+=(
   --enable-rpc-exit
   --enable-rpc-set-log-filter
+  --require-tower
   --ledger "$ledger_dir"
   --rpc-port 8899
   --snapshot-interval-slots 200
diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh
index 0505cacbbe0b6a..8a6f7a3cffb017 100755
--- a/multinode-demo/validator.sh
+++ b/multinode-demo/validator.sh
@@ -222,6 +222,7 @@ default_arg --ledger "$ledger_dir"
 default_arg --log -
 default_arg --enable-rpc-exit
 default_arg --enable-rpc-set-log-filter
+default_arg --require-tower
 
 if [[ -n $SOLANA_CUDA ]]; then
   program=$solana_validator_cuda
diff --git a/run.sh b/run.sh
index 1142d9eb230bc0..a70759e1799502 100755
--- a/run.sh
+++ b/run.sh
@@ -98,6 +98,7 @@ args=(
   --enable-rpc-exit
   --enable-rpc-transaction-history
   --init-complete-file "$dataDir"/init-completed
+  --require-tower
 )
 solana-validator "${args[@]}" &
 validator=$!
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 074f0355a22b44..48c42ff8b32a99 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -700,6 +700,12 @@ pub fn main() {
                 .takes_value(false)
                 .help("Use CUDA"),
         )
+        .arg(
+            clap::Arg::with_name("require_tower")
+                .long("require-tower")
+                .takes_value(false)
+                .help("Refuse to start if saved tower state is not found"),
+        )
         .arg(
             Arg::with_name("expected_genesis_hash")
                 .long("expected-genesis-hash")
@@ -860,6 +866,7 @@ pub fn main() {
     };
 
     let mut validator_config = ValidatorConfig {
+        require_tower: matches.is_present("require_tower"),
         dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
         expected_genesis_hash: matches
             .value_of("expected_genesis_hash")