diff --git a/core/benches/consensus.rs b/core/benches/consensus.rs new file mode 100644 index 00000000000000..a6954ce8c23cad --- /dev/null +++ b/core/benches/consensus.rs @@ -0,0 +1,34 @@ +#![feature(test)] + +extern crate solana_core; +extern crate test; + +use solana_core::consensus::Tower; +use solana_ledger::bank_forks::BankForks; +use solana_runtime::bank::Bank; +use solana_sdk::{ + pubkey::Pubkey, + signature::{Keypair, Signer}, +}; +use std::sync::Arc; +use tempfile::TempDir; +use test::Bencher; + +#[bench] +fn bench_save_tower(bench: &mut Bencher) { + let dir = TempDir::new().unwrap(); + let path = dir.path(); + + let vote_account_pubkey = &Pubkey::default(); + let node_keypair = Arc::new(Keypair::new()); + let tower = Tower::new( + &node_keypair.pubkey(), + &vote_account_pubkey, + &BankForks::new(0, Bank::default()), + &path, + ); + + bench.iter(move || { + tower.save(&node_keypair).unwrap(); + }); +} diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 055b02c394477e..ff7b14082e4b1d 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -7,18 +7,25 @@ use solana_sdk::{ clock::{Slot, UnixTimestamp}, hash::Hash, pubkey::Pubkey, + signature::{Keypair, Signature, Signer}, }; use solana_vote_program::vote_state::{ BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY, TIMESTAMP_SLOT_INTERVAL, }; use std::{ collections::{HashMap, HashSet}, + fs::{self, File}, + io::BufReader, + path::{Path, PathBuf}, sync::Arc, }; +use thiserror::Error; pub const VOTE_THRESHOLD_DEPTH: usize = 8; pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; +pub type Result = std::result::Result; + #[derive(Default, Debug, Clone)] pub struct StakeLockout { lockout: u64, @@ -37,6 +44,7 @@ impl StakeLockout { } } +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] pub struct Tower { node_pubkey: Pubkey, threshold_depth: usize, @@ -44,6 +52,8 @@ pub struct Tower { lockouts: VoteState, last_vote: Vote, last_timestamp: BlockTimestamp, + #[serde(skip)] + save_path: PathBuf, } impl Default for Tower { @@ -55,19 +65,28 @@ impl Default for Tower { lockouts: VoteState::default(), last_vote: Vote::default(), last_timestamp: BlockTimestamp::default(), + save_path: PathBuf::default(), } } } impl Tower { - pub fn new(node_pubkey: &Pubkey, vote_account_pubkey: &Pubkey, bank_forks: &BankForks) -> Self { - let mut tower = Self::new_with_key(node_pubkey); - + pub fn new( + node_pubkey: &Pubkey, + vote_account_pubkey: &Pubkey, + bank_forks: &BankForks, + save_path: &Path, + ) -> Self { + let mut tower = Self { + node_pubkey: *node_pubkey, + save_path: PathBuf::from(save_path), + ..Tower::default() + }; tower.initialize_lockouts_from_bank_forks(&bank_forks, vote_account_pubkey); - tower } + #[cfg(test)] pub fn new_with_key(node_pubkey: &Pubkey) -> Self { Self { node_pubkey: *node_pubkey, @@ -273,6 +292,14 @@ impl Tower { self.lockouts.root_slot } + pub fn last_lockout_vote_slot(&self) -> Option { + self.lockouts + .votes + .iter() + .max_by(|x, y| x.slot.cmp(&y.slot)) + .map(|v| v.slot) + } + // a slot is not recent if it's older than the newest vote we have pub fn is_recent(&self, slot: u64) -> bool { if let Some(last_vote) = self.lockouts.votes.back() { @@ -438,12 +465,12 @@ impl Tower { vote_account_pubkey: &Pubkey, ) { if let Some(bank) = self.find_heaviest_bank(bank_forks) { - let root = bank_forks.root(); if let Some((_stake, vote_account)) = bank.vote_accounts().get(vote_account_pubkey) { + let root_slot = bank_forks.root(); let mut vote_state = VoteState::deserialize(&vote_account.data) .expect("vote_account isn't a VoteState?"); - vote_state.root_slot = Some(root); - vote_state.votes.retain(|v| v.slot > root); + vote_state.root_slot = Some(root_slot); + vote_state.votes.retain(|v| v.slot > root_slot); trace!( "{} lockouts initialized to {:?}", self.node_pubkey, @@ -455,6 +482,12 @@ impl Tower { "vote account's node_pubkey doesn't match", ); self.lockouts = vote_state; + } else { + info!( + "vote account({}) not found in heaviest bank (slot={})", + vote_account_pubkey, + bank.slot() + ); } } } @@ -473,6 +506,117 @@ impl Tower { None } } + + fn get_filename(path: &Path, node_pubkey: &Pubkey) -> PathBuf { + PathBuf::from(path) + .join(format!("tower-{}", node_pubkey)) + .with_extension("bin") + } + + pub fn save(&self, node_keypair: &Arc) -> Result<()> { + assert_eq!(node_keypair.pubkey(), self.node_pubkey); + + if self.node_pubkey != node_keypair.pubkey() { + return Err(TowerError::WrongTower(format!( + "node_pubkey is {:?} but found tower for {:?}", + node_keypair.pubkey(), + self.node_pubkey + ))); + } + + fs::create_dir_all(&self.save_path)?; + let filename = Self::get_filename(&self.save_path, &self.node_pubkey); + let new_filename = filename.with_extension("new"); + { + let mut file = File::create(&new_filename)?; + let saveable_tower = SavedTower::new(self, node_keypair)?; + bincode::serialize_into(&mut file, &saveable_tower)?; + } + fs::rename(&new_filename, &filename)?; + Ok(()) + } + + pub fn restore(save_path: &Path, node_pubkey: &Pubkey) -> Result { + let filename = Self::get_filename(save_path, node_pubkey); + + info!("Restoring tower from {:?}", filename); + let file = File::open(filename)?; + let mut stream = BufReader::new(file); + + let saved_tower: SavedTower = bincode::deserialize_from(&mut stream)?; + if !saved_tower.verify(node_pubkey) { + return Err(TowerError::InvalidSignature); + } + let mut tower = saved_tower.deserialize()?; + tower.save_path = save_path.to_path_buf(); + + // check that the tower actually belongs to this node + if &tower.node_pubkey != node_pubkey { + return Err(TowerError::WrongTower(format!( + "node_pubkey is {:?} but found tower for {:?}", + node_pubkey, tower.node_pubkey + ))); + } + + Ok(tower) + } +} + +#[derive(Error, Debug)] +pub enum TowerError { + #[error("IO Error: {0}")] + IOError(#[from] std::io::Error), + + #[error("Serialization Error: {0}")] + SerializeError(#[from] Box), + + #[error("The signature on the saved tower is invalid")] + InvalidSignature, + + #[error("The tower does not match this validator: {0}")] + WrongTower(String), +} + +#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq)] +pub struct SavedTower { + signature: Signature, + data: Vec, +} + +impl SavedTower { + pub fn new(tower: &Tower, keypair: &Arc) -> Result { + let data = bincode::serialize(tower)?; + let signature = keypair.sign_message(&data); + Ok(Self { data, signature }) + } + + pub fn verify(&self, pubkey: &Pubkey) -> bool { + self.signature.verify(pubkey.as_ref(), &self.data) + } + + pub fn deserialize(&self) -> Result { + bincode::deserialize(&self.data).map_err(|e| e.into()) + } +} + +// Given an untimely crash, tower may have roots that are not reflected in blockstore because +// `ReplayState::handle_votable_bank()` saves tower before setting blockstore roots +pub fn reconcile_blockstore_roots_with_tower( + tower: &Tower, + blockstore: &solana_ledger::blockstore::Blockstore, +) -> solana_ledger::blockstore_db::Result<()> { + if let Some(tower_root) = tower.root() { + let last_blockstore_root = blockstore.last_root(); + if last_blockstore_root < tower_root { + let new_roots: Vec<_> = blockstore + .slot_meta_iterator(last_blockstore_root + 1)? + .map(|(slot, _)| slot) + .take_while(|slot| *slot <= tower_root) + .collect(); + blockstore.set_roots(&new_roots)? + } + } + Ok(()) } #[cfg(test)] diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 08ad1615fa336a..da3416e4c1310c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -117,6 +117,7 @@ impl ReplayStage { cluster_info: Arc, ledger_signal_receiver: Receiver, poh_recorder: Arc>, + mut tower: Tower, vote_tracker: Arc, cluster_slots: Arc, retransmit_slots_sender: RetransmitSlotsSender, @@ -138,7 +139,6 @@ impl ReplayStage { let (root_bank_sender, root_bank_receiver) = channel(); trace!("replay stage"); - let mut tower = Tower::new(&my_pubkey, &vote_account, &bank_forks.read().unwrap()); // Start the replay stage loop let (lockouts_sender, commitment_service) = @@ -773,7 +773,15 @@ impl ReplayStage { } trace!("handle votable bank {}", bank.slot()); let (vote, tower_index) = tower.new_vote_from_bank(bank, vote_account_pubkey); - if let Some(new_root) = tower.record_bank_vote(vote) { + let new_root = tower.record_bank_vote(vote); + let last_vote = tower.last_vote_and_timestamp(); + + if let Err(err) = tower.save(&cluster_info.keypair) { + error!("Unable to save tower: {:?}", err); + std::process::exit(1); + } + + if let Some(new_root) = new_root { // get the root bank before squash let root_bank = bank_forks .read() @@ -831,7 +839,7 @@ impl ReplayStage { bank, vote_account_pubkey, authorized_voter_keypairs, - tower.last_vote_and_timestamp(), + last_vote, tower_index, ); Ok(()) diff --git a/core/src/tvu.rs b/core/src/tvu.rs index c5a270c58c0458..294cad2d365f4a 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -9,6 +9,7 @@ use crate::{ cluster_info_vote_listener::VoteTracker, cluster_slots::ClusterSlots, commitment::BlockCommitmentCache, + consensus::Tower, ledger_cleanup_service::LedgerCleanupService, poh_recorder::PohRecorder, replay_stage::{ReplayStage, ReplayStageConfig}, @@ -21,19 +22,19 @@ use crate::{ storage_stage::{StorageStage, StorageState}, }; use crossbeam_channel::unbounded; -use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver}, blockstore_processor::TransactionStatusSender, + leader_schedule_cache::LeaderScheduleCache, snapshot_package::AccountsPackageSender, }; use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signer}, }; -use std::collections::HashSet; use std::{ + collections::HashSet, net::UdpSocket, sync::{ atomic::AtomicBool, @@ -91,6 +92,7 @@ impl Tvu { ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, + tower: Tower, leader_schedule_cache: &Arc, exit: &Arc, completed_slots_receiver: CompletedSlotsReceiver, @@ -204,6 +206,7 @@ impl Tvu { cluster_info.clone(), ledger_signal_receiver, poh_recorder.clone(), + tower, vote_tracker, cluster_slots, retransmit_slots_sender, @@ -263,11 +266,15 @@ impl Tvu { #[cfg(test)] pub mod tests { use super::*; - use crate::banking_stage::create_test_recorder; - use crate::cluster_info::{ClusterInfo, Node}; + use crate::{ + banking_stage::create_test_recorder, + cluster_info::{ClusterInfo, Node}, + }; use serial_test_derive::serial; - use solana_ledger::create_new_tmp_ledger; - use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; + use solana_ledger::{ + create_new_tmp_ledger, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }; use solana_runtime::bank::Bank; use std::sync::atomic::Ordering; @@ -304,6 +311,7 @@ pub mod tests { BlockCommitmentCache::default_with_blockstore(blockstore.clone()), )); let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); + let tower = Tower::new_with_key(&target1_keypair.pubkey()); let tvu = Tvu::new( &vote_keypair.pubkey(), vec![Arc::new(vote_keypair)], @@ -323,6 +331,7 @@ pub mod tests { l_receiver, &Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone())), &poh_recorder, + tower, &leader_schedule_cache, &exit, completed_slots_receiver, diff --git a/core/src/validator.rs b/core/src/validator.rs index bb9089d3d38f7e..39223934e2f5d4 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -5,6 +5,7 @@ use crate::{ cluster_info::{ClusterInfo, Node}, cluster_info_vote_listener::VoteTracker, commitment::BlockCommitmentCache, + consensus::{reconcile_blockstore_roots_with_tower, Tower}, contact_info::ContactInfo, gossip_service::{discover_cluster, GossipService}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, @@ -45,6 +46,7 @@ use solana_sdk::{ signature::{Keypair, Signer}, timing::timestamp, }; +use solana_vote_program::vote_state::VoteState; use std::{ collections::HashSet, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -82,6 +84,7 @@ pub struct ValidatorConfig { pub no_rocksdb_compaction: bool, pub accounts_hash_interval_slots: u64, pub max_genesis_archive_unpacked_size: u64, + pub require_tower: bool, } impl Default for ValidatorConfig { @@ -110,6 +113,7 @@ impl Default for ValidatorConfig { no_rocksdb_compaction: false, accounts_hash_interval_slots: std::u64::MAX, max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, + require_tower: false, } } } @@ -182,7 +186,6 @@ impl Validator { sigverify::init(); info!("Done."); - info!("creating bank..."); let ( genesis_config, bank_forks, @@ -191,14 +194,14 @@ impl Validator { completed_slots_receiver, leader_schedule_cache, snapshot_hash, - ) = new_banks_from_blockstore(config, ledger_path, poh_verify); + tower, + ) = new_banks_from_ledger(&id, vote_account, config, ledger_path, poh_verify); let leader_schedule_cache = Arc::new(leader_schedule_cache); - let exit = Arc::new(AtomicBool::new(false)); let bank = bank_forks.working_bank(); let bank_forks = Arc::new(RwLock::new(bank_forks)); - info!("Starting validator from slot {}", bank.slot()); + info!("Starting validator with working bank slot {}", bank.slot()); { let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect(); if !hard_forks.is_empty() { @@ -206,16 +209,12 @@ impl Validator { } } - let mut validator_exit = ValidatorExit::default(); - let exit_ = exit.clone(); - validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); - let validator_exit = Arc::new(RwLock::new(Some(validator_exit))); - node.info.wallclock = timestamp(); node.info.shred_version = compute_shred_version( &genesis_config.hash(), Some(&bank.hard_forks().read().unwrap()), ); + Self::print_node_info(&node); if let Some(expected_shred_version) = config.expected_shred_version { @@ -228,6 +227,12 @@ impl Validator { } } + let mut validator_exit = ValidatorExit::default(); + let exit = Arc::new(AtomicBool::new(false)); + let exit_ = exit.clone(); + validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); + let validator_exit = Arc::new(RwLock::new(Some(validator_exit))); + let cluster_info = Arc::new(ClusterInfo::new(node.info.clone(), keypair.clone())); let storage_state = StorageState::new( @@ -425,6 +430,7 @@ impl Validator { ledger_signal_receiver, &subscriptions, &poh_recorder, + tower, &leader_schedule_cache, &exit, completed_slots_receiver, @@ -552,10 +558,25 @@ impl Validator { } } +fn empty_vote_account(bank_forks: &BankForks, vote_account: &Pubkey) -> Option { + if let Some(account) = &bank_forks + .get(bank_forks.root()) + .expect("Failed to get root bank") + .get_account(vote_account) + { + if let Some(vote_state) = VoteState::from(&account) { + return Some(vote_state.votes.is_empty()); + } + } + return None; +} + #[allow(clippy::type_complexity)] -fn new_banks_from_blockstore( +fn new_banks_from_ledger( + validator_identity: &Pubkey, + vote_account: &Pubkey, config: &ValidatorConfig, - blockstore_path: &Path, + ledger_path: &Path, poh_verify: bool, ) -> ( GenesisConfig, @@ -565,9 +586,10 @@ fn new_banks_from_blockstore( CompletedSlotsReceiver, LeaderScheduleCache, Option<(Slot, Hash)>, + Tower, ) { - let genesis_config = - open_genesis_config(blockstore_path, config.max_genesis_archive_unpacked_size); + info!("loading ledger..."); + let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size); // This needs to be limited otherwise the state in the VoteAccount data // grows too large @@ -582,18 +604,23 @@ fn new_banks_from_blockstore( if let Some(expected_genesis_hash) = config.expected_genesis_hash { if genesis_hash != expected_genesis_hash { error!("genesis hash mismatch: expected {}", expected_genesis_hash); - error!( - "Delete the ledger directory to continue: {:?}", - blockstore_path - ); + error!("Delete the ledger directory to continue: {:?}", ledger_path); process::exit(1); } } let (mut blockstore, ledger_signal_receiver, completed_slots_receiver) = - Blockstore::open_with_signal(blockstore_path).expect("Failed to open ledger database"); + Blockstore::open_with_signal(ledger_path).expect("Failed to open ledger database"); blockstore.set_no_compaction(config.no_rocksdb_compaction); + let restored_tower = Tower::restore(ledger_path, &validator_identity); + if let Ok(tower) = &restored_tower { + reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap_or_else(|err| { + error!("Failed to reconcile blockstore with tower: {:?}", err); + std::process::exit(1); + }); + } + let process_options = blockstore_processor::ProcessOptions { poh_verify, dev_halt_at_slot: config.dev_halt_at_slot, @@ -611,9 +638,42 @@ fn new_banks_from_blockstore( ) .unwrap_or_else(|err| { error!("Failed to load ledger: {:?}", err); - std::process::exit(1); + process::exit(1); + }); + + let tower = restored_tower.unwrap_or_else(|err| { + if config.require_tower && empty_vote_account(&bank_forks, &vote_account) != Some(true) { + error!("Tower restore failed: {:?}", err); + process::exit(1); + } + info!("Rebuilding tower from the latest vote account"); + Tower::new( + &validator_identity, + &vote_account, + &bank_forks, + &ledger_path, + ) }); + info!( + "Tower state: root slot={:?}, last vote slot={:?}", + tower.root(), + tower.last_lockout_vote_slot() + ); + + if let Some(tower_root) = tower.root() { + if tower_root < bank_forks.root() { + // This should be recoverable but right now Tower can't handle it, so bail cleanly instead + // of showering the log with "Couldn't vote on heaviest fork" errors + error!( + "Tower root ({}) is older than ledger root ({})", + tower_root, + bank_forks.root() + ); + process::exit(1); + } + } + leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone()); bank_forks.set_snapshot_config(config.snapshot_config.clone()); @@ -627,6 +687,7 @@ fn new_banks_from_blockstore( completed_slots_receiver, leader_schedule_cache, snapshot_hash, + tower, ) } @@ -788,8 +849,7 @@ fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: boo let my_id = cluster_info.id(); for (activated_stake, vote_account) in bank.vote_accounts().values() { - let vote_state = - solana_vote_program::vote_state::VoteState::from(&vote_account).unwrap_or_default(); + let vote_state = VoteState::from(&vote_account).unwrap_or_default(); total_activated_stake += activated_stake; if *activated_stake == 0 { diff --git a/core/tests/fork-selection.rs b/core/tests/fork-selection.rs index c89b9d4cd526e2..21634ef020e667 100644 --- a/core/tests/fork-selection.rs +++ b/core/tests/fork-selection.rs @@ -74,8 +74,7 @@ extern crate rand; use rand::{thread_rng, Rng}; -use std::collections::HashMap; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; #[derive(Clone, Default, Debug, Hash, Eq, PartialEq)] pub struct Fork { diff --git a/multinode-demo/bootstrap-validator.sh b/multinode-demo/bootstrap-validator.sh index 80731b2923515c..f0ddfb1dcf081b 100755 --- a/multinode-demo/bootstrap-validator.sh +++ b/multinode-demo/bootstrap-validator.sh @@ -81,6 +81,7 @@ ledger_dir="$SOLANA_CONFIG_DIR"/bootstrap-validator args+=( --enable-rpc-exit --enable-rpc-set-log-filter + --require-tower --ledger "$ledger_dir" --rpc-port 8899 --snapshot-interval-slots 200 diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 9c02c3c7b42fa0..567d8f1653ac83 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -228,6 +228,7 @@ default_arg --ledger "$ledger_dir" default_arg --log - default_arg --enable-rpc-exit default_arg --enable-rpc-set-log-filter +default_arg --require-tower if [[ -n $SOLANA_CUDA ]]; then program=$solana_validator_cuda diff --git a/run.sh b/run.sh index 1142d9eb230bc0..a70759e1799502 100755 --- a/run.sh +++ b/run.sh @@ -98,6 +98,7 @@ args=( --enable-rpc-exit --enable-rpc-transaction-history --init-complete-file "$dataDir"/init-completed + --require-tower ) solana-validator "${args[@]}" & validator=$! diff --git a/validator/src/main.rs b/validator/src/main.rs index c9e547678de750..4818fc7a75d522 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -705,6 +705,12 @@ pub fn main() { .takes_value(false) .help("Use CUDA"), ) + .arg( + clap::Arg::with_name("require_tower") + .long("require-tower") + .takes_value(false) + .help("Refuse to start if saved tower state is not found"), + ) .arg( Arg::with_name("expected_genesis_hash") .long("expected-genesis-hash") @@ -867,6 +873,7 @@ pub fn main() { }; let mut validator_config = ValidatorConfig { + require_tower: matches.is_present("require_tower"), dev_sigverify_disabled: matches.is_present("dev_no_sigverify"), dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(), expected_genesis_hash: matches